Take the 2-minute tour ×
Code Review Stack Exchange is a question and answer site for peer programmer code reviews. It's 100% free, no registration required.

I started a C++11 library of concurrency primitives in order to

  1. study and compare their performance;
  2. provide high-quality implementations of them to use in my projects.

Its main target platform is Linux x86-64. It relies upon the futex() system call for some of its functionality. Other parts of the library are platform-independent though.

The full source code of the library can be found here.

First, there are 3 families of synchronisation objects: wrappers for standard C++ library mutex and condition variables, then similar pthread-based objects, and finally futex-based objects. They share a common set of methods and thus are interchangeable as template parameters.

#ifndef EVENK_SYNCH_H_
#define EVENK_SYNCH_H_

#include <thread>
#include <mutex>
#include <condition_variable>
#include <system_error>

#include <pthread.h>

#include "evenk/backoff.h"
#include "evenk/futex.h"

namespace ev {
namespace concurrency {

//
// Mutexes
//

// Adapter that exposes std::mutex under the Lock()/Unlock() spelling used by
// the other lock types in this header, so it can be used as a LockType.
class StdMutex : public std::mutex {
 public:
  // Forwards to std::mutex::lock().
  void Lock() { std::mutex::lock(); }

  // Forwards to std::mutex::unlock().
  void Unlock() { std::mutex::unlock(); }
};

// Non-copyable wrapper around a pthread mutex with the Lock()/Unlock()
// interface shared by the lock types in this header.
class PosixMutex {
 public:
  PosixMutex() noexcept : mutex_(PTHREAD_MUTEX_INITIALIZER) {}

  PosixMutex(const PosixMutex&) = delete;
  PosixMutex& operator=(const PosixMutex&) = delete;

  // The result of pthread_mutex_destroy() is ignored.
  ~PosixMutex() noexcept { pthread_mutex_destroy(&mutex_); }

  // Locks the mutex; throws std::system_error carrying the pthread error code
  // if pthread_mutex_lock() fails.
  void Lock() {
    ThrowOnError(pthread_mutex_lock(&mutex_), "pthread_mutex_lock()");
  }

  // Unlocks the mutex; throws std::system_error carrying the pthread error
  // code if pthread_mutex_unlock() fails.
  void Unlock() {
    ThrowOnError(pthread_mutex_unlock(&mutex_), "pthread_mutex_unlock()");
  }

 private:
  // PosixCondVar needs the raw pthread_mutex_t for pthread_cond_wait().
  friend class PosixCondVar;

  // Converts a non-zero pthread return code into a std::system_error.
  static void ThrowOnError(int error_code, const char* what) {
    if (error_code != 0)
      throw std::system_error(error_code, std::system_category(), what);
  }

  pthread_mutex_t mutex_;
};

// A non-copyable mutex implemented directly on a 32-bit futex word.
//
// As used by Lock()/Unlock() below, the word takes three values:
//   0 - unlocked;
//   1 - locked, no thread has announced that it is going to sleep;
//   2 - locked, and some thread may be sleeping in futex_wait().
class FutexLock {
 public:
  FutexLock() noexcept : futex_(0) {}

  FutexLock(const FutexLock&) = delete;
  FutexLock& operator=(const FutexLock&) = delete;

  // Acquires the lock using the NoBackoff policy.
  void Lock() { Lock(NoBackoff{}); }

  // Acquires the lock. While the fast-path CAS (0 -> 1) keeps failing,
  // `backoff()` is invoked; once it returns true the caller stops spinning and
  // falls back to sleeping on the futex.
  template <typename Backoff>
  void Lock(Backoff backoff) {
    for (std::uint32_t value = 0; !futex_.compare_exchange_strong(
             value, 1, std::memory_order_acquire, std::memory_order_relaxed);
         value = 0) {
      if (backoff()) {
        // Mark the lock as contended (2). If the word already was 2, or the
        // exchange shows it was still held, sleep until an exchange observes
        // 0, which means this thread now owns the lock (in state 2).
        if (value == 2 || futex_.exchange(2, std::memory_order_acquire)) {
          do
            futex_wait(futex_, 2);
          while (futex_.exchange(2, std::memory_order_acquire));
        }
        break;
      }
    }
  }

