I have a high-throughput message handler that receives ordered messages. The task is to add monitoring to it that recognizes bad input: it must report missed messages and simply ignore stale ones (those with a smaller order number).
Because of the high throughput, it must not block.
So it must:
- not use locks
- detect missing messages
- detect stale messages and not let them break the order
Each message carries a long counter used for ordering. Here is my version, written with possible ABA issues in mind:
import java.util.concurrent.atomic.AtomicStampedReference;
import org.apache.log4j.Logger; // assumption: log4j 1.x, which matches Logger.getLogger(Class) and log.error(...)

public class OrderedNonBlockingProcessor implements YetAnotherMessageHandler {

    private static final Logger log = Logger.getLogger(OrderedNonBlockingProcessor.class);

    // last seen counter, plus a stamp that grows on every update to guard against ABA
    private final AtomicStampedReference<Long> messageCounter = new AtomicStampedReference<Long>(0L, 0);

    @Override
    public boolean onSomeHighThroughputMessage(final YetAnotherMessage message) {
        final StampedReferencePairPub current = getAndSetMessageCounter(message.getCounter());
        final long expectedCounter = current.ref + 1;
        if (expectedCounter == message.getCounter()) {
            processBusinessStuff(message);
            return true;
        } else if (expectedCounter > message.getCounter()) {
            // ignore stale message: attempt to restore the sequence to prevent an error on the next good message
            final int expectedStamp = current.stamp + 1;
            // compareAndSet compares references with ==, so a freshly boxed Long would not match;
            // re-read the stored instance and pass that one as the expected reference
            final int[] stampHolder = new int[1];
            final Long stored = messageCounter.get(stampHolder);
            final boolean restored = stored.longValue() == message.getCounter()
                    && messageCounter.compareAndSet(stored, current.ref, expectedStamp, expectedStamp + 1);
            log.error(String.format("messaging system ordering bug: got stale message %s while expected %s! Sequence restored: %s",
                    message.getCounter(), expectedCounter, restored));
            // some other notifying stuff...
        } else if (expectedCounter < message.getCounter()) {
            log.error(String.format("got forward message %s while expected %s, probably missed: %s",
                    message.getCounter(), expectedCounter, message.getCounter() - expectedCounter));
            // some other notifying stuff...
        }
        return false;
    }

    private void processBusinessStuff(YetAnotherMessage message) {
        log.info(String.format("process message %s", message.getCounter()));
        // some business logic...
    }

    private StampedReferencePairPub getAndSetMessageCounter(final long newValue) {
        final Long boxedNewValue = newValue; // box once so every retry offers the same object
        final int[] stampHolder = new int[1];
        while (true) {
            // get() reads reference and stamp in one atomic step; the Long instance read here
            // is reused as the expected reference because compareAndSet compares with ==
            final Long currentRef = messageCounter.get(stampHolder);
            if (messageCounter.compareAndSet(currentRef, boxedNewValue, stampHolder[0], stampHolder[0] + 1))
                return new StampedReferencePairPub(currentRef, stampHolder[0]);
        }
    }

    public static class StampedReferencePairPub {
        public final long ref;
        public final int stamp;

        StampedReferencePairPub(long r, int i) {
            ref = r;
            stamp = i;
        }
    }
}
One problem remains: a false warning when messages arrive concurrently right after a stale one. Consider the sequence 1, 3, 2, 4: there will be one "stale message" warning for 2 (correct), but there can also be a "missed messages" warning for 4, and the latter is a false notification.
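The interleaving behind the false warning can be replayed deterministically on a single thread. This is a simplified sketch, not the handler above: it uses a plain AtomicLong instead of the stamped reference, and the class name RaceReplay and its replay() method are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

class RaceReplay {

    /** Replays 1, 3, 2, 4 with message 4 arriving between the two steps of the stale handler. */
    static String replay() {
        AtomicLong counter = new AtomicLong(0);

        counter.getAndSet(1);                    // previous 0, expected 1: in order
        counter.getAndSet(3);                    // previous 1, expected 2: one missed (correct)

        // stale message 2, step one: the counter is overwritten with 2
        long staleSaw = counter.getAndSet(2);    // previous 3, expected 4 > 2: stale

        // message 4 arrives before the stale handler has restored the counter
        long fourSaw = counter.getAndSet(4);     // previous 2, so expected is 3, not 4
        long falselyMissed = 4 - (fourSaw + 1);

        // stale message 2, step two: the restore fails, the counter is no longer 2
        boolean restored = counter.compareAndSet(2, staleSaw);

        return "missed=" + falselyMissed + " restored=" + restored;
    }

    public static void main(String[] args) {
        System.out.println(replay());
    }
}
```

Message 4 sees the temporarily written value 2 as its predecessor, so it reports one missed message although nothing is missing at that point.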
Are there any ways to avoid it?
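One direction that might avoid it, as a minimal sketch under assumptions rather than a verified solution: never write a stale value in the first place, and only advance the counter when the incoming one is larger. The counter then only grows, so there is nothing to restore and no ABA to guard against. The names MonotonicSequenceTracker, Result and offer are hypothetical, and duplicates are treated as stale here.

```java
import java.util.concurrent.atomic.AtomicLong;

class MonotonicSequenceTracker {

    enum Result { IN_ORDER, GAP, STALE }

    private final AtomicLong lastSeen = new AtomicLong(0);

    /** Classifies a counter and advances lastSeen only if the counter is newer. */
    Result offer(long counter) {
        while (true) {
            long prev = lastSeen.get();
            if (counter <= prev) {
                return Result.STALE;             // never stored, so later messages are unaffected
            }
            if (lastSeen.compareAndSet(prev, counter)) {
                return counter == prev + 1 ? Result.IN_ORDER : Result.GAP;
            }
            // another thread advanced the counter in between; re-read and classify again
        }
    }

    public static void main(String[] args) {
        MonotonicSequenceTracker tracker = new MonotonicSequenceTracker();
        for (long c : new long[] {1, 3, 2, 4}) {
            System.out.println(c + " -> " + tracker.offer(c));
        }
    }
}
```

For 1, 3, 2, 4 this reports a gap at 3 and a stale message at 2, and 4 is in order. It does not help when two consecutive messages overtake each other inside the handler itself: if 6 wins the CAS before 5, 6 is still reported as a gap and 5 as stale.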