Code Review Stack Exchange is a question and answer site for peer programmer code reviews. It's 100% free, no registration required.

Sign up
Here's how it works:
  1. Anybody can ask a question
  2. Anybody can answer
  3. The best answers are voted up and rise to the top

The following is an TaskScheduler that always run tasks in a thread it maintains. When created, a name of the thread was specified. Once you schedule the first task, until it is been Disposeed, a thread will be created and wait for tasks to execute.

The reason of this class is that sometimes there is a need to guarantee that some tasks must be always scheduled in a specific thread (not the UI thread though). For example, some 3 party dll may have resource leak if you keep creating new threads to call its functions.

using Task = System.Threading.Tasks.Task;
using Thread = System.Threading.Thread;
using Barrier = System.Threading.Barrier;
using Monitor = System.Threading.Monitor;
using IDisposable = System.IDisposable;
using TaskEnum = System.Collections.Generic.IEnumerable<System.Threading.Tasks.Task>;
using TaskQueue = System.Collections.Generic.Queue<System.Threading.Tasks.Task>;
using Enumerable = System.Linq.Enumerable;
using ObjectDisposedException = System.ObjectDisposedException;

using _Imported_Extensions_;
namespace _Imported_Extensions_
{
    public static class Extensions
    {
        public static bool Any(this TaskEnum te)
        {
            return Enumerable.Any(te);
        }

        public static TaskEnum ToList(this TaskEnum te)
        {
            return Enumerable.ToList(te);
        }
    }
}

namespace TaskUtils
{
    public class SameThreadTaskScheduler : System.Threading.Tasks.TaskScheduler, IDisposable
    {
        #region publics
        public SameThreadTaskScheduler(string name)
        {
            scheduledTasks = new TaskQueue();
            threadName = name;
        }
        public override int MaximumConcurrencyLevel { get { return 1; } }
        public void Dispose()
        {
            lock (scheduledTasks)
            {
                quit = true;
                Monitor.PulseAll(scheduledTasks);
            }
        }
        #endregion

        #region protected overrides
        protected override TaskEnum GetScheduledTasks()
        {
            lock (scheduledTasks)
            {
                return scheduledTasks.ToList();
            }
        }
        protected override void QueueTask(Task task)
        {
            if (myThread == null)
                myThread = StartThread(threadName);
            if (!myThread.IsAlive)
                throw new ObjectDisposedException("My thread is not alive, so this object has been disposed!");
            lock (scheduledTasks)
            {
                scheduledTasks.Enqueue(task);
                Monitor.PulseAll(scheduledTasks);
            }
        }
        protected override bool TryExecuteTaskInline(Task task, bool task_was_previously_queued)
        {
            return false;
        }
        #endregion

        private readonly TaskQueue scheduledTasks;
        private Thread myThread;
        private readonly string threadName;
        private bool quit;

        private Thread StartThread(string name)
        {
            var t = new Thread(MyThread) { Name = name };
            using (var start = new Barrier(2))
            {
                t.Start(start);
                ReachBarrier(start);
            }
            return t;
        }
        private void MyThread(object o)
        {
            Task tsk;
            lock (scheduledTasks)
            {
                //When reaches the barrier, we know it holds the lock.
                //
                //So there is no Pulse call can trigger until
                //this thread starts to wait for signals.
                //
                //It is important not to call StartThread within a lock.
                //Otherwise, deadlock!
                ReachBarrier(o as Barrier);
                tsk = WaitAndDequeueTask();
            }
            for (; ; )
            {
                if (tsk == null)
                    break;
                TryExecuteTask(tsk);
                lock (scheduledTasks)
                {
                    tsk = WaitAndDequeueTask();
                }
            }
        }
        private Task WaitAndDequeueTask()
        {
            while (!scheduledTasks.Any() && !quit)
                Monitor.Wait(scheduledTasks);
            return quit ? null : scheduledTasks.Dequeue();
        }

        private static void ReachBarrier(Barrier b)
        {
            if (b != null)
                b.SignalAndWait();
        }
    }
}

I used an unusual using block and put all method extensions in use into a single class. The reason is that I want to specify exactly what I wanted from the outside of the code. It is fine to use traditional using block instead without change any class code, but anyway focus on the class!

What I am concerning is its concurrency correctness. I want to know although this seems to be working, is it actually correct? Are there better way (simpler) to achieve this? Coding style advises are also welcome, thanks.