  // Releases the lock. If the word was not 1 (i.e. it was marked contended),
  // it is reset to 0 and one sleeping thread is woken.
  void Unlock() {
    if (futex_.fetch_sub(1, std::memory_order_release) != 1) {
      // This store must itself be a release: a locker may acquire by reading
      // this 0, and under the C++20 release-sequence rules a relaxed store by
      // the same thread no longer extends the sequence headed by the
      // fetch_sub above.
      futex_.store(0, std::memory_order_release);
      ev::futex_wake(futex_, 1);
    }
  }

 private:
  // FutexCondVar re-acquires the lock by operating on futex_ directly.
  friend class FutexCondVar;

  std::atomic<std::uint32_t> futex_;
};

//
// Lock Guard
//

// Scoped owner of a lock that provides Lock()/Unlock() (and optionally
// Lock(Backoff)). The guard releases the lock in its destructor if, and only
// if, it currently owns it.
//
// Unlike std::lock_guard the guard can be unlocked and re-locked explicitly,
// which the condition variable wrappers in this header rely on.
template <typename LockType>
class LockGuard {
 public:
  // Locks `lock` immediately.
  LockGuard(LockType& lock) : lock_ptr_(&lock), owns_lock_(false) { Lock(); }

  // Locks `lock` immediately, passing `backoff` to LockType::Lock(Backoff).
  template <typename Backoff>
  LockGuard(LockType& lock, Backoff backoff)
      : lock_ptr_(&lock), owns_lock_(false) {
    Lock(backoff);
  }

  // Takes ownership of a lock the caller has already acquired.
  LockGuard(LockType& lock, std::adopt_lock_t) noexcept : lock_ptr_(&lock),
                                                          owns_lock_(true) {}

  // Associates the guard with `lock` without acquiring it.
  LockGuard(LockType& lock, std::defer_lock_t) noexcept : lock_ptr_(&lock),
                                                          owns_lock_(false) {}

  LockGuard(const LockGuard&) = delete;
  LockGuard& operator=(const LockGuard&) = delete;

  ~LockGuard() {
    if (owns_lock_) lock_ptr_->Unlock();
  }

  // Acquires the lock and records ownership. Ownership is only recorded if
  // LockType::Lock() returns normally.
  void Lock() {
    lock_ptr_->Lock();
    owns_lock_ = true;
  }

  // Same as Lock(), forwarding `backoff` to LockType::Lock(Backoff).
  template <typename Backoff>
  void Lock(Backoff backoff) {
    lock_ptr_->Lock(backoff);
    owns_lock_ = true;
  }

  // Releases the lock and drops ownership.
  void Unlock() {
    lock_ptr_->Unlock();
    owns_lock_ = false;
  }

  // Returns the guarded lock. Const-qualified so that it can be queried
  // through a const reference to the guard.
  LockType* GetLockPtr() const noexcept { return lock_ptr_; }

  // Returns true while the guard is responsible for unlocking.
  bool OwnsLock() const noexcept { return owns_lock_; }

 private:
  LockType* lock_ptr_;
  bool owns_lock_;
};

//
// Condition Variables
//

// Adapter that lets std::condition_variable be used with LockGuard<StdMutex>
// and the Wait()/NotifyOne()/NotifyAll() spelling used in this header.
class StdCondVar : public std::condition_variable {
 public:
  // Blocks on the condition variable. The mutex held by `guard` is wrapped in
  // a temporary std::unique_lock for the duration of the wait and released
  // from it afterwards, so `guard` remains the owner.
  void Wait(LockGuard<StdMutex>& guard) {
    std::mutex& held_mutex = *guard.GetLockPtr();
    std::unique_lock<std::mutex> adopted(held_mutex, std::adopt_lock);
    std::condition_variable::wait(adopted);
    adopted.release();
  }

  void NotifyOne() { std::condition_variable::notify_one(); }

  void NotifyAll() { std::condition_variable::notify_all(); }
};

// Non-copyable wrapper around a pthread condition variable, to be used with
// LockGuard<PosixMutex>.
class PosixCondVar {
 public:
  PosixCondVar() noexcept : condition_(PTHREAD_COND_INITIALIZER) {}

  PosixCondVar(const PosixCondVar&) = delete;
  PosixCondVar& operator=(const PosixCondVar&) = delete;

