最近利用 Task 撰寫非同步的程式時碰到了許多未釐清的用法,這邊筆記一下自定義 TaskScheuler 的方法。
public class StaticThreadTaskScheduler : TaskScheduler, IDisposable
{
private readonly List<Thread> _threads = new List<Thread>();
private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
private readonly AutoResetEvent _lockObj = new AutoResetEvent(true);
public new int MaximumConcurrencyLevel { get; private set; }
public StaticThreadTaskScheduler() : this(Environment.ProcessorCount) { }
public StaticThreadTaskScheduler(int maximumConcurrencyLevel)
{
MaximumConcurrencyLevel = maximumConcurrencyLevel;
initalize();
}
protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasks;
}
protected override void QueueTask(Task task)
{
if (task != null)
{
_lockObj.WaitOne();
_tasks.Add(task);
_lockObj.Set();
}
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
private void initalize()
{
for (int i = 0; i < MaximumConcurrencyLevel; i++)
{
Thread thread = new Thread(execute);
thread.IsBackground = true;
thread.Start();
_threads.Add(thread);
}
}
private void execute()
{
foreach (var task in _tasks.GetConsumingEnumerable())
{
TryExecuteTask(task);
}
}
#region IDisposable Support
private bool _disposedValue = false;
protected virtual void Dispose(bool disposing)
{
if (!_disposedValue)
{
if (disposing)
{
_lockObj.WaitOne();
foreach (var thread in _threads)
{
thread.Join();
}
_threads.Clear();
_tasks.Dispose();
_lockObj.Dispose();
}
_disposedValue = true;
}
}
public void Dispose()
{
Dispose(true);
}
#endregion
}
為了測試 StaticThreadTaskScheduler 是否真的如預期,我寫了一小段 Testing Code 去驗證。
class Program
{
static StaticThreadTaskScheduler _scheduler = new StaticThreadTaskScheduler(1);
static void Main(string[] args)
{
foreach (var item in Enumerable.Range(1, 100))
{
Task.Factory
.StartNew(() =>
WorkingAsync(item),
CancellationToken.None,
TaskCreationOptions.None,
_scheduler)
.Unwrap();
}
Console.ReadKey();
_scheduler.Dispose();
Console.ReadKey();
}
static async Task WorkingAsync(int i)
{
while (true)
{
Console.WriteLine($"Thread ID: {Thread.CurrentThread.ManagedThreadId}, Working ID: {i}");
await
Task.Factory
.StartNew(async () =>
{
await Task.Delay(1000);
Console.WriteLine($"Thread ID: {Thread.CurrentThread.ManagedThreadId}, Sub Working ID: {i}");
},
CancellationToken.None,
TaskCreationOptions.None,
_scheduler)
.Unwrap();
}
}
}
結果如下:






