Code Review Stack Exchange is a question and answer site for peer programmer code reviews. Join them; it only takes a minute:

Sign up
Here's how it works:
  1. Anybody can ask a question
  2. Anybody can answer
  3. The best answers are voted up and rise to the top

I am trying to exploit asynchronism for parallelism. This is my first attempt at a parallel event subscriber. In your expert opinions, is this a valid approach?

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using EventStore.ClientAPI;

namespace Sandbox
{
    /// <summary>
    /// Subscribes to all events of an Event Store connection, starting from the last position
    /// this instance has seen, and dispatches every deserialized event to the handler registered
    /// for its runtime type. Each handler invocation is started on its own task, so handlers may
    /// run concurrently and are not guaranteed to finish in the order the events arrived.
    /// </summary>
    public class SomeEventSubscriber
    {
        // Position of the most recent event that was *dispatched* (not necessarily fully handled).
        // Used as the starting checkpoint whenever the subscription is (re)established.
        private Position? _latestPosition;
        private readonly Dictionary<Type, Action<object>> _eventHandlerMapping;
        private IEventStoreConnection _connection;

        /// <summary>
        /// Gets the mapping from event type to the handler invoked for events of that type.
        /// </summary>
        public Dictionary<Type, Action<object>> EventHandlerMapping
        {
            get { return _eventHandlerMapping; }
        }

        /// <summary>
        /// Creates a subscriber whose checkpoint is <c>Position.Start</c>.
        /// </summary>
        public SomeEventSubscriber()
        {
            _eventHandlerMapping = CreateEventHandlerMapping();
            _latestPosition = Position.Start;
        }

        /// <summary>
        /// Obtains a connection and subscribes to all events once the connection reports that it
        /// is connected.
        /// </summary>
        public void Start()
        {
            ConnectToEventstore();
        }

        /// <summary>
        /// Obtains a connection from <c>EventStoreConnectionWrapper</c> and registers a
        /// <c>Connected</c> callback that opens the catch-up subscription from the current checkpoint.
        /// </summary>
        private void ConnectToEventstore()
        {
            // Capture the connection in a local so the Connected callback always subscribes on the
            // connection that raised the event, even if Start() runs again and replaces _connection.
            IEventStoreConnection connection = EventStoreConnectionWrapper.Connect();
            _connection = connection;

            // NOTE(review): if Connect() can return an already-connected connection, the Connected
            // event may have fired before this handler is attached - confirm against the wrapper.
            connection.Connected += (sender, args) =>
                connection.SubscribeToAllFrom(_latestPosition, false, EventOccured, LiveProcessingStarted, HandleSubscriptionDropped);
        }

        /// <summary>
        /// Builds the event-type to handler dispatch table.
        /// </summary>
        /// <returns>A dictionary keyed by concrete event type.</returns>
        private Dictionary<Type, Action<object>> CreateEventHandlerMapping()
        {
            // Direct casts (instead of "as") make a mismatched argument fail immediately with an
            // InvalidCastException rather than passing null on to the handler.
            return new Dictionary<Type, Action<object>>
            {
                {typeof (FakeEvent1), o => Handle((FakeEvent1) o)},
                {typeof (FakeEvent2), o => Handle((FakeEvent2) o)},
            };
        }

        /// <summary>Handles a <c>FakeEvent1</c>.</summary>
        /// <param name="eventToHandle">The event to process.</param>
        private void Handle(FakeEvent1 eventToHandle)
        {
            SomethingLongRunning(eventToHandle);
        }

        /// <summary>Handles a <c>FakeEvent2</c>.</summary>
        /// <param name="eventToHandle">The event to process.</param>
        private void Handle(FakeEvent2 eventToHandle)
        {
            SomethingLongRunning(eventToHandle);
        }

        /// <summary>
        /// Stand-in for real work: writes a start message, spins through an empty loop and writes
        /// a finish message.
        /// </summary>
        /// <param name="eventToHandle">The event being processed; only its type is printed.</param>
        private void SomethingLongRunning(BaseFakeEvent eventToHandle)
        {
            Console.WriteLine("Start Handling: " + eventToHandle.GetType());
            for (int i = 0; i < 10000000; i++)
            {

            }
            Console.WriteLine("Finished Handling: " + eventToHandle.GetType());
        }

        /// <summary>
        /// Subscription callback: skips events whose type or stream id starts with "$",
        /// deserializes the rest, starts the matching handler on a new task and advances the checkpoint.
        /// </summary>
        /// <param name="eventStoreCatchUpSubscription">The subscription that delivered the event (unused).</param>
        /// <param name="resolvedEvent">The event delivered by the subscription.</param>
        private void EventOccured(EventStoreCatchUpSubscription eventStoreCatchUpSubscription,
            ResolvedEvent resolvedEvent)
        {
            var originalEvent = resolvedEvent.OriginalEvent;

            // Ordinal comparison: "$" is a literal prefix, not culture-dependent text.
            if (originalEvent.EventType.StartsWith("$", StringComparison.Ordinal) ||
                originalEvent.EventStreamId.StartsWith("$", StringComparison.Ordinal))
            {
                return;
            }

            var @event = EventSerialization.DeserializeEvent(originalEvent);
            if (@event != null)
            {
                // Single lookup instead of ContainsKey followed by the indexer.
                Action<object> handler;
                if (_eventHandlerMapping.TryGetValue(@event.GetType(), out handler))
                {
                    // Fire-and-forget: the handler runs on another task and this callback does not
                    // wait for it. The continuation only runs when the handler throws, so failures
                    // are reported instead of being lost as unobserved task exceptions.
                    Task.Factory.StartNew(() => handler(@event))
                        .ContinueWith(
                            faulted => Console.Error.WriteLine("Event handler failed: " + faulted.Exception),
                            TaskContinuationOptions.OnlyOnFaulted);
                    Console.WriteLine("The task is running asynchronously...");
                }
            }

            // NOTE(review): the checkpoint advances as soon as the handler has been scheduled, not
            // when it has completed, so a resubscribe will not redeliver an event whose handler
            // was still running or failed.
            if (resolvedEvent.OriginalPosition != null)
            {
                _latestPosition = resolvedEvent.OriginalPosition.Value;
            }
        }

        /// <summary>
        /// Subscription-dropped callback: restarts the subscriber when the drop reason is one of
        /// those accepted by <see cref="SubscriptionDropMayBeRecoverable"/>.
        /// </summary>
        /// <param name="subscription">The subscription that was dropped (unused).</param>
        /// <param name="dropReason">Why the subscription was dropped.</param>
        /// <param name="ex">The exception associated with the drop, if any (unused).</param>
        private void HandleSubscriptionDropped(EventStoreCatchUpSubscription subscription, SubscriptionDropReason dropReason, Exception ex)
        {
            if (dropReason == SubscriptionDropReason.ProcessingQueueOverflow)
            {
                //TODO: Wait and reconnect probably with back off
            }

            if (dropReason == SubscriptionDropReason.UserInitiated)
            {
                return;
            }

            if (SubscriptionDropMayBeRecoverable(dropReason))
            {
                // NOTE(review): Start() asks the wrapper for a connection again without closing the
                // one held in _connection - confirm the wrapper does not leak connections here.
                Start();
            }
        }

        /// <summary>
        /// Tells whether a dropped subscription should be retried.
        /// </summary>
        /// <param name="dropReason">Why the subscription was dropped.</param>
        /// <returns>
        /// <c>true</c> for <c>Unknown</c>, <c>SubscribingError</c>, <c>ServerError</c> and
        /// <c>ConnectionClosed</c>; otherwise <c>false</c>.
        /// </returns>
        private static bool SubscriptionDropMayBeRecoverable(SubscriptionDropReason dropReason)
        {
            return dropReason == SubscriptionDropReason.Unknown || dropReason == SubscriptionDropReason.SubscribingError ||
                   dropReason == SubscriptionDropReason.ServerError || dropReason == SubscriptionDropReason.ConnectionClosed;
        }

        /// <summary>
        /// Live-processing-started callback passed to the subscription; intentionally does nothing.
        /// </summary>
        /// <param name="eventStoreCatchUpSubscription">The subscription that switched to live processing (unused).</param>
        private static void LiveProcessingStarted(EventStoreCatchUpSubscription eventStoreCatchUpSubscription)
        {

        }
    }
}
share|improve this question
up vote 2 down vote accepted
var task = Task.Factory.StartNew(() => _eventHandlerMapping[eventType](@event));
Console.WriteLine("The task is running asynchronously...");
task.Wait();

This code doesn't make any sense. You're running the event handler on another thread, but then you're immediately waiting for it to finish. The end result is your code will behave exactly as if you called the handler directly, only with more code and some added overhead.

share|improve this answer
    
Thanks. Is this your answer? Should I remove Task.Wait()? – csetzkorn Dec 18 '14 at 19:24
    
Sorry, I muddled this up with await: stackoverflow.com/questions/9519414/… – csetzkorn Dec 18 '14 at 19:31

Your Answer

 
discard

By posting your answer, you agree to the privacy policy and terms of service.

Not the answer you're looking for? Browse other questions tagged or ask your own question.