  // The result of pthread_cond_destroy() is ignored.
  ~PosixCondVar() noexcept { pthread_cond_destroy(&condition_); }

  // Waits on the condition using the pthread mutex owned by `guard`.
  // Throws std::system_error if pthread_cond_wait() reports an error.
  void Wait(LockGuard<PosixMutex>& guard) {
    pthread_mutex_t* raw_mutex = &guard.GetLockPtr()->mutex_;
    ThrowOnError(pthread_cond_wait(&condition_, raw_mutex),
                 "pthread_cond_wait()");
  }

  // Throws std::system_error if pthread_cond_signal() reports an error.
  void NotifyOne() {
    ThrowOnError(pthread_cond_signal(&condition_), "pthread_cond_signal()");
  }

  // Throws std::system_error if pthread_cond_broadcast() reports an error.
  void NotifyAll() {
    ThrowOnError(pthread_cond_broadcast(&condition_),
                 "pthread_cond_broadcast()");
  }

 private:
  // Converts a non-zero pthread return code into a std::system_error.
  static void ThrowOnError(int error_code, const char* what) {
    if (error_code != 0)
      throw std::system_error(error_code, std::system_category(), what);
  }

  pthread_cond_t condition_;
};

// Condition variable built on a futex word, to be used with
// LockGuard<FutexLock>.
//
// futex_ is a counter bumped by every notification; count_ is the number of
// threads currently inside Wait(); owner_ remembers the FutexLock passed to
// Wait() so that NotifyAll() can requeue waiters onto that lock's futex.
//
// NOTE(review): this class uses INT_MAX and std::invalid_argument; the
// enclosing header does not include <climits> or <stdexcept> directly, so it
// presumably relies on them arriving transitively - confirm.
class FutexCondVar {
 public:
  FutexCondVar() noexcept : futex_(0), count_(0), owner_(nullptr) {}

  FutexCondVar(const FutexCondVar&) = delete;
  FutexCondVar& operator=(const FutexCondVar&) = delete;

  // Releases the lock held by `guard`, sleeps until notified (or until
  // futex_wait() returns for any other reason), then re-acquires the lock.
  // Throws std::invalid_argument if a different lock was recorded by an
  // earlier Wait() on this condition variable.
  void Wait(LockGuard<FutexLock>& guard) {
    FutexLock* owner = guard.GetLockPtr();
    if (owner_ != nullptr && owner_ != owner)
      throw std::invalid_argument(
          "different locks used for the same condition variable.");
    owner_.store(owner, std::memory_order_relaxed);

    // Register as a waiter, then snapshot the notification counter while
    // still holding the lock; the snapshot is the value futex_wait() expects.
    // NOTE(review): the register-then-read pattern here pairs with the
    // bump-then-read pattern in NotifyOne()/NotifyAll(); whether acq_rel /
    // acquire orderings are sufficient on weakly-ordered targets should be
    // confirmed.
    count_.fetch_add(1, std::memory_order_relaxed);
    std::atomic_thread_fence(std::memory_order_acq_rel);
    std::uint32_t value = futex_.load(std::memory_order_relaxed);

    owner->Unlock();

    ev::futex_wait(futex_, value);

    count_.fetch_sub(1, std::memory_order_relaxed);
    // Re-acquire the lock by writing 2 into its futex word until the previous
    // value was 0, sleeping on the lock's futex in between. `guard` is left
    // untouched, so it still reports ownership.
    while (owner->futex_.exchange(2, std::memory_order_acquire))
      futex_wait(owner->futex_, 2);
  }

  // Bumps the notification counter and, if any waiter is registered, wakes
  // one thread sleeping on futex_.
  void NotifyOne() {
    futex_.fetch_add(1, std::memory_order_acquire);
    if (count_.load(std::memory_order_relaxed)) ev::futex_wake(futex_, 1);
  }

  // Bumps the notification counter and, if any waiter is registered and the
  // owning lock is known, calls futex_requeue() towards the lock's futex.
  // NOTE(review): presumably this wakes one waiter and moves up to INT_MAX
  // others onto owner->futex_ - confirm against ev::futex_requeue().
  void NotifyAll() {
    futex_.fetch_add(1, std::memory_order_acquire);
    if (count_.load(std::memory_order_relaxed)) {
      FutexLock* owner = owner_.load(std::memory_order_relaxed);
      if (owner) ev::futex_requeue(futex_, 1, INT_MAX, owner->futex_);
    }
  }

