I need a solution that runs a constant stream of incoming requests sequentially per resource, but in parallel overall.

The use-case:

Many clients connect to a server and start issuing work. The work of a single client needs to run in sequential order, so the downstream code doesn't need to cope with concurrency, but overall the work should be spread across multiple threads. I'm trusting the .NET framework a lot here, which I hope is a good thing.

I've also read up on TPL Dataflow and Rx but could not find a general solution there. Hints in that direction are welcome, though!
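For reference, the closest I got in TPL Dataflow was ActionBlock, which does process posted items one at a time by default (its MaxDegreeOfParallelism is 1), so one block per client would give the per-client sequencing — a minimal sketch, assuming the System.Threading.Tasks.Dataflow package is referenced:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet: System.Threading.Tasks.Dataflow

class DataflowSketch
{
    static async Task Main()
    {
        // By default an ActionBlock runs its delegate with
        // MaxDegreeOfParallelism = 1, so items execute strictly in post order.
        var block = new ActionBlock<int>(i => Console.WriteLine("work item {0}", i));

        for (int i = 0; i < 5; ++i)
            block.Post(i);

        block.Complete();       // signal that no more items will be posted
        await block.Completion; // wait for the queued items to drain
    }
}
```

It still leaves the "one block per client" bookkeeping to me, which is why I didn't count it as a general solution.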

using System;
using System.Threading;
using System.Threading.Tasks;

class TaskGroup
{
    public int CurrentlyQueuedTasks { get { return _currentlyQueued; } }

    private readonly object _previousTaskMonitor;
    private Task _previousTask;
    private int _currentlyQueued;

    public TaskGroup()
    {
        _previousTaskMonitor = new object();
        _previousTask = Task.CompletedTask;
    }

    public void Append(Action action)
    {
        lock(_previousTaskMonitor)
        {
            Interlocked.Increment(ref _currentlyQueued);
            _previousTask = _previousTask.ContinueWith(task =>
            {
                try
                {
                    action();
                }
                catch (Exception)
                {
                    //TODO
                }
                finally
                {
                    Interlocked.Decrement(ref _currentlyQueued);
                }
            });
        }
    }
}
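To show the intent, usage looks roughly like this (simplified — `Server`, `OnRequest` and `clientId` are illustrative names, not my real code): one TaskGroup per connected client, so each client's work runs in order while different clients run in parallel.

```csharp
using System;
using System.Collections.Concurrent;

class Server
{
    // One TaskGroup (the class above) per client id.
    private readonly ConcurrentDictionary<string, TaskGroup> _groups =
        new ConcurrentDictionary<string, TaskGroup>();

    public void OnRequest(string clientId, Action work)
    {
        // GetOrAdd may race and construct a throw-away TaskGroup,
        // but only one instance ever ends up in the dictionary.
        _groups.GetOrAdd(clientId, _ => new TaskGroup()).Append(work);
    }
}
```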
It's not clear what you're asking here. Does your code work as intended? – RubberDuck Feb 6 at 16:57

Yes, it seems to work, but I haven't tested it heavily against concurrency issues. I'm also asking for improvements, or for parts of the library that I may have missed. – Vengarioth Feb 6 at 18:36

This is an interesting approach. I would have used a queue by default since it expresses the semantics a bit more clearly (the queuing is more obvious). Also, ContinueWith creates a Task wrapping the original task, and I'm not sure whether that has any performance downside (it probably shouldn't). I hacked together a quick benchmark, with the alternative implemented using a BlockingCollection:

using System;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;

public interface IAppendable
{
    void Append(Action action);
}

public class TaskGroup : IAppendable
{
    public int CurrentlyQueuedTasks { get { return _currentlyQueued; } }

    private readonly object _previousTaskMonitor;
    private Task _previousTask;
    private int _currentlyQueued;

    public TaskGroup()
    {
        _previousTaskMonitor = new object();
        _previousTask = Task.FromResult(false);
    }

    public void Append(Action action)
    {
        lock(_previousTaskMonitor)
        {
            Interlocked.Increment(ref _currentlyQueued);
            _previousTask = _previousTask.ContinueWith(task =>
            {
                try
                {
                    action();
            }
            catch (Exception)
            {
                //TODO
            }
                finally
                {
                    Interlocked.Decrement(ref _currentlyQueued);
                }
            });
        }
    }
}

public class QueueAppendable : IAppendable, IDisposable
{
    public int CurrentlyQueuedTasks { get { return _Queue.Count; } }

    BlockingCollection<Action> _Queue = new BlockingCollection<Action>();

    public QueueAppendable()
    {
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                try 
                {
                    var action = _Queue.Take();
                    action();
                }
                catch (InvalidOperationException)
                {
                    break;
                }
                catch
                {
                    // TODO log me
                }
            }
        }, TaskCreationOptions.LongRunning); // dedicated thread for the consumer loop
    }

    public void Append(Action action)
    {
        _Queue.Add(action);
    }

    public void Dispose()
    {
        _Queue.CompleteAdding();
    }
}

public class Test
{
    public static void TimeIt(string name, IAppendable appendable)
    {
        var finishEvent = new ManualResetEvent(false);
        var sw = new Stopwatch();
        sw.Start();
        for (int i = 0; i < 2000; ++i)
        {
            appendable.Append(() => { Thread.Sleep(1); });
        }
        appendable.Append(() => { finishEvent.Set(); });
        finishEvent.WaitOne();
        sw.Stop();
        Console.WriteLine("{0} elapsed time: {1}ms", name, sw.ElapsedMilliseconds);
        (appendable as IDisposable)?.Dispose();
    }

    public static void Main()
    {
        TimeIt("TaskGroup", new TaskGroup());
        TimeIt("Queue", new QueueAppendable());
    }
}

Output:

TaskGroup elapsed time: 2135ms
Queue elapsed time: 2121ms

So there is pretty much no performance difference between the two. However, I think the BlockingCollection approach has a few advantages:

  1. Easier to debug. You can simply set a breakpoint and peek at the queue. This is quite difficult to do with the wrapped-task approach.
  2. No use of lower-level synchronization primitives. The first time I read your code I instinctively thought "Hang on, he's got a lock, why the Interlocked calls?" until I realized that the decrement happens in the async continuation, outside of the lock. With the BlockingCollection you program against a slightly higher level of abstraction, which is often a good thing.
  3. Fewer class members, which reduces the state complexity of the object (the queue is the only member).

Apart from that I think your approach should be fine. You may want to consider adding support for cancellation via a CancellationToken.
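A rough sketch of what cancellation support could look like for the queue variant (untested, and Dispose is simplified — in real code you'd also want to dispose the CancellationTokenSource once the consumer has exited):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class CancellableQueueAppendable : IDisposable
{
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    public CancellableQueueAppendable()
    {
        Task.Factory.StartNew(() =>
        {
            try
            {
                // Ends normally after CompleteAdding(), throws
                // OperationCanceledException when the token is cancelled.
                foreach (var action in _queue.GetConsumingEnumerable(_cts.Token))
                {
                    try { action(); }
                    catch { /* TODO log me */ }
                }
            }
            catch (OperationCanceledException)
            {
                // cancelled: abandon any remaining queued work
            }
        }, TaskCreationOptions.LongRunning);
    }

    public void Append(Action action) => _queue.Add(action);

    public void Cancel() => _cts.Cancel();

    public void Dispose() => _queue.CompleteAdding();
}
```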

    
Wait, what prevents the Queue implementation's tasks from running parallel instead of sequential? – Vengarioth Feb 13 at 18:25
    
@Vengarioth: The queue only has one task - the one which is pulling items of the queue and executing them. – ChrisWue Feb 14 at 0:02
    
Is it blocking the thread when there are no tasks in the Queue? – Vengarioth Feb 14 at 20:35
    
@Vengarioth: Yes, see BlockingCollection.Take – ChrisWue Feb 14 at 20:36
    
In my case, that is not an option unfortunately, i expanded my proposed solution to a linked list of tasks to perform basic list operations, but thank you a lot for your review! – Vengarioth Feb 17 at 0:08
