I've written concurrent queue based on std::queue
.
#include <queue>
#include <shared_mutex>
#include <condition_variable>
template <typename T, typename S = typename std::queue<T>::container_type>
class concurrent_queue {
// no std::shared_mutex yet
using mutex_type = std::shared_timed_mutex;
public:
explicit concurrent_queue(const S &sequence)
: queue_(sequence) { }
explicit concurrent_queue(S &&sequence = S())
: queue_(std::move(sequence)) { }
concurrent_queue(const concurrent_queue &other)
{
std::shared_lock<mutex_type> other_lock(other.mutex_);
queue_ = other.queue_;
}
concurrent_queue(concurrent_queue &&other)
{
std::unique_lock<mutex_type> lock(mutex_, std::defer_lock);
std::shared_lock<mutex_type> other_lock(other.mutex_, std::defer_lock);
std::lock(lock, other_lock);
queue_ = std::move(other.queue_);
}
concurrent_queue &operator=(const concurrent_queue &other)
{
if (this != &other) {
std::unique_lock<mutex_type> lock(mutex_, std::defer_lock);
std::shared_lock<mutex_type> other_lock(other.mutex_, std::defer_lock);
std::lock(lock, other_lock);
queue_ = other.queue_;
}
return *this;
}
concurrent_queue &operator=(concurrent_queue &&other)
{
if (this != &other) { // not needed?
std::unique_lock<mutex_type> lock(mutex_, std::defer_lock);
std::shared_lock<mutex_type> other_lock(other.mutex_, std::defer_lock);
std::lock(lock, other_lock);
queue_ = std::move(other.queue_);
}
return *this;
}
~concurrent_queue() = default;
T pop()
{
std::unique_lock<mutex_type> lock(mutex_);
condition_variable_.wait(lock, [this] { return !queue_.empty(); });
T item = queue_.front(); // use auto?
queue_.pop();
return item;
}
bool pop(T &item)
{
std::unique_lock<mutex_type> lock(mutex_);
if (queue_.empty()) {
return false;
} else {
item = queue_.front();
queue_.pop();
return true;
}
}
void push(const T &item)
{
std::unique_lock<mutex_type> lock(mutex_);
queue_.push(item);
lock.unlock();
condition_variable_.notify_one();
}
void push(T &&item)
{
std::unique_lock<mutex_type> lock(mutex_);
queue_.push(std::move(item));
lock.unlock();
condition_variable_.notify_one();
}
template <typename ...Args>
void emplace(Args &&... args)
{
std::unique_lock<mutex_type> lock(mutex_);
queue_.emplace(std::forward<Args>(args)...);
lock.unlock();
condition_variable_.notify_one();
}
private:
mutable mutex_type mutex_;
std::condition_variable_any condition_variable_;
std::queue<T, S> queue_;
};
Things I'd like to know about this code:
- Possible deadlocks - it seems to be working fine in my program, but I'm only using one producer and one consumer. (see #3)
- Exception safety - I guess
pop
methods can throw, but I don't see how this could be avoided without hacks like external locking (lock
-front
-pop
-unlock
) - Testing - I've done some really basic tests, things like multiple readers and writers. It has never crashed but I don't really know how to test it.
- Anything that comes to your mind.