I have a high-throughput message handler that receives ordered messages. The task is to add monitoring to it so that it recognizes bad input. More precisely, it must notify about missed messages and simply ignore stale ones (those with a smaller counter).

Because of the high throughput, it must not block.

So it must:

  1. not use locks
  2. detect missing messages
  3. detect stale messages and not let them break the ordering

Each message carries a long counter for ordering. Here is my version, written with possible ABA issues in mind:

import java.util.concurrent.atomic.AtomicStampedReference;
// Logger is assumed to be org.apache.log4j.Logger (log4j 1.x);
// YetAnotherMessage and YetAnotherMessageHandler are the application's own types.

public class OrderedNonBlockingProcessor implements YetAnotherMessageHandler {
    private static final Logger log = Logger.getLogger(OrderedNonBlockingProcessor.class);

    // Last seen counter plus a stamp that changes on every update (guards against ABA).
    private final AtomicStampedReference<Long> messageCounter = new AtomicStampedReference<Long>(0L, 0);

    @Override
    public boolean onSomeHighThroughputMessage(final YetAnotherMessage message) {
        // Box the counter once: AtomicStampedReference compares references with ==,
        // so the restore CAS below must pass the very same Long object that was stored.
        final Long received = message.getCounter();
        final StampedReferencePairPub current = getAndSetMessageCounter(received);
        final long expectedCounter = current.ref + 1;

        if (expectedCounter == message.getCounter()) {
            processBusinessStuff(message);
            return true;
        } else if (expectedCounter > message.getCounter()) {
            /* ignore stale message: attempt to restore the sequence to prevent an error on next good message
             */
            final int expectedStamp = current.stamp + 1;
            boolean restored = messageCounter.compareAndSet(received, current.ref, expectedStamp, expectedStamp + 1);

            log.error(String.format("messaging system ordering bug: got stale message %s while expected %s! Sequence restored: %s",
                    message.getCounter(), expectedCounter, restored));

            // some other notifying stuff...

        } else if (expectedCounter < message.getCounter()) {
            log.error(String.format("got forward message %s while expected %s, probably missed: %s",
                    message.getCounter(), expectedCounter, message.getCounter() - expectedCounter));

            // some other notifying stuff...

        }
        return false;
    }

    private void processBusinessStuff(YetAnotherMessage message) {
        log.info(String.format("process message %s", message.getCounter()));
        // some business logic...
    }

    // Unconditionally stores newValue and returns the reference/stamp pair it replaced.
    private StampedReferencePairPub getAndSetMessageCounter(final Long newValue) {
        final int[] stampHolder = new int[1];
        while (true) {
            // get(int[]) reads the reference and the stamp in one atomic step
            final Long ref = messageCounter.get(stampHolder);
            final int stamp = stampHolder[0];
            if (messageCounter.compareAndSet(ref, newValue, stamp, stamp + 1))
                return new StampedReferencePairPub(ref, stamp);
        }
    }

    public static class StampedReferencePairPub {
        public final Long ref;   // kept boxed so it can be reused as a CAS operand
        public final int stamp;

        StampedReferencePairPub(Long r, int i) {
            ref = r; stamp = i;
        }
    }

}

(Here is a junit for that)

One problem is left: a false warning when messages arrive concurrently right after a stale one. Consider the sequence 1, 3, 2, 4: there will be one "stale message" warning for 2 (correct), and there can be one more "missing messages" warning on 4, which is a false notification.
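To make the race concrete, here is a single-threaded replay of one possible interleaving for 1, 3, 2, 4. It is only a sketch: FalseGapDemo, getAndSet and classify are hypothetical names that mirror the get-and-set and restore steps of the handler, and the "threads" are simulated by the order of the calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicStampedReference;

/** Hypothetical replay of the 1, 3, 2, 4 race; not the production handler. */
final class FalseGapDemo {
    private final AtomicStampedReference<Long> counter = new AtomicStampedReference<Long>(0L, 0);
    private final int[] stampHolder = new int[1];

    /** Unconditionally stores newValue; returns {previous value, previous stamp}. */
    private long[] getAndSet(Long newValue) {
        while (true) {
            Long prev = counter.get(stampHolder);
            int stamp = stampHolder[0];
            if (counter.compareAndSet(prev, newValue, stamp, stamp + 1)) {
                return new long[] {prev, stamp};
            }
        }
    }

    /** Same three-way comparison as in the handler. */
    static String classify(long previous, long received) {
        long expected = previous + 1;
        if (received == expected) return "ok";
        return received < expected ? "stale" : "gap";
    }

    static List<String> run() {
        FalseGapDemo d = new FalseGapDemo();
        List<String> events = new ArrayList<String>();
        Long one = 1L, two = 2L, three = 3L, four = 4L;

        events.add("1: " + classify(d.getAndSet(one)[0], 1));
        events.add("3: " + classify(d.getAndSet(three)[0], 3));

        // "Thread A" starts handling message 2 and overwrites the counter with 2 ...
        long[] beforeTwo = d.getAndSet(two);
        events.add("2: " + classify(beforeTwo[0], 2));

        // ... "thread B" handles message 4 before A has restored the counter to 3.
        events.add("4: " + classify(d.getAndSet(four)[0], 4));

        // A's restore attempt comes too late: the stamp has already moved on.
        int stampAfterA = (int) beforeTwo[1] + 1;
        boolean restored = d.counter.compareAndSet(two, three, stampAfterA, stampAfterA + 1);
        events.add("restore: " + restored);
        return events;
    }

    public static void main(String[] args) {
        for (String event : run()) System.out.println(event);
    }
}
```

In this replay message 4 is compared against the stale value 2 that message 2 wrote, so it is reported as a gap even though nothing is missing between 3 and 4.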

Are there any ways to avoid it?

updated code, added ABA fix. – yetanothercoder Mar 30 '12 at 7:06

1 Answer

I think that there is a certain amount of fuzziness to the question but I'll make a couple of suggestions and perhaps they will help.

First of all, I would recommend giving serious consideration to using Akka actors (http://akka.io/). Actors are a very straightforward way to handle multi-threaded processing without locks.

Just to be clear, I think that you are saying that the program receives messages that have an order but does not necessarily receive them IN order and so it needs to restore the correct order before they can be processed.

If this is true, then I would recommend using TWO actors. The first actor would get the messages from wherever they are coming from and deal with putting them into the correct order. I assume that it would be OK to cache out-of-order messages until it finds the next message to be processed, perhaps in a priority heap.

Once the first actor finds the next message to be processed, it would send it on to the second actor who would then handle the actual processing (by calling processBusinessStuff in your example).
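The reordering step of the first actor could look roughly like the sketch below. It is plain Java rather than Akka code, and it assumes that it is only ever called from one thread at a time (as an actor's message handler would be) and that counters start at 1. ReorderBuffer and offer are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical reorder buffer; assumes single-threaded use and counters starting at 1. */
final class ReorderBuffer {
    private final PriorityQueue<Long> pending = new PriorityQueue<Long>();
    private long next = 1;

    /** Accepts one counter and returns the counters that can now be processed, in order. */
    List<Long> offer(long counter) {
        List<Long> ready = new ArrayList<Long>();
        if (counter < next) {
            return ready; // stale: already released, drop it
        }
        pending.add(counter);
        while (!pending.isEmpty() && pending.peek() <= next) {
            long head = pending.poll();
            if (head == next) {
                ready.add(head); // this would be handed on to the second actor
                next++;
            }
            // head < next: duplicate of a counter that was already released, drop it
        }
        return ready;
    }

    public static void main(String[] args) {
        ReorderBuffer buffer = new ReorderBuffer();
        for (long c : new long[] {1, 3, 2, 4}) {
            System.out.println(c + " -> " + buffer.offer(c));
        }
    }
}
```

The buffer grows for as long as a counter stays missing, so a real version would need some limit or timeout for giving up on a gap.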

I've heard about Akka. It's cool and fancy now, but for this task it seems like overkill: I just need to add some monitoring to an existing handler. Anyway, the current solution with just 2 CAS instructions is simpler and in-place (no additional memory for a cache or message queues) – yetanothercoder Mar 26 '12 at 10:50
Is the third paragraph of my answer a correct description of the nature of the problem? – Donald.McLean Mar 26 '12 at 12:48
not exactly: it must notify about missed messages and just ignore the stale ones (with smaller order) – yetanothercoder Mar 26 '12 at 13:03
You're right then. That's a much simpler problem. – Donald.McLean Mar 26 '12 at 13:07
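For the simpler problem (report gaps, ignore stale messages), one possible lock-free sketch is to let the counter only ever move forward. This is not the code from the question: MonotonicSequenceMonitor, Result and check are hypothetical names, and the sketch assumes counters are positive and that a duplicate counter may be treated as stale.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical monitor: the stored counter only advances, so stale messages never write to it. */
final class MonotonicSequenceMonitor {
    enum Result { IN_ORDER, GAP, STALE }

    private final AtomicLong lastSeen = new AtomicLong(0);

    Result check(long counter) {
        while (true) {
            long prev = lastSeen.get();
            if (counter <= prev) {
                return Result.STALE; // nothing is written, so nothing has to be restored
            }
            if (lastSeen.compareAndSet(prev, counter)) {
                return counter == prev + 1 ? Result.IN_ORDER : Result.GAP;
            }
            // another thread advanced the counter in between: re-read and retry
        }
    }

    public static void main(String[] args) {
        MonotonicSequenceMonitor monitor = new MonotonicSequenceMonitor();
        for (long c : new long[] {1, 3, 2, 4}) {
            System.out.println(c + " -> " + monitor.check(c));
        }
    }
}
```

Because a stale message leaves the counter untouched, message 4 in the sequence 1, 3, 2, 4 is always compared against 3, whatever the interleaving with message 2. The stored value is strictly increasing, so it never returns to an earlier value and no stamp is needed; a primitive AtomicLong also avoids comparing boxed Long references.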
