MPMCQueue.h
A bounded multi-producer multi-consumer concurrent queue written in C++11.
It's battle hardened and used daily in production:
- In the Frostbite game engine developed by Electronic Arts for the following games:
- In the low latency trading infrastructure at Charlesworth Research and Marquette Partners.
It's been cited by the following papers:
- Peizhao Ou and Brian Demsky. 2018. Towards understanding the costs of avoiding out-of-thin-air results. Proc. ACM Program. Lang. 2, OOPSLA, Article 136 (October 2018), 29 pages. DOI: https://doi.org/10.1145/3276506
Example
MPMCQueue<int> q(10);
auto t1 = std::thread([&] {
  int v;
  q.pop(v);
  std::cout << "t1 " << v << "\n";
});
auto t2 = std::thread([&] {
  int v;
  q.pop(v);
  std::cout << "t2 " << v << "\n";
});
q.push(1);
q.push(2);
t1.join();
t2.join();

Usage
- MPMCQueue<T>(size_t capacity);
  Constructs a new MPMCQueue holding items of type T with capacity capacity.
- void emplace(Args &&... args);
  Enqueue an item using in-place construction. Blocks if the queue is full.
- bool try_emplace(Args &&... args);
  Try to enqueue an item using in-place construction. Returns true on success and false if the queue is full.
- void push(const T &v);
  Enqueue an item using copy construction. Blocks if the queue is full.
- template <typename P> void push(P &&v);
  Enqueue an item using move construction. Participates in overload resolution only if std::is_nothrow_constructible<T, P&&>::value == true. Blocks if the queue is full.
- bool try_push(const T &v);
  Try to enqueue an item using copy construction. Returns true on success and false if the queue is full.
- template <typename P> bool try_push(P &&v);
  Try to enqueue an item using move construction. Participates in overload resolution only if std::is_nothrow_constructible<T, P&&>::value == true. Returns true on success and false if the queue is full.
- void pop(T &v);
  Dequeue an item by copying or moving the item into v. Blocks if the queue is empty.
- bool try_pop(T &v);
  Try to dequeue an item by copying or moving the item into v. Returns true on success and false if the queue is empty.
All operations except construction and destruction are thread safe.
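The overload-resolution rule for the push(P &&) and try_push(P &&) overloads can be illustrated in isolation. The sketch below is not the library's code: OverloadProbe and demo_overloads are hypothetical names, and the probe only records which overload the compiler selected. It assumes the constraint is expressed with std::enable_if, which is one common way to do this in C++11.

```cpp
#include <cassert>
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical stand-in, not the real MPMCQueue: it stores nothing and only
// records which push overload was selected.
template <typename T> struct OverloadProbe {
  enum class Picked { None, Copy, Forward };
  Picked last = Picked::None;

  // Copy overload: always available.
  void push(const T &) { last = Picked::Copy; }

  // Forwarding overload: removed from overload resolution unless T can be
  // constructed from P&& without throwing.
  template <typename P,
            typename = typename std::enable_if<
                std::is_nothrow_constructible<T, P &&>::value>::type>
  void push(P &&) {
    last = Picked::Forward;
  }
};

// Returns true if an lvalue std::string selects the copy overload and an
// rvalue std::string selects the forwarding overload.
inline bool demo_overloads() {
  using Probe = OverloadProbe<std::string>;
  Probe p;
  std::string s = "hello";

  p.push(s); // copying a std::string may throw, so the template is disabled
  const bool lvalue_copies = (p.last == Probe::Picked::Copy);

  p.push(std::move(s)); // moving a std::string is noexcept
  const bool rvalue_forwards = (p.last == Probe::Picked::Forward);

  assert(lvalue_copies && rvalue_forwards);
  return lvalue_copies && rvalue_forwards;
}
```

With a constraint like this, a blocking enqueue never has to recover from an exception thrown while constructing the element in its slot through the forwarding overload.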
Implementation
Enqueue:
- Acquire next write ticket from head.
- Wait for our turn (2 * (ticket / capacity)) to write slot (ticket % capacity).
- Set turn = turn + 1 to inform the readers we are done writing.
Dequeue:
- Acquire next read ticket from tail.
- Wait for our turn (2 * (ticket / capacity) + 1) to read slot (ticket % capacity).
- Set turn = turn + 1 to inform the writers we are done reading.
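The enqueue and dequeue steps above can be sketched as a small class. This is a minimal illustration under stated assumptions, not the library's implementation: TicketQueueSketch and demo_wraparound are hypothetical names, T is assumed to be default-constructible and copy-assignable, and cache-line padding, in-place construction and the try_ variants are left out.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Minimal sketch of the ticket/turn scheme described above.
template <typename T> class TicketQueueSketch {
public:
  explicit TicketQueueSketch(std::size_t capacity)
      : capacity_(capacity), slots_(capacity), head_(0), tail_(0) {
    assert(capacity > 0); // ticket / capacity would otherwise divide by zero
  }

  void push(const T &v) {
    const std::size_t ticket = head_.fetch_add(1);     // next write ticket
    Slot &slot = slots_[ticket % capacity_];
    const std::size_t turn = 2 * (ticket / capacity_); // even turns: writers
    while (slot.turn.load(std::memory_order_acquire) != turn) {
      std::this_thread::yield();                       // wait for our turn
    }
    slot.value = v;
    slot.turn.store(turn + 1, std::memory_order_release); // done writing
  }

  void pop(T &v) {
    const std::size_t ticket = tail_.fetch_add(1);         // next read ticket
    Slot &slot = slots_[ticket % capacity_];
    const std::size_t turn = 2 * (ticket / capacity_) + 1; // odd turns: readers
    while (slot.turn.load(std::memory_order_acquire) != turn) {
      std::this_thread::yield();                           // wait for our turn
    }
    v = slot.value;
    slot.turn.store(turn + 1, std::memory_order_release);  // done reading
  }

private:
  struct Slot {
    std::atomic<std::size_t> turn;
    T value;
    Slot() : turn(0), value() {}
  };

  const std::size_t capacity_;
  std::vector<Slot> slots_;
  std::atomic<std::size_t> head_; // write tickets
  std::atomic<std::size_t> tail_; // read tickets
};

// Single-threaded walk through two laps of a capacity-2 queue.
inline bool demo_wraparound() {
  TicketQueueSketch<int> q(2);
  int a = 0, b = 0, c = 0;
  q.push(1); // write ticket 0: slot 0, turn 0
  q.push(2); // write ticket 1: slot 1, turn 0
  q.pop(a);  // read ticket 0:  slot 0, turn 1
  q.push(3); // write ticket 2: slot 0, turn 2 (second lap)
  q.pop(b);  // read ticket 1:  slot 1, turn 1
  q.pop(c);  // read ticket 2:  slot 0, turn 3
  return a == 1 && b == 2 && c == 3;
}
```

Because each slot carries its own turn counter, threads holding tickets for different slots do not wait on each other; they contend only on the head and tail counters.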
References:
- Daniel Orozco, Elkin Garcia, Rishi Khan, Kelly Livingston, and Guang R. Gao. 2012. Toward high-throughput algorithms on many-core architectures. ACM Trans. Archit. Code Optim. 8, 4, Article 49 (January 2012), 21 pages. DOI: https://doi.org/10.1145/2086696.2086728
- Dave Dice. 2014. PTLQueue : a scalable bounded-capacity MPMC queue.
- Oleksandr Otenko. US 8607249 B2: System and method for efficient concurrent queue implementation.
- Massimiliano Meneghin, Davide Pasetto, Hubertus Franke. 2012. Performance evaluation of inter-thread communication mechanisms on multicore/multithreaded architectures. DOI: https://doi.org/10.1145/2287076.2287098
- Paul E. McKenney. 2010. Memory Barriers: a Hardware View for Software Hackers.
- Dmitry Vyukov. 2014. Bounded MPMC queue.
Testing
Testing concurrency algorithms is hard. I'm using two approaches to test the implementation:
- A single-threaded test that the functionality works as intended, including that the element constructors and destructors are invoked correctly.
- A multithreaded fuzz test that all elements are enqueued and dequeued correctly under heavy contention.
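The second approach could look roughly like the sketch below. It is not the project's actual test: fuzz and LockedQueue are hypothetical names, and LockedQueue is a mutex-based stand-in included only so the example compiles on its own. In a real test the Queue parameter would be MPMCQueue<int>.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in with the same blocking push/pop shape as the queue.
class LockedQueue {
public:
  explicit LockedQueue(std::size_t capacity) : capacity_(capacity) {}

  void push(int v) {
    std::unique_lock<std::mutex> lock(m_);
    not_full_.wait(lock, [this] { return items_.size() < capacity_; });
    items_.push_back(v);
    not_empty_.notify_one();
  }

  void pop(int &v) {
    std::unique_lock<std::mutex> lock(m_);
    not_empty_.wait(lock, [this] { return !items_.empty(); });
    v = items_.front();
    items_.pop_front();
    not_full_.notify_one();
  }

private:
  std::size_t capacity_;
  std::deque<int> items_;
  std::mutex m_;
  std::condition_variable not_full_;
  std::condition_variable not_empty_;
};

// Starts `pairs` producers and `pairs` consumers. Each producer pushes
// `per_thread` distinct values and each consumer pops `per_thread` values.
// Returns true if the sum of popped values equals the sum of pushed values.
template <typename Queue> bool fuzz(Queue &q, int pairs, int per_thread) {
  std::atomic<long long> pushed(0);
  std::atomic<long long> popped(0);
  std::vector<std::thread> workers;
  for (int t = 0; t < pairs; ++t) {
    workers.emplace_back([&q, &pushed, t, per_thread] {
      for (int i = 0; i < per_thread; ++i) {
        const int v = t * per_thread + i;
        q.push(v);
        pushed += v;
      }
    });
    workers.emplace_back([&q, &popped, per_thread] {
      for (int i = 0; i < per_thread; ++i) {
        int v = 0;
        q.pop(v);
        popped += v;
      }
    });
  }
  for (auto &w : workers) {
    w.join();
  }
  return pushed.load() == popped.load();
}
```

A small capacity relative to the number of threads keeps the queue alternating between full and empty, which exercises the blocking paths. A matching sum does not prove that every element was delivered exactly once, so a stricter test might also record which values each consumer saw.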
TODO
- Add allocator support so that the queue can be used with huge pages and shared memory
- Add benchmarks and compare to boost::lockfree::queue and others
- Use C++20 concepts instead of static_assert if available
- Use std::hardware_destructive_interference_size if available
- Add API for zero-copy dequeue and batch dequeue operations
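As an illustration of the std::hardware_destructive_interference_size item, one possible approach is to test the standard feature macro and fall back to a fixed value. This is only a sketch of the idea, not a planned design: kCacheLineSize and PaddedIndex are hypothetical names, and the 64-byte fallback is an assumption that holds on many but not all platforms.

```cpp
#include <cstddef>
#include <new>

// Use the standard constant when the library provides it (C++17 and later),
// otherwise fall back to an assumed 64-byte cache line.
#ifdef __cpp_lib_hardware_interference_size
constexpr std::size_t kCacheLineSize =
    std::hardware_destructive_interference_size;
#else
constexpr std::size_t kCacheLineSize = 64; // assumption, not measured
#endif

// Hypothetical index type aligned so that two instances never share a
// cache line, which avoids false sharing between producers and consumers.
struct alignas(kCacheLineSize) PaddedIndex {
  std::size_t value;
};

static_assert(alignof(PaddedIndex) == kCacheLineSize,
              "PaddedIndex must start on a cache-line boundary");
static_assert(sizeof(PaddedIndex) % kCacheLineSize == 0,
              "PaddedIndex must occupy whole cache lines");
```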
About
This project was created by Erik Rigtorp <erik@rigtorp.se>.