 private:
  std::atomic<std::uint32_t> futex_;   // notification counter / futex word
  std::atomic<std::uint32_t> count_;   // threads currently inside Wait()
  std::atomic<FutexLock*> owner_;      // lock last passed to Wait()
};

//
// Synchronization Traits
//

// Synchronization policy built on the C++ standard library wrappers.
struct StdSynch {
  using LockType = StdMutex;
  using CondVarType = StdCondVar;
};

// Synchronization policy built on the pthread wrappers.
struct PosixSynch {
  using LockType = PosixMutex;
  using CondVarType = PosixCondVar;
};

// Synchronization policy built on the futex-based primitives.
struct FutexSynch {
  using LockType = FutexLock;
  using CondVarType = FutexCondVar;
};

// Default policy: futex-based primitives on Linux, standard library elsewhere.
#if defined(__linux__)
using DefaultSynch = FutexSynch;
#else
using DefaultSynch = StdSynch;
#endif

}  // namespace concurrency
}  // namespace ev

#endif  // !EVENK_SYNCH_H_

A simple concurrent queue built on top of the standard deque and the above synchronisation primitives is defined as follows:

#ifndef EVENK_QUEUE_H_
#define EVENK_QUEUE_H_

#include <deque>

#include "evenk/synch.h"

namespace ev {
namespace concurrency {

// Unbounded blocking queue: a Sequence (std::deque by default) protected by
// the lock and condition variable types supplied by SynchPolicy.
//
// Consumers block in Dequeue() while the queue is empty; Finish() marks the
// queue as finished and wakes them, after which Dequeue() returns false once
// the remaining items have been drained.
template <typename ValueType, typename SynchPolicy = DefaultSynch,
          typename Sequence = std::deque<ValueType>>
class Queue {
 public:
  Queue() noexcept : finish_(false) {}

  // Takes over the items and the finished flag of `other`. `other` is read
  // without taking its lock, so it must not be in use by other threads.
  Queue(Queue&& other) noexcept : finish_(other.finish_) {
    std::swap(queue_, other.queue_);
  }

  // Returns true if the queue holds no items at the time of the call.
  bool Empty() const {
    LockGuard<LockType> guard(lock_);
    return queue_.empty();
  }

  // Returns true once Finish() has been called. The flag is read under the
  // lock because Finish() writes it under the lock from another thread.
  bool Finished() const {
    LockGuard<LockType> guard(lock_);
    return finish_;
  }

  // Marks the queue as finished and wakes every waiting consumer.
  void Finish() {
    LockGuard<LockType> guard(lock_);
    finish_ = true;
    cond_.NotifyAll();
  }

  // Appends `data` (moved) and wakes one consumer. The optional `backoff`
  // argument is passed on to the lock acquisition.
  template <typename... Backoff>
  void Enqueue(ValueType&& data, Backoff... backoff) {
    LockGuard<LockType> guard(lock_, std::forward<Backoff>(backoff)...);
    queue_.push_back(std::move(data));
    cond_.NotifyOne();
  }

  // Appends a copy of `data` and wakes one consumer. The optional `backoff`
  // argument is passed on to the lock acquisition.
  template <typename... Backoff>
  void Enqueue(const ValueType& data, Backoff... backoff) {
    LockGuard<LockType> guard(lock_, std::forward<Backoff>(backoff)...);
    queue_.push_back(data);
    cond_.NotifyOne();
  }

