瀏覽標籤:

Thread

[C#][TPL] 自定義 TaskScheduler 管理執行緒

最近利用 Task 撰寫非同步的程式時碰到了許多未釐清的用法,這邊筆記一下自定義 TaskScheduler 的方法。

/// <summary>
/// A <see cref="TaskScheduler"/> that runs every queued task on a fixed set of
/// dedicated background threads created when the scheduler is constructed.
/// </summary>
/// <remarks>
/// Disposing the scheduler stops accepting new tasks, lets the workers drain
/// whatever is already queued, and then waits for the workers to exit.
/// </remarks>
public class StaticThreadTaskScheduler : TaskScheduler, IDisposable
{
    private readonly List<Thread> _threads = new List<Thread>();
    private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();

    // Guards the disposed flag so that a task cannot be enqueued after
    // CompleteAdding() has been called by Dispose.
    private readonly object _gate = new object();

    private bool _disposedValue = false;

    /// <summary>
    /// Number of worker threads owned by this scheduler. Overrides (rather than
    /// hides) the base property so code that only holds a <see cref="TaskScheduler"/>
    /// reference observes the real limit.
    /// </summary>
    public override int MaximumConcurrencyLevel { get; }

    /// <summary>
    /// Creates a scheduler with one worker thread per logical processor.
    /// </summary>
    public StaticThreadTaskScheduler() : this(Environment.ProcessorCount) { }

    /// <summary>
    /// Creates a scheduler with the given number of worker threads.
    /// </summary>
    /// <param name="maximumConcurrencyLevel">Number of worker threads; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maximumConcurrencyLevel"/> is less than 1.
    /// </exception>
    public StaticThreadTaskScheduler(int maximumConcurrencyLevel)
    {
        if (maximumConcurrencyLevel < 1)
        {
            // With zero workers every queued task would silently wait forever.
            throw new ArgumentOutOfRangeException(
                nameof(maximumConcurrencyLevel),
                maximumConcurrencyLevel,
                "At least one worker thread is required.");
        }

        MaximumConcurrencyLevel = maximumConcurrencyLevel;
        Initialize(maximumConcurrencyLevel);
    }

    /// <summary>
    /// Returns a snapshot of the tasks that are queued but not yet executed.
    /// </summary>
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _tasks.ToArray();
    }

    /// <summary>
    /// Enqueues <paramref name="task"/> for execution on one of the worker threads.
    /// A null task is ignored.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The scheduler has been disposed.</exception>
    protected override void QueueTask(Task task)
    {
        if (task == null)
        {
            return;
        }

        lock (_gate)
        {
            if (_disposedValue)
            {
                throw new ObjectDisposedException(nameof(StaticThreadTaskScheduler));
            }

            // The collection is unbounded, so Add never blocks while the lock is held.
            _tasks.Add(task);
        }
    }

    /// <summary>
    /// Always refuses inline execution so that tasks only ever run on the
    /// scheduler's own worker threads.
    /// </summary>
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    /// <summary>Creates and starts the background worker threads.</summary>
    private void Initialize(int threadCount)
    {
        for (int i = 0; i < threadCount; i++)
        {
            Thread thread = new Thread(Execute);
            thread.IsBackground = true;
            thread.Start();
            _threads.Add(thread);
        }
    }

    /// <summary>
    /// Worker loop: executes queued tasks until the queue is marked complete
    /// and has been drained.
    /// </summary>
    private void Execute()
    {
        foreach (var task in _tasks.GetConsumingEnumerable())
        {
            TryExecuteTask(task);
        }
    }

    #region IDisposable Support

    /// <summary>
    /// Stops accepting tasks, waits for the workers to finish the tasks that are
    /// already queued, and releases the queue. Safe to call more than once.
    /// </summary>
    /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        lock (_gate)
        {
            if (_disposedValue)
            {
                return;
            }

            _disposedValue = true;

            if (disposing)
            {
                // Without this the consuming enumerable never ends and Join() hangs forever.
                _tasks.CompleteAdding();
            }
        }

        if (!disposing)
        {
            return;
        }

        bool calledFromWorker = false;
        foreach (var thread in _threads)
        {
            if (thread == Thread.CurrentThread)
            {
                // Joining the current thread would deadlock; it exits on its own
                // once the task that called Dispose returns.
                calledFromWorker = true;
                continue;
            }

            thread.Join();
        }

        _threads.Clear();

        if (!calledFromWorker)
        {
            // Only safe once every consumer has left GetConsumingEnumerable().
            _tasks.Dispose();
        }
    }

    /// <summary>Shuts the scheduler down; see <see cref="Dispose(bool)"/>.</summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    #endregion
}

為了測試 StaticThreadTaskScheduler 是否真的如預期,我寫了一小段 Testing Code 去驗證。

/// <summary>
/// Console demo: starts 100 endless async workers on a single-threaded
/// StaticThreadTaskScheduler and prints which thread each step runs on.
/// </summary>
class Program
{
    // One worker thread only, so the printed thread IDs show where the work lands.
    private static readonly StaticThreadTaskScheduler _scheduler = new StaticThreadTaskScheduler(1);

    /// <summary>
    /// Launches the workers, then disposes the scheduler after the first key
    /// press and exits after the second.
    /// </summary>
    static void Main(string[] args)
    {
        for (int id = 1; id <= 100; id++)
        {
            // Passing the counter by value gives every worker its own copy.
            LaunchWorker(id);
        }

        Console.ReadKey();
        _scheduler.Dispose();
        Console.ReadKey();
    }

    /// <summary>Starts one worker on the custom scheduler without waiting for it.</summary>
    private static void LaunchWorker(int workerId)
    {
        Task<Task> outer = Task.Factory.StartNew(
            () => WorkingAsync(workerId),
            CancellationToken.None,
            TaskCreationOptions.None,
            _scheduler);

        outer.Unwrap();
    }

    /// <summary>
    /// Endless loop: prints the current thread and worker ID, then schedules a
    /// sub-step on the custom scheduler and awaits it.
    /// </summary>
    static async Task WorkingAsync(int i)
    {
        for (;;)
        {
            Console.WriteLine($"Thread ID: {Thread.CurrentThread.ManagedThreadId}, Working ID: {i}");

            Task<Task> scheduled = Task.Factory.StartNew(
                () => SubWorkingAsync(i),
                CancellationToken.None,
                TaskCreationOptions.None,
                _scheduler);

            await scheduled.Unwrap();
        }
    }

    /// <summary>Waits one second, then prints the current thread and worker ID.</summary>
    private static async Task SubWorkingAsync(int i)
    {
        await Task.Delay(1000);
        Console.WriteLine($"Thread ID: {Thread.CurrentThread.ManagedThreadId}, Sub Working ID: {i}");
    }
}

結果如下: