We have created a message queue system using priority messages. The idea behind this code is as follows:
- We have a list of clients.
- All clients know if (and what kind of priority) messages they have available for sending.
- A client needs to send their higher priority message first.
- All clients need to keep sending, if there are clients with lower priority messages these clients also need time to send but higher priority is still more important.
We have the following mechanism:
- We look for the client that is next in line with a priority message. We send send this high priority message.
- We look (different counter) for the next in line with any message while priority doesn't matter. We send this message.
For example, in step 1, we find Client b to be the first to have a high priority message and we send it.
Then step 2 finds Client A to have a low priority message and we send this message.
Then back to step 1. Client B still has another priority message but we are going for the next one so all clients will have the opportunity to send, so Client C's high priority message is now sent.
Then in step 2 again, we have to send a message for Client A so we are now going for the next Client which is B. As we said, Client B still has a high priority message so we send this message.
This keeps going on and on and on.
/// <summary>
/// Background worker that handles the polling of the modules for sending their messages.
/// </summary>
private void CommunicationProcess()
{
mLogger.DebugFormat(
"CommunicationProcess(): Starting thread for communication on {0}.",
mConnectionString);
var moduleWalker = mModuleConnections.GetEnumerator();
string currentModule = string.Empty;
bool processStepOverOnce = false;
while (mStopCommunication == false)
{
if (mModuleConnections.Count > 0)
{
try
{
processStepOverOnce = !processStepOverOnce;
// Step over once is used to process also other
// modules and not only one module with messages.
if (processStepOverOnce == false)
{
foreach (PriorityOfMessage priority in Enum.GetValues(typeof(PriorityOfMessage)))
{
currentModule = this.SearchNextModuleWithMessages(currentModule, priority);
if (string.IsNullOrEmpty(currentModule) == false)
{
break;
}
}
processStepOverOnce = string.IsNullOrEmpty(currentModule);
}
if (processStepOverOnce == true)
{
if (moduleWalker.MoveNext() == false)
{
moduleWalker = mModuleConnections.GetEnumerator();
moduleWalker.MoveNext();
}
currentModule = ((ModuleInformation)moduleWalker.Current).Address;
}
if (mStopCommunication == false)
{
var currentConnection = mModuleConnections.Find(currentModule);
if (currentConnection != null)
{
if (currentConnection.MessagesAvailable == true)
{
this.CommunicateToModule(currentConnection);
}
}
}
Thread.Sleep(1);
}
catch (Exception exception)
{
mPropagateExceptionDelegate("CommunicationProcess", exception);
}
}
else
{
Thread.Sleep(1000);
}
}
mStopCommunication = false;
mLogger.DebugFormat(
"CommunicationProcess(): Thread for splitterlayer communication on {0} stopped.",
mConnectionString);
}
/// <summary>
/// Communicate to a module, by sending the messages maintaining for that module.
/// </summary>
/// <param name="moduleState">The module to communicate with.</param>
private void CommunicateToModule(ModuleInformationTCTcp moduleState)
{
if (moduleState.ConnectionState == ConnectionState.Connected)
{
var messageParts = FillMessagesBySize(moduleState);
if (messageParts != null)
{
long splitteraddress = ConnectionConvertions.TCAddress2SplitterAddress(moduleState.Address);
if (mConnection.SendMessage(splitteraddress, messageParts) == true)
{
lock (moduleState.SynchRoot)
{
if (moduleState.SendingMessages.Count > 0)
{
try
{
var messages = new List<PriorityMessage>(moduleState.SendingMessages);
moduleState.SendingMessages.Clear();
foreach (var message in messages)
{
mLogger.InfoFormat(
"CommunicateToModule(): Message {0} is sent for module {1}.",
message.Message,
moduleState.Address);
message.SendingState = SendingState.Sent;
}
this.MessagesSent(moduleState.Address, messages);
}
catch (Exception exception)
{
mLogger.InfoFormat(
"CommunicateToModule(): EXCEPTION",
exception);
}
}
}
}
else
{
mLogger.ErrorFormat(
"CommunicateToModule(): Error, could not sent messages for module {0}.",
moduleState.Address);
}
}
}
}
/// <summary>
/// Search the next module that's having data to be send to a module.
/// </summary>
/// <param name="lastModule">The former and so latest module that had messages.</param>
/// <param name="priority">The usage of priority messages.</param>
/// <returns>The next module with messages.</returns>
private string SearchNextModuleWithMessages(string lastModule, PriorityOfMessage priority)
{
string address = string.Empty;
var modulesFound = mModuleConnections
.AddressesInformation
.Where(module => (module.QueuedMessages[priority].Count > 0) || (module.SendingMessages.Count > 0))
.Select(module => module.Address)
.ToList();
if (modulesFound.Count() > 0)
{
address = modulesFound.Where(last => string.Compare(last, lastModule) > 1).FirstOrDefault();
if (string.IsNullOrEmpty(address) == true)
{
address = modulesFound.FirstOrDefault();
}
}
return address;
}
/// <summary>
/// Create the message to be send for the current module.
/// </summary>
/// <param name="moduleState">The current module polled.</param>
/// <returns>The message to be sent for the module polled.</returns>
private static MessageParts FillMessagesBySize(ModuleInformationTCTcp moduleState)
{
var mplMessages = new MessageParts();
var parts = 0;
var capacityAvailable = moduleState.CapacityAvailable;
if (capacityAvailable > 250)
{
capacityAvailable = moduleState.CapacityAvailable / 3;
if (capacityAvailable < 250)
{
capacityAvailable = moduleState.CapacityAvailable / 2;
if (capacityAvailable < 250)
{
capacityAvailable = moduleState.CapacityAvailable;
}
}
var sizeOfBuffer = capacityAvailable;
lock (moduleState.SynchRoot)
{
moduleState.SendingMessages.ForEach(message => message.SendingState = SendingState.Unknown);
moduleState.QueuedMessages[PriorityOfMessage.High].InsertRange(0, moduleState.SendingMessages);
moduleState.SendingMessages.Clear();
}
if (moduleState.MessagesAvailable == true)
{
PriorityMessage messageToSend = null;
var continueProcess = true;
var sizeOverflow = false;
while ((sizeOfBuffer > 100) & (continueProcess == true) & (sizeOverflow == false) & (parts < 5))
{
messageToSend = null;
foreach (PriorityOfMessage priority in Enum.GetValues(typeof(PriorityOfMessage)))
{
messageToSend = moduleState.QueuedMessages[priority].FirstOrDefault();
if (messageToSend != null)
{
if (messageToSend.IsValid == true)
{
messageToSend.SendingState = SendingState.Sending;
}
else
{
messageToSend.SendingState = SendingState.TimeOut;
}
if (messageToSend.SendingState == SendingState.Sending)
{
if (sizeOfBuffer - ConnectionConvertions.LengthOfMessage(messageToSend.Message) > 100)
{
lock (moduleState.SynchRoot)
{
moduleState.QueuedMessages[priority].Remove(messageToSend);
moduleState.SendingMessages.Add(messageToSend);
}
}
else
{
messageToSend = null;
sizeOverflow = true;
}
}
else
{
lock (moduleState.SynchRoot)
{
moduleState.QueuedMessages[priority].Remove(messageToSend);
moduleState.SendingMessages.Add(messageToSend);
}
messageToSend = null;
}
break;
}
}
if (messageToSend != null)
{
if (messageToSend.Message != null)
{
if (string.IsNullOrEmpty(messageToSend.Message.ToString()) == false)
{
var message = messageToSend.Message.ToString();
if (message == "TS")
{
var now = DateTime.Now;
message = string.Format(
"{0}{1}{2}{3}",
message,
now.ToString("yyMMdd"),
(int)now.DayOfWeek,
now.ToString("HHmmss"));
}
// The TCTCP variant has no necessary to decorate the messages.
mplMessages.AddMessagePart(ConnectionConvertions.MessageType2MessagePartType(messageToSend.Message), message);
parts++;
sizeOfBuffer = capacityAvailable - mplMessages.Length;
}
}
}
continueProcess = moduleState.QueuedMessages.Available;
}
}
}
if (mplMessages.Length == 0)
{
return null;
}
else
{
return mplMessages;
}
}
The code is a bit of legacy so I think there might be some performance gain, but I cannot really find it myself, so I am curious what the community thinks of this priority message handler.