Code Review Stack Exchange is a question and answer site for peer programmer code reviews. It's 100% free, no registration required.

Sign up
Here's how it works:
  1. Anybody can ask a question
  2. Anybody can answer
  3. The best answers are voted up and rise to the top

(See the next iteration.)

I have this easy to use facility that maps input elements to output elements concurrently by the means of a thread pool:

concurrent.h:

#ifndef FORP_H
#define FORP_H

#include <functional>
#include <initializer_list>
#include <iostream>
#include <thread>
#include <vector>

namespace net {

    namespace coderodde {

        namespace concurrent {
            ////////////////////////////////////////////////////////////////////
             // This is an adhoc concurrent queue used by forp.              //
            ////////////////////////////////////////////////////////////////////
            template<class T>
            class queue
            {
                private:

                    struct queue_node
                    {
                        T           m_element;
                        size_t      m_element_index;
                        queue_node* m_next;

                        queue_node(const T& element, const size_t index) :
                            m_element{element},
                            m_element_index{index},
                            m_next{nullptr} 
                            {

                            }
                    };

                    std::mutex m_mutex;
                    queue_node* m_head;
                    queue_node* m_tail;

                public:

                    queue(std::initializer_list<T> list)
                    {
                        m_head = nullptr;
                        size_t index = 0;

                        for (const auto& element : list)
                        {
                            queue_node* new_node = new queue_node(element,
                                                                  index++);

                            if (m_head == nullptr)
                            {
                                m_head = new_node;
                                m_tail = new_node;
                            }
                            else
                            {
                                m_tail->m_next = new_node;
                                m_tail = new_node;
                            }
                        }
                    }

                    std::tuple<T, size_t, bool> dequeue() 
                    {
                        std::tuple<T, size_t, bool> ret;
                        m_mutex.lock();

                        if (m_head == nullptr)
                        {
                            // The queue is empty.
                            ret = std::make_tuple(T(), 0, false);
                        }
                        else 
                        {
                            ret = std::make_tuple(m_head->m_element,
                                                  m_head->m_element_index,
                                                  true);
                            m_head = m_head->m_next;
                        }

                        m_mutex.unlock();
                        return ret;
                    }
            };

            template<class In, class Out>
            void thread_do(net::coderodde::concurrent::queue<In>& input_queue,
                           Out (*process)(In in),
                           std::vector<Out>& output_vector)
            {
                while (true)
                {
                    std::tuple<In, size_t, bool> data = input_queue.dequeue();

                    if (std::get<2>(data) == false)
                    {
                        return;
                    }

                    const In input_element = std::get<0>(data);
                    const size_t input_element_index = std::get<1>(data);

                    Out output_element = process(input_element);
                    output_vector[input_element_index] = output_element;
                }
            }

            ////////////////////////////////////////////////////////////////////
              // This function template implements a concurrent, thread-pool-//
             // based iteration construct.                                  //
            ////////////////////////////////////////////////////////////////////
            template<class In, class Out>
            void forp(std::initializer_list<In>& input_list, 
                      Out (*process)(In in),
                      std::vector<Out>& output_vector)
            {
                unsigned thread_count = std::thread::hardware_concurrency();
                std::vector<std::thread> thread_vector;
                thread_vector.reserve(thread_count);
                net::coderodde::concurrent::queue<In> input_queue(input_list);
                output_vector.clear();
                output_vector.reserve(input_list.size());

                for (size_t i = 0; i < input_list.size(); ++i) 
                {
                    output_vector.push_back(Out());
                }

                for (unsigned i = 0; i < thread_count; ++i) 
                {
                    thread_vector.push_back(
                            std::thread(&thread_do<In, Out>, 
                                        std::ref(input_queue), 
                                        std::ref(process), 
                                        std::ref(output_vector)));
                }

                for (std::thread& thread : thread_vector)
                {
                    thread.join();
                }
            }

        } /* namespace concurrent */

    } /* namespace coderodde */

} /* namespace net */

#endif  /* FORP_H */

main.cpp:

#include "concurrent.h"
#include <chrono>
#include <cstdint>
#include <initializer_list>
#include <iostream>
#include <sstream>
#include <vector>

class CurrentTime {
    std::chrono::high_resolution_clock m_clock;

public:

    uint64_t milliseconds() 
    {
        return std::chrono
                  ::duration_cast<std::chrono
                                     ::milliseconds>
              (m_clock.now().time_since_epoch()).count();
    }
};

using net::coderodde::concurrent::forp;
using std::initializer_list;
using std::vector;
using std::cout;
using std::stringstream;

