Code Review Stack Exchange is a question and answer site for peer programmer code reviews. It's 100% free, no registration required.

Sign up
Here's how it works:
  1. Anybody can ask a question
  2. Anybody can answer
  3. The best answers are voted up and rise to the top

After learning the basics (isolated state, communication and computation) of the the Actor-Model I I wrote my own implementation and I'm ready for a through code review.

You will be able to see that in the test code I've created 2 tasks which are simultaneously sends multiple messages to an instance of the Actor class.

What I'm wondering the most is if the use of threading within the actor is correct for processing the incoming messages or should I process messages without a thread.

Full code:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ActorModel
{
    /// <summary>
    /// Client/Test Code
    /// </summary>
    public static class Program
    {
        private static void Main()
        {
            var actor = new Actor<string>(x => Console.WriteLine(x.ToUpper())).Start();
            var task1 = Task.Factory.StartNew(() => SendMessagesToActor(actor));
            var task2 = Task.Factory.StartNew(() => SendMessagesToActor(actor));
            Task.WaitAll(new[] {task1, task2});
            Console.WriteLine();
            Console.WriteLine();
            Console.WriteLine("PRESS ENTER TO STOP THE ACTOR");
            Console.ReadLine();

            actor.Stop();
        }

        private static void SendMessagesToActor(Actor<string> actor)
        {
            var counter = 0;
            while (counter < 5)
            {
                actor.Send(String.Format("message #[{0}] from thread #[{1}]", counter,
                                         Thread.CurrentThread.ManagedThreadId));
                Thread.Sleep(100); // To avoid of OutOfMemory issues
                counter++;
            }
        }
    }

    /// <summary>
    /// Actor-Based Class 
    /// </summary>
    /// <remarks>basics: Immutability, Communication and Computation</remarks>
    /// <typeparam name="TMessage"></typeparam>
    public class Actor<TMessage> where TMessage : class
    {
        private volatile bool _started;
        private readonly Action<TMessage> _messageHandler;
        private readonly ConcurrentQueue<TMessage> _messagesQueue; // Consider replace with TPL workflow
        private readonly Task _processingTask;
        private readonly CancellationTokenSource _source;

        public Actor(Action<TMessage> messageHandler)
        {
            if (messageHandler == null)
            {
                throw new ArgumentNullException("messageHandler");
            }
            _messagesQueue = new ConcurrentQueue<TMessage>();
            _messageHandler = messageHandler;

            _source = new CancellationTokenSource();
            _processingTask = new Task(() => ProcessMessages(_source.Token), _source.Token, TaskCreationOptions.LongRunning);
        }

        //---------------------------------------------------------------------------------------------------------------------------------------------------

        public Actor<TMessage> Start()
        {
            if (!_started)
            {
                _processingTask.Start();
                _started = true;
            }

            return this;
        }

        public void Stop()
        {
            Console.WriteLine("PROCESSING STOP REQUESTED");
            _source.Cancel();
        }

        //---------------------------------------------------------------------------------------------------------------------------------------------------

        public void Send(TMessage message)
        {
            _messagesQueue.Enqueue(message); // any capacity bounding is required here?
        }

        //---------------------------------------------------------------------------------------------------------------------------------------------------

        private void ProcessMessages(CancellationToken ct)
        {
            while (true)
            {
                if (_messagesQueue.Count > 0) 
                {
                    TMessage message;
                    var hasRemoved = _messagesQueue.TryDequeue(out message);

                    if (hasRemoved)
                    {
                        _messageHandler(message);
                    }

                    continue;
                }

                if (ct.IsCancellationRequested)
                {
                    Console.WriteLine("PROCESSING STOPED");
                    return;
                }
            }
        }
    }
}
share|improve this question

There is one drawback(?) in your actor, in some cases it can't be interrupted.
Actor can't be stopped untill there are tasks in inner queue.
So if some outer threads will enque new tasks little bit faster then actor can process it, then you wan't be able to stop your actor.
If it's an actors feature - ok, else you can fix it by removing continue or placing cancellation check at the top of the loop. Another way to handle this case is changing Send method behaviour.

share|improve this answer
    
Thanks, it's actually a good point if I want to enforce stopping the actor – Yair Nevet 15 hours ago

Your Answer

 
discard

By posting your answer, you agree to the privacy policy and terms of service.

Not the answer you're looking for? Browse other questions tagged or ask your own question.