Specific Questions

Is it safe to use Pulse rather than PulseAll in this case?

share|improve this question
    
    
Any final solution with full source code sample application ? IMHO, better samples for minimize learning curve are real applications with full source code and good patterns – Kiquenet Sep 4 '14 at 6:52
up vote 4 down vote accepted
using Task = System.Threading.Tasks.Task;
using Thread = System.Threading.Thread;
using Barrier = System.Threading.Barrier;
using Monitor = System.Threading.Monitor;
using IDisposable = System.IDisposable;

You don't need to write all those usings one class at a time. In C#, the common approach is to add a using once for each namespace you need. This is considered a bad practice in C++ (maybe that's why you did it this way?), but that's only because in C++, namespaces are not structured properly (almost everything is directly in std) and because the naming conventions there (list, not List) make collisions more likely.

using TaskEnum = System.Collections.Generic.IEnumerable<System.Threading.Tasks.Task>;
using TaskQueue = System.Collections.Generic.Queue<System.Threading.Tasks.Task>;

This is also not necessary, just add the necessary namespace usings, and the write IEnumerable<Task> or Queue<Task>, that's not that long.


namespace _Imported_Extensions_

_Imported_Extensions_ is a weird name for a namespace. Why all the underscores? And the convention is to use PascalCase (e.g. ImportedExtensions) for namespaces too.

And what does the name even mean? Why is it important to stress out that those extensions were imported? And from where?

Also, it's not common to have multiple namespaces in the same file. If the class is used only in this file, put it in the same namespace as everything else in that file.


public static bool Any(this TaskEnum te)
public static TaskEnum ToList(this TaskEnum te)

Both of the extension methods are completely unnecessary. If you just added using System.Linq;, both would work by themselves.


if (myThread == null)
    myThread = StartThread(threadName);

This is not thread-safe. If two threads call this method at the same time, StartThread() will be called twice and two threads will be created.

Also, why is the thread started here and not in the constructor?


if (!myThread.IsAlive)

I don't think this is the right check here. Checking quit would be better, because that means enqueuing stops working as soon as the scheduler is disposed.


I don't like that your fields are in the middle of the class. If you put them at (or near) the top, they will be easier to find.


I think the way you're using Barrier is clumsy. If you want a notification that the worker thread is ready, use something like ManualResetEvent.

Also, you seem to be trying to protect against Barrier being null, but that can never happen here. So doing that just makes your code longer and more confusing.

Even better option would be to use a queue that already supports blocking when no items are available: BlockingCollection.


Is it safe to use Pulse rather than PulseAll in this case?

Yes, it is, since you're always going to have only one thread waiting.


Also, if I wanted something like this, I would either use ConcurrentExclusiveSchedulerPair.ExclusiveScheduler, if the tasks didn't have to execute on the same thread, just being mutually exclusive.

Or some scheduler from ParallelExtensionsExtras, if a single thread was a requirement.

share|improve this answer
    
No, my way of use "using" is from Java and Haskell. In those languages, people prefer precise symbol import. – Earth Engine Mar 10 '14 at 22:43
    
+1 Live example. From all the good answers provided by everyone here I am now getting a better understanding of appropriate use of this topic and IMHO other programmers will definitely benifit from the knowledge sharing here. Some of the implementations that I had worked in the past could have been better implemented using this knowledge. IMHO, better samples for minimize learning curve are real applications with full source code and good patterns – Kiquenet Sep 4 '14 at 6:53

Not exactly a review, but since you are asking for simpler way... The simpler way is to run your tasks via dispatcher. Just run it on background thread:

_thread = new Thread(() =>
               {
                   _dispatcher = Dispatcher.CurrentDispatcher;
                   Dispatcher.Run();
               });
_thread.Start();

And use _dispatcher.BeginInvoke and _dispatcher.Invoke to run your tasks on that thread. It is a lot simpler than reinventing the wheel. The obvious downside is wpf dependency.

share|improve this answer
1  
Sorry, I marked another answer, because yours is "not exactly a review". But still, +1ed. – Earth Engine Mar 14 '14 at 12:39

Your Answer

 
discard

By posting your answer, you agree to the privacy policy and terms of service.

Not the answer you're looking for? Browse other questions tagged or ask your own question.