What do you think? In particular, I'm not sure about my exception-handling approach of raising errors when objects are posted, but I couldn't come up with anything better.
/// <summary>
/// Minimal mutable item that carries a single integer.
/// </summary>
public class Thing
{
    /// <summary>Gets or sets the number carried by this item.</summary>
    public int Number { get; set; }

    /// <summary>
    /// Creates an item whose <see cref="Number"/> is <paramref name="number"/>.
    /// </summary>
    /// <param name="number">Initial value for <see cref="Number"/>.</param>
    public Thing(int number)
    {
        this.Number = number;
    }
}
/// <summary>
/// Console demo: feeds an increasing sequence of <see cref="Thing"/> items into a
/// batcher and shows what happens when the batch handler throws.
/// </summary>
class Program
{
    /// <summary>
    /// Posts things numbered from 0 upwards (stopping before <c>int.MaxValue</c>) to a
    /// batcher configured with a batch size of 100 and a 30 second wait. Any exception
    /// raised while posting is written to the console and then rethrown.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        var batcher = new Batcher<Thing>(ProcessBatch, 100, TimeSpan.FromSeconds(30));

        try
        {
            var number = 0;
            while (number < int.MaxValue)
            {
                batcher.Post(new Thing(number));
                number++;
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine("Here:" + ex.Message);
            throw;
        }

        // Only reached if every post succeeded; keeps the console window open.
        Console.ReadLine();
    }

    /// <summary>
    /// Batch handler used by the demo: prints the batch size, then fails on purpose
    /// when the batch contains the thing numbered 400.
    /// </summary>
    /// <param name="things">The batch to process.</param>
    /// <exception cref="InvalidOperationException">
    /// The batch contains a thing whose number is 400.
    /// </exception>
    private static void ProcessBatch(IEnumerable<Thing> things)
    {
        var batchCount = things.Count();
        Console.WriteLine("Batch found: " + batchCount);

        var containsPoisonItem = things.Select(thing => thing.Number).Contains(400);
        if (!containsPoisonItem)
        {
            return;
        }

        throw new InvalidOperationException("Something bad happened");
    }
}
/// <summary>
/// Groups posted items into batches and passes each batch to a caller-supplied handler.
/// </summary>
/// <remarks>
/// Items flow through a small dataflow pipeline: an input buffer, a pass-through block
/// that re-arms a flush timer for every item, a batch block, and finally an action block
/// that runs the handler. A batch is emitted when <c>batchSize</c> items have been
/// collected, or when the flush timer fires after no item has passed through for
/// <c>maxTimeWaitingForBatch</c>.
/// <para>
/// Handler failures are detected asynchronously. Once the handler has thrown, the action
/// block completes in the faulted state and every later <see cref="Post"/> call throws.
/// Items posted between the failing batch and the moment the fault becomes visible are
/// accepted but never processed.
/// </para>
/// <para>
/// The flush timer is not disposed by this class.
/// </para>
/// </remarks>
/// <typeparam name="T">Type of the items being batched.</typeparam>
public class Batcher<T>
{
    // Largest due time, in whole milliseconds, that System.Threading.Timer.Change accepts.
    private const long MaxTimerDueTimeMilliseconds = 4294967294L;

    private readonly BufferBlock<T> _buffer;
    private readonly ActionBlock<IEnumerable<T>> _actionBlock;

    /// <summary>
    /// Builds the batching pipeline.
    /// </summary>
    /// <param name="processBatch">Handler invoked once per batch.</param>
    /// <param name="batchSize">Number of items that make up a full batch; at least 1.</param>
    /// <param name="maxTimeWaitingForBatch">
    /// Idle time after the most recent item before a partial batch is flushed.
    /// A value of -1 millisecond disables the timed flush.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="processBatch"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="batchSize"/> is less than 1, or
    /// <paramref name="maxTimeWaitingForBatch"/> is outside the range the timer supports.
    /// </exception>
    public Batcher(
        Action<IEnumerable<T>> processBatch,
        int batchSize,
        TimeSpan maxTimeWaitingForBatch)
    {
        if (processBatch == null)
        {
            throw new ArgumentNullException("processBatch");
        }

        if (batchSize < 1)
        {
            throw new ArgumentOutOfRangeException("batchSize", batchSize, "The batch size must be at least 1.");
        }

        // Validate up front using the same truncated-millisecond rule the timer applies.
        // Otherwise the bad value is only rejected inside the pipeline, which faults an
        // internal block without telling anyone and stops all processing.
        var waitMilliseconds = (long)maxTimeWaitingForBatch.TotalMilliseconds;
        if (waitMilliseconds < -1 || waitMilliseconds > MaxTimerDueTimeMilliseconds)
        {
            throw new ArgumentOutOfRangeException(
                "maxTimeWaitingForBatch",
                maxTimeWaitingForBatch,
                "The wait time must be -1 millisecond (no timed flush) or between 0 and 4294967294 milliseconds.");
        }

        _buffer = new BufferBlock<T>();
        var batchBlock = new BatchBlock<T>(batchSize);

        // Flushes a partial batch so that items do not stay in memory for too long.
        var flushTimer = new Timer(delegate { batchBlock.TriggerBatch(); });

        // Re-arms the one-shot flush timer whenever an item passes through, which
        // avoids emitting unnecessary partial batches while items keep arriving.
        var timerResetBlock = new TransformBlock<T, T>(value =>
        {
            flushTimer.Change(maxTimeWaitingForBatch, TimeSpan.FromMilliseconds(-1));
            return value;
        });

        _actionBlock = new ActionBlock<IEnumerable<T>>(processBatch);

        _buffer.LinkTo(timerResetBlock);
        timerResetBlock.LinkTo(batchBlock);
        batchBlock.LinkTo(_actionBlock);

        _actionBlock.Completion.ContinueWith(x =>
        {
            // todo log exception here
            Console.WriteLine(x.Exception.Message);
        }, TaskContinuationOptions.OnlyOnFaulted);
    }

    /// <summary>
    /// Queues an item for inclusion in a future batch.
    /// </summary>
    /// <param name="someObject">The item to queue.</param>
    /// <exception cref="InvalidOperationException">
    /// The batch handler has already failed. The failure reported by the pipeline is
    /// available through <see cref="Exception.InnerException"/>.
    /// </exception>
    public void Post(T someObject)
    {
        var completion = _actionBlock.Completion;
        if (completion.IsCompleted)
        {
            // Attach the original failure so the caller can see why the batcher stopped.
            throw new InvalidOperationException(
                "The batcher is not accepting new objects as it is in a failed state",
                completion.Exception);
        }

        _buffer.Post(someObject);
    }
}