最近利用 Task 撰寫非同步的程式時碰到了許多未釐清的用法,這邊筆記一下自定義 TaskScheduler 的方法。
/// <summary>
/// A <see cref="TaskScheduler"/> that executes every queued task on a fixed set of
/// dedicated background threads that are created when the scheduler is constructed.
/// </summary>
/// <remarks>
/// Tasks are never inlined onto the calling thread, so work scheduled here only ever
/// runs on the scheduler's own threads. Disposing the scheduler stops accepting new
/// tasks, lets the workers drain what is already queued, and then waits for them to exit.
/// </remarks>
public class StaticThreadTaskScheduler : TaskScheduler, IDisposable
{
    private readonly List<Thread> _threads = new List<Thread>();

    // BlockingCollection is thread-safe for concurrent Add/Take, so no extra lock is needed.
    private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();

    private readonly int _maximumConcurrencyLevel;

    private bool _disposedValue;

    /// <summary>
    /// Gets the number of dedicated worker threads, which is also the maximum number of
    /// tasks this scheduler runs at the same time.
    /// </summary>
    /// <remarks>
    /// Overrides (rather than hides) the base property so the value is also visible to
    /// callers that only hold a <see cref="TaskScheduler"/> reference.
    /// </remarks>
    public override int MaximumConcurrencyLevel
    {
        get { return _maximumConcurrencyLevel; }
    }

    /// <summary>
    /// Creates a scheduler with one worker thread per processor
    /// (<see cref="Environment.ProcessorCount"/>).
    /// </summary>
    public StaticThreadTaskScheduler()
        : this(Environment.ProcessorCount)
    {
    }

    /// <summary>
    /// Creates a scheduler with the given number of worker threads and starts them.
    /// </summary>
    /// <param name="maximumConcurrencyLevel">Number of worker threads; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maximumConcurrencyLevel"/> is less than 1. Without any worker
    /// thread, queued tasks would never run.
    /// </exception>
    public StaticThreadTaskScheduler(int maximumConcurrencyLevel)
    {
        if (maximumConcurrencyLevel < 1)
        {
            throw new ArgumentOutOfRangeException(
                nameof(maximumConcurrencyLevel),
                maximumConcurrencyLevel,
                "At least one worker thread is required.");
        }

        _maximumConcurrencyLevel = maximumConcurrencyLevel;
        StartWorkers();
    }

    /// <summary>
    /// Returns a snapshot of the tasks that are queued but not yet picked up by a worker.
    /// </summary>
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _tasks.ToArray();
    }

    /// <summary>
    /// Queues <paramref name="task"/> for execution on one of the worker threads.
    /// A null task is ignored.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The scheduler has been disposed.</exception>
    protected override void QueueTask(Task task)
    {
        if (task == null)
        {
            return;
        }

        try
        {
            _tasks.Add(task);
        }
        catch (InvalidOperationException ex)
        {
            // Add throws InvalidOperationException after CompleteAdding() and
            // ObjectDisposedException (a subclass) after Dispose(); both mean the
            // scheduler was disposed, so report that explicitly.
            throw new ObjectDisposedException(
                "The scheduler has been disposed and no longer accepts tasks.", ex);
        }
    }

    /// <summary>
    /// Always declines inlining so tasks only ever run on the scheduler's own threads.
    /// </summary>
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    // Creates and starts the fixed set of background worker threads.
    private void StartWorkers()
    {
        for (int i = 0; i < _maximumConcurrencyLevel; i++)
        {
            Thread thread = new Thread(RunWorker);
            thread.IsBackground = true;
            thread.Start();
            _threads.Add(thread);
        }
    }

    // Worker loop: blocks until a task is available and runs it; the loop ends once
    // adding has been completed and the queue is empty.
    private void RunWorker()
    {
        foreach (var task in _tasks.GetConsumingEnumerable())
        {
            TryExecuteTask(task);
        }
    }

    #region IDisposable Support

    /// <summary>
    /// Stops accepting tasks, waits for the workers to drain the queue and exit, and
    /// releases the queue.
    /// </summary>
    /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (_disposedValue)
        {
            return;
        }

        _disposedValue = true;

        if (disposing)
        {
            // Without this the workers stay blocked in GetConsumingEnumerable() forever
            // and the Join() calls below never return.
            _tasks.CompleteAdding();

            bool calledFromWorker = false;
            foreach (var thread in _threads)
            {
                if (thread == Thread.CurrentThread)
                {
                    // A thread cannot join itself; it exits on its own once the queue drains.
                    calledFromWorker = true;
                    continue;
                }

                thread.Join();
            }

            _threads.Clear();

            // If a worker is disposing, it is still enumerating the queue, so leave the
            // collection alive for it instead of disposing it underneath the loop.
            if (!calledFromWorker)
            {
                _tasks.Dispose();
            }
        }
    }

    /// <summary>
    /// Shuts the scheduler down; see <see cref="Dispose(bool)"/>.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    #endregion
}
為了測試 StaticThreadTaskScheduler 是否真的如預期,我寫了一小段 Testing Code 去驗證。
/// <summary>
/// Console demo that starts 100 looping async workers on a single-threaded
/// <c>StaticThreadTaskScheduler</c> and prints which thread each step runs on.
/// </summary>
class Program
{
    private static readonly StaticThreadTaskScheduler _scheduler = new StaticThreadTaskScheduler(1);

    // Signals the worker loops to stop so the scheduler can be disposed while idle.
    private static readonly CancellationTokenSource _stop = new CancellationTokenSource();

    /// <summary>
    /// Starts the workers, runs until a key is pressed, then stops the workers,
    /// disposes the scheduler and waits for a second key press before exiting.
    /// </summary>
    static void Main(string[] args)
    {
        // Keep the unwrapped tasks so they can be awaited (and their faults observed)
        // instead of being fire-and-forget.
        Task[] workers = Enumerable.Range(1, 100)
            .Select(item => Task.Factory
                .StartNew(() => WorkingAsync(item), CancellationToken.None, TaskCreationOptions.None, _scheduler)
                .Unwrap())
            .ToArray();

        Console.ReadKey();

        // Let every worker finish its current iteration before tearing the scheduler
        // down; otherwise the workers keep queueing work onto a disposed scheduler.
        _stop.Cancel();
        Task.WaitAll(workers);

        _scheduler.Dispose();
        _stop.Dispose();

        Console.ReadKey();
    }

    /// <summary>
    /// Repeatedly prints the current thread id, then schedules a sub-step on the same
    /// scheduler that waits one second and prints the thread id again. The loop ends
    /// after the stop signal has been raised.
    /// </summary>
    /// <param name="i">Identifier of this worker, used only in the output.</param>
    static async Task WorkingAsync(int i)
    {
        while (!_stop.IsCancellationRequested)
        {
            Console.WriteLine($"Thread ID: {Thread.CurrentThread.ManagedThreadId}, Working ID: {i}");

            await Task.Factory
                .StartNew(async () =>
                {
                    await Task.Delay(1000);
                    Console.WriteLine($"Thread ID: {Thread.CurrentThread.ManagedThreadId}, Sub Working ID: {i}");
                }, CancellationToken.None, TaskCreationOptions.None, _scheduler)
                .Unwrap();
        }
    }
}
結果如下: