Take the 2-minute tour ×
Code Review Stack Exchange is a question and answer site for peer programmer code reviews. It's 100% free, no registration required.

Attempting to jump into the Windows Server ServiceBus 1.1 code-base along with adopting the new TPL async methods. But I could not find an easy way to just spin up N number of handlers for message sessions (might have 100 or so concurrent sessions). So it would be great to get some feedback on the following code, any suggestions on an easier way would be great... note tried to keep code sample simple for the questions purpose.

///==============================================================
/// SAMPLE USAGE

SubClient = SubscriptionClient.Create(TopicName, SubscriptionName);
SessionsMessagingOptions opts = new SessionsMessagingOptions()
{
    NumberOfSesssions = 5,
    ReceiveTimeOut = TimeSpan.FromSeconds (5),
    AutoMarkMessageComplete = true,
    MessageHandler = msg =>
    {
       _logger.Log(string.Format("Processing recived Message: SessionId = {0}, Body = {1}",
         msg.SessionId,
         msg.GetBody<string>()));
    }
};
SubClient.HandleSessions(opts);

///==============================================================
public class SessionsMessagingOptions
{
    public Int32 NumberOfSesssions { get; set; }
    public TimeSpan ReceiveTimeOut { get; set; }
    public Boolean AutoMarkMessageComplete { get; set; }
    public Action<BrokeredMessage> MessageHandler { get; set; }
}

///==============================================================
public static class SubscriptionClientExtensions
{
    public static void HandleSessionsV2(this SubscriptionClient sc, SessionsMessagingOptions opts)
    {
        for (Int32 nIndex = 0; nIndex < opts.NumberOfSesssions; nIndex++)
        {
            HandleSession(sc, opts);
        }
    }

    public static async Task<MessageSession> HandleSession(SubscriptionClient sc, SessionsMessagingOptions opts)
    {
        do
        {
            MessageSession ms = null;
            try
            {
                ms = await sc.AcceptMessageSessionAsync().ConfigureAwait(false);

                foreach (var msg in await ms.ReceiveBatchAsync(5, opts.ReceiveTimeOut).ConfigureAwait(false))
                {
                    if (msg == null)
                        break;
                    try
                    {
                        opts.MessageHandler(msg);

                        if (opts.AutoMarkMessageComplete)
                            msg.Complete();
                    }
                    catch (Exception)
                    {
                        // log the exception   
                    }
                } 
            }
            catch (TimeoutException)
            {
                // log timeout occurred
            }
            catch (Exception)
            {
               // look into other exception types to handle here
            }
            finally
            {
                if (ms != null)
                {
                    if (!ms.IsClosed)
                        ms.Close();
                }
            }

        } while (true);
    }

    public static void HandleSessions(this SubscriptionClient sc, SessionsMessagingOptions opts)
    {
        Action<Task<MessageSession>> sessionAction = null;
        Action<Task<BrokeredMessage>> msgHandler = null;

        sessionAction = new Action<Task<MessageSession>>(tMS =>
        {
            if (tMS.IsFaulted) // session timed out - repeat
            {
                sc.AcceptMessageSessionAsync().ContinueWith(sessionAction);
                return;
            }

            MessageSession msgSession = null;
            try
            {
                msgSession = tMS.Result;
            }
            catch (Exception)
            {
                return; // task cancelation exception
            }

            msgHandler = new Action<Task<BrokeredMessage>>(taskBM =>
            {
                if (taskBM.IsFaulted)
                    return;

                BrokeredMessage bMsg = null;
                try
                {
                    bMsg = taskBM.Result;
                }
                catch (Exception)
                {
                    return; // task cancelation exception
                }

                if (bMsg == null)
                {
                    sc.AcceptMessageSessionAsync().ContinueWith(sessionAction); // session is dead
                    return;
                }

                opts.MessageHandler(bMsg); // client code to handle the message

                if (opts.AutoMarkMessageComplete)
                    bMsg.Complete();

                msgSession.ReceiveAsync(opts.ReceiveTimeOut).ContinueWith(msgHandler); // repeat
            });

            msgSession.ReceiveAsync(opts.ReceiveTimeOut).ContinueWith(msgHandler); // start listening
        });

        for (Int32 nIndex = 0; nIndex < opts.NumberOfSesssions; nIndex++)
        {
            sc.AcceptMessageSessionAsync().ContinueWith(sessionAction);
        }
    }
}
share|improve this question

Your Answer

 
discard

By posting your answer, you agree to the privacy policy and terms of service.

Browse other questions tagged or ask your own question.