  // Moves the front item into `data` and returns true. Blocks while the queue
  // is empty; returns false if the queue is empty and finished.
  template <typename... Backoff>
  bool Dequeue(ValueType& data, Backoff... backoff) {
    LockGuard<LockType> guard(lock_, std::forward<Backoff>(backoff)...);
    while (queue_.empty()) {
      // The lock is already held here, so read the flag directly instead of
      // calling Finished(), which would try to take the lock again.
      if (finish_) return false;
      cond_.Wait(guard);
    }
    data = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

 private:
  using LockType = typename SynchPolicy::LockType;
  using CondVarType = typename SynchPolicy::CondVarType;

  bool finish_;
  // mutable: the const observers Empty() and Finished() must lock too.
  mutable LockType lock_;
  CondVarType cond_;
  Sequence queue_;
};

}  // namespace concurrency
}  // namespace ev

#endif  // !EVENK_QUEUE_H_

Also there is a bounded queue that is normally faster than the standard queue. The bounded queue has a number of separate slots. For each enqueue or dequeue operation a slot is assigned using atomic tail and head counters. There are several methods to synchronise access to slots. One is based on busy waiting, another on mutexes and condition variables, and another on direct use of futexes.

#ifndef EVENK_BOUNDED_QUEUE_H_
#define EVENK_BOUNDED_QUEUE_H_

#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <stdexcept>
#include <thread>

#include "evenk/backoff.h"
#include "evenk/basic.h"
#include "evenk/futex.h"
#include "evenk/synch.h"

namespace ev {
namespace concurrency {

// Common part of every bounded-queue slot: an atomic 32-bit ticket plus the
// accessors the wait policies build upon.
struct BoundedQueueSlotBase {
  // Sets the initial ticket; relaxed store, no ordering is implied.
  void Initialize(std::uint32_t value) {
    ticket_.store(value, std::memory_order_relaxed);
  }

  // Reads the ticket with acquire ordering.
  std::uint32_t Load() const {
    const std::uint32_t current = ticket_.load(std::memory_order_acquire);
    return current;
  }

  // Publishes a new ticket with release ordering.
  void Store(std::uint32_t value) {
    ticket_.store(value, std::memory_order_release);
  }

 protected:
  std::atomic<std::uint32_t> ticket_;
};

// Wait policy that never blocks: "waiting" is just re-reading the ticket, so
// callers effectively busy-wait.
class BoundedQueueNoWait : public BoundedQueueSlotBase {
 public:
  // The previously observed ticket is ignored; returns the current ticket.
  std::uint32_t WaitAndLoad(std::uint32_t) {
    const std::uint32_t current = Load();
    return current;
  }

  // Publishes the new ticket; there is nobody to wake.
  void StoreAndWake(std::uint32_t value) {
    Store(value);
  }

  // Nothing to do: no thread ever sleeps under this policy.
  void Wake() {}
};

// Wait policy that yields the processor between ticket reads.
class BoundedQueueYieldWait : public BoundedQueueSlotBase {
 public:
  // The previously observed ticket is ignored; yields once, then returns the
  // current ticket.
  std::uint32_t WaitAndLoad(std::uint32_t) {
    std::this_thread::yield();
    const std::uint32_t current = Load();
    return current;
  }

  // Publishes the new ticket; there is nobody to wake.
  void StoreAndWake(std::uint32_t value) {
    Store(value);
  }

  // Nothing to do: no thread ever sleeps under this policy.
  void Wake() {}
};

// Wait policy that sleeps directly on the slot's ticket using a futex.
// wait_count_ tracks how many threads are inside WaitAndLoad() so that
// StoreAndWake() can skip the wake call when nobody is waiting.
class BoundedQueueFutexWait : public BoundedQueueSlotBase {
 public:
  // Sleeps on the ticket while it still equals `value`, then returns the
  // current ticket.
  std::uint32_t WaitAndLoad(std::uint32_t value) {
    wait_count_.fetch_add(1, std::memory_order_relaxed);
    ev::futex_wait(ticket_, value);  // Presuming this is a full memory fence.
    wait_count_.fetch_sub(1, std::memory_order_relaxed);
    return Load();
  }

  // Publishes the new ticket and wakes sleepers if any are registered. The
  // full fence orders the ticket store before the waiter-count read.
  void StoreAndWake(std::uint32_t value) {
    Store(value);
    std::atomic_thread_fence(std::memory_order_seq_cst);
    const bool has_waiters = wait_count_.load(std::memory_order_relaxed) != 0;
    if (has_waiters) Wake();
  }

  // Wakes every thread sleeping on this slot's ticket.
  void Wake() { futex_wake(ticket_, INT32_MAX); }

