I have a process that does a bunch of document processing. One of the tasks being performed is OCR, which is the slowest part of the process, so I decided to run that in another thread.
I decided the producer/consumer would work best because I will be adding document ids as each document is being processed (and there will be thousands of documents processed).
I also decided to use the Parallel.ForEach instead of kicking off many consumer threads because I figured the Parallel.ForEach will utilize the resources much better than I could do it on my own.
Another requirement is that I need to be able to cancel the process.
Question: Are my decision above sound decision (regarding using the BlockingCollection and Parallel.ForEach)?
Review: Is my code resilient? Am I doing things correctly regarding handling the cancellation?
Note: I have some questions within the code.
public class DocumentOcrQueue
{
private readonly BlockingCollection<int> _documentIds = new BlockingCollection<int>();
private Task _consumer;
private CancellationTokenSource _cts;
public void Add(int documentId)
{
_documentIds.Add(documentId);
}
/// <summary>
/// Start the OCR Service. Once the service has started it will start processing document Ids in the queue
/// </summary>
public void Start()
{
try
// The token source for issuing the cancelation request.
_cts = new CancellationTokenSource();
var cts = _cts;
// Question: I put the cancellation token as the parameter of the
// Task.Run but I am not sure if this is needed.
// Any advice on this?
_consumer = Task.Run(() => OcrConsumer(cts.Token), cts.Token);
}
private void OcrConsumer(CancellationToken cancellationToken)
{
var options = new ParallelOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
CancellationToken = cancellationToken
};
try
{
// GetConsumingPartitioner is an extension method. See class below.
Parallel.ForEach(_documentIds.GetConsumingPartitioner(), options, id =>
{
options.CancellationToken.ThrowIfCancellationRequested();
PerformOcr(id);
});
}
catch (OperationCanceledException)
{
}
catch (AggregateException ae)
{
bool notCancelledExceptionOccurred = false;
foreach (var ex in ae.InnerExceptions.Where(e => !(e is OperationCanceledException)))
{
notCancelledExceptionOccurred = true;
// Log exception
}
if (notCancelledExceptionOccurred)
throw;
}
catch (Exception ex)
{
// Log exception
throw;
}
finally
{
_cts.Dispose();
}
}
private void PerformOcr(int documentId)
{
// Do OCR work here.
}
public void Stop(bool stopImmediately = false)
{
_documentIds.CompleteAdding();
if (stopImmediately)
_cts.Cancel();
// Question
// Does it matter if I use Task.WaitAll or _consumer.Wait()?
// Is there an advantage of one over the other?
Task.WaitAll(_consumer);
// _consumer.Wait();
}
}
// BlockingCollection.GetConsumingEnumerable and Parallel.ForEach can work against each other so found this sample code which solves this.
// http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx
static class BlockingCollectionExtention
{
public static Partitioner<T> GetConsumingPartitioner<T>(this BlockingCollection<T> collection)
{
return new BlockingCollectionPartitioner<T>(collection);
}
private class BlockingCollectionPartitioner<T> : Partitioner<T>
{
private readonly BlockingCollection<T> _collection;
internal BlockingCollectionPartitioner(
BlockingCollection<T> collection)
{
if (collection == null)
throw new ArgumentNullException(nameof(collection));
_collection = collection;
}
public override bool SupportsDynamicPartitions => true;
public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
{
if (partitionCount < 1)
throw new ArgumentOutOfRangeException(nameof(partitionCount));
var dynamicPartitioner = GetDynamicPartitions();
return Enumerable.Range(0, partitionCount).Select(_ => dynamicPartitioner.GetEnumerator()).ToArray();
}
public override IEnumerable<T> GetDynamicPartitions()
{
return _collection.GetConsumingEnumerable();
}
}
}