This code consistently executes commands with a given priority. How can I to improve it?
public class PriorizatableCommandQueue<T> : IDisposable
{
private readonly object _lock;
private readonly Action<T> _action;
private ConcurrentQueue<T> _forLowestPriority;
private ConcurrentQueue<T> _forBelowNormalPriority;
private ConcurrentQueue<T> _forNormalPriority;
private ConcurrentQueue<T> _forAboveNormalPriority;
private ConcurrentQueue<T> _forHighestPriority;
private ThreadPriority _currentPriority;
private CancellationTokenSource _cts;
private Task _task;
private bool _isExecuting;
private bool _isDisposed;
public PriorizatableCommandQueue(Action<T> action)
{
_forLowestPriority = new ConcurrentQueue<T>();
_forBelowNormalPriority = new ConcurrentQueue<T>();
_forNormalPriority = new ConcurrentQueue<T>();
_forAboveNormalPriority = new ConcurrentQueue<T>();
_forHighestPriority = new ConcurrentQueue<T>();
_lock = new object();
_action = action;
_currentPriority = ThreadPriority.Normal;
_cts = new CancellationTokenSource();
_task = new Task(() => { }, _cts.Token);
_task.Start();
_cts.Cancel();
_isExecuting = false;
_isDisposed = false;
}
private void ExecuteCommands(ConcurrentQueue<T> queue, CancellationToken token)
{
T command;
while (true)
{
if (token.IsCancellationRequested)
{
lock (_lock)
_isExecuting = false;
throw new OperationCanceledException(token);
}
if (queue.TryDequeue(out command))
_action(command);
else
break;
}
lock (_lock)
{
queue = null;
if (_forHighestPriority.Count > 0)
{
queue = _forHighestPriority;
_currentPriority = ThreadPriority.Highest;
}
else if (_forAboveNormalPriority.Count > 0)
{
queue = _forAboveNormalPriority;
_currentPriority = ThreadPriority.AboveNormal;
}
else if (_forNormalPriority.Count > 0)
{
queue = _forNormalPriority;
_currentPriority = ThreadPriority.Normal;
}
else if (_forBelowNormalPriority.Count > 0)
{
queue = _forBelowNormalPriority;
_currentPriority = ThreadPriority.BelowNormal;
}
else if (_forLowestPriority.Count > 0)
{
queue = _forLowestPriority;
_currentPriority = ThreadPriority.Lowest;
}
if (queue == null)
_isExecuting = false;
else
_task = _task.ContinueWith((task) => ExecuteCommands(queue, token), token);
}
}
public void AddCommand(T command)
{
AddCommand(command, ThreadPriority.Normal);
}
public void AddCommand(T command, ThreadPriority priority)
{
if (_isDisposed)
throw new ObjectDisposedException(GetType().FullName);
ConcurrentQueue<T> queue;
switch (priority)
{
case ThreadPriority.Lowest:
queue = _forLowestPriority;
break;
case ThreadPriority.BelowNormal:
queue = _forBelowNormalPriority;
break;
case ThreadPriority.Normal:
queue = _forNormalPriority;
break;
case ThreadPriority.AboveNormal:
queue = _forAboveNormalPriority;
break;
case ThreadPriority.Highest:
queue = _forHighestPriority;
break;
default:
queue = _forNormalPriority;
break;
}
queue.Enqueue(command);
lock (_lock)
{
if (_currentPriority < priority)
{
_cts.Cancel();
_currentPriority = priority;
_cts = new CancellationTokenSource();
_task = _task.ContinueWith((task) => ExecuteCommands(queue, _cts.Token), _cts.Token);
_isExecuting = true;
}
else if (!_isExecuting)
{
_currentPriority = priority;
_cts = new CancellationTokenSource();
_task = _task.ContinueWith((task) => ExecuteCommands(queue, _cts.Token), _cts.Token);
_isExecuting = true;
}
}
}
public void ClearQueue()
{
if (_isDisposed)
throw new ObjectDisposedException(GetType().FullName);
_cts.Cancel();
_forLowestPriority = new ConcurrentQueue<T>();
_forBelowNormalPriority = new ConcurrentQueue<T>();
_forNormalPriority = new ConcurrentQueue<T>();
_forAboveNormalPriority = new ConcurrentQueue<T>();
_forHighestPriority = new ConcurrentQueue<T>();
}
public void ClearQueueAndWait()
{
ClearQueue();
try
{
_task.Wait();
}
catch (AggregateException ex)
{
ex.Handle((OperationCanceledException) => true);
}
}
public async Task ClearQueueAndWaitAsync()
{
ClearQueue();
try
{
await _task;
}
catch (OperationCanceledException) { }
}
public void Dispose()
{
if (_isDisposed)
return;
ClearQueueAndWait();
_isDisposed = true;
}
}