 private:
  std::atomic<std::uint32_t> wait_count_{0};
};

// Wait policy that blocks on a lock / condition variable pair taken from the
// Synch policy. The ticket is only accessed while the lock is held, hence the
// relaxed atomic operations.
template <typename Synch = DefaultSynch>
class BoundedQueueSynchWait : public BoundedQueueSlotBase {
 public:
  // Returns the ticket immediately if it already differs from `value`;
  // otherwise waits once on the condition variable and returns whatever the
  // ticket is afterwards (the caller re-checks and may wait again).
  std::uint32_t WaitAndLoad(std::uint32_t value) {
    LockGuard<LockType> guard(lock_);
    const std::uint32_t observed = ticket_.load(std::memory_order_relaxed);
    if (observed != value) return observed;
    cond_.Wait(guard);
    return ticket_.load(std::memory_order_relaxed);
  }

  // Updates the ticket under the lock and wakes all waiters.
  void StoreAndWake(std::uint32_t value) {
    LockGuard<LockType> guard(lock_);
    ticket_.store(value, std::memory_order_relaxed);
    cond_.NotifyAll();
  }

  // Wakes all waiters without changing the ticket.
  void Wake() {
    LockGuard<LockType> guard(lock_);
    cond_.NotifyAll();
  }

 private:
  using LockType = typename Synch::LockType;
  using CondVarType = typename Synch::CondVarType;

  LockType lock_;
  CondVarType cond_;
};

template <typename ValueType, typename WaitType = BoundedQueueNoWait>
class BoundedQueue {
 public:
  BoundedQueue(std::uint32_t size)
      : ring_{nullptr}, mask_{size - 1}, finish_{false}, head_{0}, tail_{0} {
    if (size == 0 || (size & mask_) != 0)
      throw std::invalid_argument("BoundedQueue size must be a power of two");

    void* ring;
    if (::posix_memalign(&ring, ev::kCacheLineSize, size * sizeof(Slot)))
      throw std::bad_alloc();

    ring_ = new (ring) Slot[size];
    for (std::uint32_t i = 0; i < size; i++) ring_[i].Initialize(i);
  }

  BoundedQueue(BoundedQueue&& other) noexcept : ring_{other.ring_},
                                                mask_{other.mask_},
                                                finish_{false},
                                                head_{0},
                                                tail_{0} {
    other.ring_ = nullptr;
  }

  BoundedQueue(BoundedQueue const&) = delete;
  BoundedQueue& operator=(BoundedQueue const&) = delete;

  ~BoundedQueue() { Destroy(); }

  bool Empty() const {
    int64_t head = head_.load(std::memory_order_relaxed);
    int64_t tail = tail_.load(std::memory_order_relaxed);
    return (tail <= head);
  }

  bool Finished() const { return finish_.load(std::memory_order_relaxed); }

  void Finish() {
    finish_.store(true, std::memory_order_relaxed);
    for (std::uint32_t i = 0; i < mask_ + 1; i++) ring_[i].Wake();
  }

  template <typename... Backoff>
  void Enqueue(ValueType&& value, Backoff... backoff) {
    const std::uint64_t tail = tail_.fetch_add(1, std::memory_order_seq_cst);
    Slot& slot = ring_[tail & mask_];
    WaitTail(slot, tail, std::forward<Backoff>(backoff)...);
    slot.value = std::move(value);
    WakeHead(slot, tail + 1);
  }

  template <typename... Backoff>
  void Enqueue(const ValueType& value, Backoff... backoff) {
    const std::uint64_t tail = tail_.fetch_add(1, std::memory_order_seq_cst);
    Slot& slot = ring_[tail & mask_];
    WaitTail(slot, tail, std::forward<Backoff>(backoff)...);
    slot.value = value;
    WakeHead(slot, tail + 1);
  }

  template <typename... Backoff>
  bool Dequeue(ValueType& value, Backoff... backoff) {
    const std::uint64_t head = head_.fetch_add(1, std::memory_order_relaxed);
    Slot& slot = ring_[head & mask_];
    if (!WaitHead(slot, head + 1, std::forward<Backoff>(backoff)...))
      return false;
    value = std::move(slot.value);
    WakeTail(slot, head + mask_ + 1);
    return true;
  }

 private:
  struct alignas(ev::kCacheLineSize) Slot : public WaitType {
    ValueType value;
  };

