Attempting to jump into the Windows Server ServiceBus 1.1 code-base along with adopting the new TPL async methods. But I could not find an easy way to just spin up N number of handlers for message sessions (might have 100 or so concurrent sessions). So it would be great to get some feedback on the following code, any suggestions on an easier way would be great... note tried to keep code sample simple for the questions purpose.
///==============================================================
/// SAMPLE USAGE
SubClient = SubscriptionClient.Create(TopicName, SubscriptionName);
SessionsMessagingOptions opts = new SessionsMessagingOptions()
{
NumberOfSesssions = 5,
ReceiveTimeOut = TimeSpan.FromSeconds (5),
AutoMarkMessageComplete = true,
MessageHandler = msg =>
{
_logger.Log(string.Format("Processing recived Message: SessionId = {0}, Body = {1}",
msg.SessionId,
msg.GetBody<string>()));
}
};
SubClient.HandleSessions(opts);
///==============================================================
public class SessionsMessagingOptions
{
public Int32 NumberOfSesssions { get; set; }
public TimeSpan ReceiveTimeOut { get; set; }
public Boolean AutoMarkMessageComplete { get; set; }
public Action<BrokeredMessage> MessageHandler { get; set; }
}
///==============================================================
public static class SubscriptionClientExtensions
{
public static void HandleSessionsV2(this SubscriptionClient sc, SessionsMessagingOptions opts)
{
for (Int32 nIndex = 0; nIndex < opts.NumberOfSesssions; nIndex++)
{
HandleSession(sc, opts);
}
}
public static async Task<MessageSession> HandleSession(SubscriptionClient sc, SessionsMessagingOptions opts)
{
do
{
MessageSession ms = null;
try
{
ms = await sc.AcceptMessageSessionAsync().ConfigureAwait(false);
foreach (var msg in await ms.ReceiveBatchAsync(5, opts.ReceiveTimeOut).ConfigureAwait(false))
{
if (msg == null)
break;
try
{
opts.MessageHandler(msg);
if (opts.AutoMarkMessageComplete)
msg.Complete();
}
catch (Exception)
{
// log the exception
}
}
}
catch (TimeoutException)
{
// log timeout occurred
}
catch (Exception)
{
// look into other exception types to handle here
}
finally
{
if (ms != null)
{
if (!ms.IsClosed)
ms.Close();
}
}
} while (true);
}
public static void HandleSessions(this SubscriptionClient sc, SessionsMessagingOptions opts)
{
Action<Task<MessageSession>> sessionAction = null;
Action<Task<BrokeredMessage>> msgHandler = null;
sessionAction = new Action<Task<MessageSession>>(tMS =>
{
if (tMS.IsFaulted) // session timed out - repeat
{
sc.AcceptMessageSessionAsync().ContinueWith(sessionAction);
return;
}
MessageSession msgSession = null;
try
{
msgSession = tMS.Result;
}
catch (Exception)
{
return; // task cancelation exception
}
msgHandler = new Action<Task<BrokeredMessage>>(taskBM =>
{
if (taskBM.IsFaulted)
return;
BrokeredMessage bMsg = null;
try
{
bMsg = taskBM.Result;
}
catch (Exception)
{
return; // task cancelation exception
}
if (bMsg == null)
{
sc.AcceptMessageSessionAsync().ContinueWith(sessionAction); // session is dead
return;
}
opts.MessageHandler(bMsg); // client code to handle the message
if (opts.AutoMarkMessageComplete)
bMsg.Complete();
msgSession.ReceiveAsync(opts.ReceiveTimeOut).ContinueWith(msgHandler); // repeat
});
msgSession.ReceiveAsync(opts.ReceiveTimeOut).ContinueWith(msgHandler); // start listening
});
for (Int32 nIndex = 0; nIndex < opts.NumberOfSesssions; nIndex++)
{
sc.AcceptMessageSessionAsync().ContinueWith(sessionAction);
}
}
}