static uint64_t fibonacci(uint64_t n)
{
    if (n <= 0) 
    {
        return 0;
    }

    if (n == 1) 
    {
        return 1;
    }

    return fibonacci(n - 1) + fibonacci(n - 2);
}

template<class T>
std::string to_string(std::vector<T>& vec)
{
    stringstream ss;
    ss << "[";

    if (vec.size() > 0) 
    {
        ss << vec[0];
    }

    for (size_t i = 1; i < vec.size(); ++i)
    {
        ss << ", " << vec[i];
    }

    ss << "]";
    return ss.str();
}

int main(int argc, char** argv) {
    std::initializer_list<uint64_t> fibonacci_task_input_list = 
        { 40, 41, 39, 33, 43, 30, 34, 40, 42, 20, 42, 40, 41 };

    CurrentTime ct;

    vector<uint64_t> result_vector1;
    vector<uint64_t> result_vector2;

    uint64_t start_time = ct.milliseconds();

    for (const int i : fibonacci_task_input_list)
    {
        result_vector1.push_back(fibonacci(i));
    }

    uint64_t end_time = ct.milliseconds();

    cout << "Serial processing in " 
         << (end_time - start_time)
         << " milliseconds.\n";

    start_time = ct.milliseconds();

    net::coderodde::concurrent::forp(fibonacci_task_input_list,
                                     fibonacci,
                                     result_vector2);

    end_time = ct.milliseconds();

    cout << "Parallel processing in "
         << (end_time - start_time)
         << " milliseconds.\n";

    cout << "Serial     result: " << to_string(result_vector1) << "\n";
    cout << "Concurrent result: " << to_string(result_vector2) << "\n";

    return 0;
}

queue

If you look at the dequeue() method of the queue, it returns also a boolean value indicating whether the queue is still nonempty after actually removing an element from it. I did this out of fear of the following scenario:

Suppose the queue contains only one element. Suppose also that a thread \$T_1\$ asks whether the queue is nonempy. Next, another thread \$T_2\$ asks whether the queue is empty. Next, say, the thread \$T_1\$ pops the last element. Eventually, \$T_2\$ still thinks that the queue is not empty when, in fact, it is.

Performance figures

On a dual-core CPU I get the following digits:

Serial processing in 20024 milliseconds.
Parallel processing in 10642 milliseconds.
Serial     result: [102334155, 165580141, 63245986, 3524578, 433494437, 832040, 5702887, 102334155, 267914296, 6765, 267914296, 102334155, 165580141]
Concurrent result: [102334155, 165580141, 63245986, 3524578, 433494437, 832040, 5702887, 102334155, 267914296, 6765, 267914296, 102334155, 165580141]

Since I am not proficient in C++, please, tell me anything that comes to mind.

share|improve this question
up vote 3 down vote accepted

Just a few items which caught my eye:

  1. I wouldn't have bothered implementing my own queue. Just use a std::deque or a plain std::vector with an index pointing to the current head element. Saves a bunch of code which you don't have to test and maintain.

  2. You shouldn't use the mutex directly, you should use a std::lock_guard instead to make sure the mutex gets released automatically when the scope is left.

  3. You should reduce the scope of the mutex to a minimum to avoid unnecessary lock contention (in this case probably more of an academic point but still a good habit to get into).

    So the dequeue method could look like this:

    std::tuple<T, size_t, bool> dequeue() 
    {   
        queue_node* item = nullptr;
    
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            if (m_head != nullptr)
            {
                item = m_head;
                m_head = m_head->next;
            }
        }
    
        return std::make_tuple(item ? item->m_element : T(),
                               item ? item->m_element_index : 0,
                               item != nullptr);
    }
    
  4. This:

    output_vector.clear();
    output_vector.reserve(input_list.size());
    for (size_t i = 0; i < input_list.size(); ++i) 
    {
        output_vector.push_back(Out());
    }
    

    Can be replaced with

     output_vector.clear()
     output_vector.resize(input_list.size());
    

    since resize will automatically insert elements for you if the current size is smaller than the requested size.


Update: Actually I just noticed that your queue implementation is leaking memory: nodes get new-ed but never deleted. Which comes back to my first point :)

Also you're copying the In and Out elements around a few times when you probably could just move them but I don't do enough day-to-day modern C++ to provide a correct answer on the spot right now. I'll leave that to someone else.

share|improve this answer
    
Is there anything else, such as use of move semantics/rvalue references? – coderodde yesterday
    
@coderodde: You could probably move a few things around instead of (implicitly) copying them, I just can't provide a good enough answer I'm comfortable with on that point right now. – ChrisWue yesterday

Your Answer

 
discard

By posting your answer, you agree to the privacy policy and terms of service.

Not the answer you're looking for? Browse other questions tagged or ask your own question.