  void Destroy() {
    if (ring_ != nullptr) {
      std::uint32_t size = mask_ + 1;
      for (std::uint32_t i = 0; i < size; i++) ring_[i].~Slot();
      std::free(ring_);
    }
  }

  void WaitTail(Slot& slot, std::uint64_t required_ticket) {
    std::uint32_t current_ticket = slot.Load();
    while (current_ticket != std::uint32_t(required_ticket)) {
      current_ticket = slot.WaitAndLoad(current_ticket);
    }
  }

  template <typename Backoff>
  void WaitTail(Slot& slot, std::uint64_t required_ticket, Backoff backoff) {
    bool waiting = false;
    std::uint32_t current_ticket = slot.Load();
    while (current_ticket != std::uint32_t(required_ticket)) {
      if (waiting) {
        current_ticket = slot.WaitAndLoad(current_ticket);
      } else {
        waiting = backoff();
        current_ticket = slot.Load();
      }
    }
  }

  bool WaitHead(Slot& slot, std::uint64_t required_ticket) {
    std::uint32_t current_ticket = slot.Load();
    while (current_ticket != std::uint32_t(required_ticket)) {
      if (Finished()) {
        std::uint64_t tail = tail_.load(std::memory_order_seq_cst);
        if (required_ticket >= tail) return false;
      }
      current_ticket = slot.WaitAndLoad(current_ticket);
    }
    return true;
  }

  template <typename Backoff>
  bool WaitHead(Slot& slot, std::uint64_t required_ticket, Backoff backoff) {
    bool waiting = false;
    std::uint32_t current_ticket = slot.Load();
    while (current_ticket != std::uint32_t(required_ticket)) {
      if (Finished()) {
        std::uint64_t tail = tail_.load(std::memory_order_seq_cst);
        if (required_ticket >= tail) return false;
      }
      if (waiting) {
        current_ticket = slot.WaitAndLoad(current_ticket);
      } else {
        waiting = backoff();
        current_ticket = slot.Load();
      }
    }
    return true;
  }

  void WakeHead(Slot& slot, std::uint32_t new_ticket) {
    slot.StoreAndWake(new_ticket);
  }

  void WakeTail(Slot& slot, std::uint32_t new_ticket) {
    slot.StoreAndWake(new_ticket);
  }

  Slot* ring_;
  const std::uint32_t mask_;

  std::atomic<bool> finish_;

  alignas(ev::kCacheLineSize) std::atomic<std::uint64_t> head_;
  alignas(ev::kCacheLineSize) std::atomic<std::uint64_t> tail_;
};

template <typename ValueType>
using DefaultBoundedQueue = BoundedQueue<ValueType, BoundedQueueNoWait>;

}  // namespace concurrency
}  // namespace ev

#endif  // !EVENK_BOUNDED_QUEUE_H_

A benchmark for different variants of queues is available on GitHub.

share|improve this question
1  
Why would you not just use the version that is provided by the compiler. You would expect the implementation to use the most efficient technique (and if it does not then it will eventually and your code gets upgraded for free with a re-compile). –  Loki Astari Apr 23 at 17:09
    
I implemented synchronisation mechanisms in a very flexible way to experiment with them and find the best choice for particular usage scenario. The solution provided by the compiler does not have to be and cannot be the best in every possible case. It just has to be good enough for average use. If you run the provided benchmark you could see that mechanisms based on busy-waiting and futexes significantly outperform std::mutex and std::condition_variable in certain cases. So if you have an application that suffers from poor multi-threading performance then using my library might be an answer. –  Aleksey Demakov Apr 23 at 17:56
    
Why are you not using std::future and std::promise. –  Loki Astari Apr 23 at 17:59
    
std::future and std::promise use hidden behind the scenes atomic operations and memory allocation. This could affect performance of very high-load applications. I will probably provide in the future a convenient higher-level interface that would use these objects. So a user will have a choice between familiar high-level interface and extremely efficient low-level interface. –  Aleksey Demakov Apr 23 at 18:11
    
@AlekseyDemakov I may be wrong but I think that libstdc++ uses futexes to implement the concurrency features of the standard library. –  Morwenn May 25 at 12:49

Your Answer

 
discard

By posting your answer, you agree to the privacy policy and terms of service.

Browse other questions tagged or ask your own question.