
I've implemented a concurrent_blocking_queue for multiple producers and a single consumer.

I intend it to block the producers as well when the queue is full, which is why I set the capacity of the queue in the constructor. I've provided my own implementations of critical_section, scoped_lock and condition_variable, using the Windows API primitives documented on MSDN.

Please review it. I want to know if there are any potential deadlocks.

template<typename T>
class concurrent_blocking_queue : public noncopyable
{
    std::queue<T> m_internal_queue;
    mutable critical_section m_critical_section;
    condition_variable m_queue_full;
    condition_variable m_queue_empty;
    const size_t m_capacity;
public:
    concurrent_blocking_queue(size_t capacity) 
        : m_capacity(capacity), m_critical_section()
    {
    }
    void put(T const & input)
    {
        framework::scoped_lock lock(m_critical_section); //lock
        m_queue_full.wait(lock, [&]{ return !full(); });
        m_internal_queue.push(input);
        m_queue_empty.notify_all();
    }
    T take()
    {
        framework::scoped_lock lock(m_critical_section); //lock
        m_queue_empty.wait(lock, [&]{ return !empty(); });
        T output = m_internal_queue.front();
        m_internal_queue.pop();
        m_queue_full.notify_all();
        return output;
    }
    bool full() const
    {
        framework::scoped_lock lock(m_critical_section);
        if ( m_internal_queue.size() > m_capacity )
        {
            throw std::logic_error("size of concurrent_blocking_queue cannot be greater than the capacity.");
        }
        return m_internal_queue.size() == m_capacity;
    }
    bool empty() const
    {
        framework::scoped_lock lock(m_critical_section);
        return m_internal_queue.empty();
    }
    //..
};

The remaining classes are posted below. Note that I've implemented them in .h and .cpp files, but here I've merged them to keep the post short and simple, and I've removed the namespace for brevity; in the real code, all classes are defined in the framework namespace.

  • critical_section derives publicly from CRITICAL_SECTION.

    class critical_section : public CRITICAL_SECTION, public noncopyable
    {
     public:
        critical_section(void) { ::InitializeCriticalSection(this); }
        ~critical_section(void) { ::DeleteCriticalSection(this); }
        void lock() { ::EnterCriticalSection(this); }
        void unlock() { ::LeaveCriticalSection(this); }
    };
    
  • condition_variable derives privately from CONDITION_VARIABLE.

    class condition_variable : private CONDITION_VARIABLE, public noncopyable
    {
     public:
        condition_variable() { ::InitializeConditionVariable(this); }
        ~condition_variable(void) {}
        void wait(scoped_lock & lock)
        {
           if (!::SleepConditionVariableCS (this, &lock.m_cs, INFINITE))
           {
              DWORD error = ::GetLastError();
              std::string msg = format("SleepConditionVariableCS() failed with error code = 0x%X.", error);
              throw std::runtime_error(msg);
           }
        }
        void wait(scoped_lock & lock, std::function<bool()> predicate)
        {
           while( !predicate() )  {  wait(lock);  }
        }
        void notify_one() { ::WakeConditionVariable(this); }
        void notify_all() { ::WakeAllConditionVariable(this); }
    };
    
  • scoped_lock

    class scoped_lock : public noncopyable
    {
       friend class condition_variable;
       critical_section & m_cs;
     public:
       scoped_lock(critical_section & cs) : m_cs(cs) { m_cs.lock(); }
       ~scoped_lock(void) { m_cs.unlock(); }
    };
    
Does the implementation you use for condition variables unlock the mutex in the call to wait(), like it does in POSIX? If not, then I've located a deadlock. – Brady May 25 '12 at 10:48
@Brady: As the MSDN doc says, "Sleeps on the specified condition variable and releases the specified critical section as an atomic operation", so yes, the condition variable unlocks the mutex in wait(). – Nawaz May 25 '12 at 11:07
OK, thanks. I wasn't sure about the MS implementation. Another question, as you can see in my answer: are the locks recursive? If not, then there is a deadlock. – Brady May 25 '12 at 11:09
@Brady: Yes, we can recursively lock CRITICAL_SECTION as per the MSDN. – Nawaz May 25 '12 at 11:10
One last comment, it seems odd that the threads should call m_queue_empty.notify_all() if no other threads are waiting. If that behavior is allowed/supported by MSDN, then the rest looks ok to me. – Brady May 25 '12 at 13:00

1 Answer

Accepted answer (score: 3)

Basic comments:

There is no point in functions that do nothing.
Also, avoid using void to declare an empty parameter list.

~condition_variable(void) {}

Why put a member that is only default-constructed in the initializer list? (I can go either way.)

concurrent_blocking_queue(size_t capacity) 
    : m_capacity(capacity), m_critical_section()

But you should be consistent: if you do it for one member, do it for all of them.
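
To illustrate the consistency point, here is a minimal sketch rather than the asker's actual class. The name `init_list_sketch` is hypothetical, and `std::mutex` stands in for `critical_section`. Every member appears in the initializer list, written in declaration order, which is also the order in which C++ initializes members no matter how the list is written.

```cpp
#include <cstddef>
#include <mutex>
#include <queue>

// Hypothetical stand-in for the queue's data members (assumption:
// std::mutex replaces the question's critical_section wrapper).
class init_list_sketch
{
    std::queue<int> m_internal_queue;
    std::mutex m_mutex;
    const std::size_t m_capacity;
public:
    // All members are listed, in the same order as they are declared.
    explicit init_list_sketch(std::size_t capacity)
        : m_internal_queue(), m_mutex(), m_capacity(capacity)
    {
    }
    std::size_t capacity() const { return m_capacity; }
    bool empty() const { return m_internal_queue.empty(); }
};
```

Listing members out of declaration order, as the question's constructor does, is legal, but many compilers warn about it because the written order is not the order that actually runs.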

Design:

When you call full() and empty() from within other members you already have a lock. Yet you are calling a function that will add another level of locking. And locking is usually relatively expensive.

    m_queue_full.wait(lock, [&]{ return !full(); });

    m_queue_empty.wait(lock, [&]{ return !empty(); });

Thus I would change the above to call private versions of these functions that do not take the lock.

    // OLD STUFF
    m_queue_full.wait(lock, [&]{ return !test_full(); });

    // OLD STUFF
    m_queue_empty.wait(lock, [&]{ return !test_empty(); });

    bool full() const
    {
      framework::scoped_lock lock(m_critical_section);
      return test_full();
    }
    bool empty() const
    {
      framework::scoped_lock lock(m_critical_section);
      return test_empty();
    }

    private:
    bool test_full()  const {STUFF}
    bool test_empty() const {STUFF}
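
Put together, the idea might look like the following sketch. It is an illustration under stated assumptions, not the asker's code: `std::mutex`, `std::unique_lock` and `std::condition_variable` stand in for the `critical_section`, `scoped_lock` and `condition_variable` wrappers from the question, and the names `bounded_queue_sketch` and `run_sketch` are hypothetical. Note that `std::mutex` is not recursive, so with these primitives the unlocked helpers are required rather than just cheaper: calling the locking `full()` from inside the wait predicate would try to lock a mutex the thread already holds.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Sketch only: standard-library primitives replace the Win32 wrappers.
template <typename T>
class bounded_queue_sketch
{
    std::queue<T> m_internal_queue;
    mutable std::mutex m_mutex;
    std::condition_variable m_not_full;
    std::condition_variable m_not_empty;
    const std::size_t m_capacity;

    // Unlocked helpers: the caller must already hold m_mutex.
    bool test_full() const { return m_internal_queue.size() == m_capacity; }
    bool test_empty() const { return m_internal_queue.empty(); }

public:
    explicit bounded_queue_sketch(std::size_t capacity) : m_capacity(capacity) {}

    void put(T const & input)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_not_full.wait(lock, [this]{ return !test_full(); });
        m_internal_queue.push(input);
        m_not_empty.notify_all();
    }
    T take()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_not_empty.wait(lock, [this]{ return !test_empty(); });
        T output = m_internal_queue.front();
        m_internal_queue.pop();
        m_not_full.notify_all();
        return output;
    }
    // Public versions take the lock once, then delegate.
    bool full() const
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return test_full();
    }
    bool empty() const
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return test_empty();
    }
};

// Hypothetical usage: two producers each push 1..100 through a queue of
// capacity 4 while the calling thread consumes all 200 items and sums them.
inline long run_sketch()
{
    bounded_queue_sketch<int> q(4);
    std::vector<std::thread> producers;
    for (int p = 0; p < 2; ++p)
        producers.emplace_back([&q]{ for (int i = 1; i <= 100; ++i) q.put(i); });
    long sum = 0;
    for (int i = 0; i < 200; ++i) sum += q.take();
    for (auto & t : producers) t.join();
    return sum;
}
```

With this split, each `put()` or `take()` acquires the lock exactly once, and the predicates run only while that lock is held.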
1) The destructor was generated by MSVS itself. I'm just too lazy to remove it from my code. 2) thanks for the idea of test_full() and test_empty(). They're good improvements to my code. 3) I assume there is no dead-lock in the implementation of the queue, as everyone here seems to have this opinion. – Nawaz Jun 4 '12 at 10:59
