(See the next iteration.)
I have this easy-to-use facility that maps input elements to output elements concurrently by means of a thread pool:
concurrent.h:
#ifndef FORP_H
#define FORP_H
#include <cstddef>
#include <functional>
#include <initializer_list>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
namespace net {
namespace coderodde {
namespace concurrent {
////////////////////////////////////////////////////////////////////
// An ad hoc concurrent queue used by forp.                       //
//                                                                //
// The queue is filled once, from an initializer list, and is     //
// afterwards only drained. Every element remembers its position  //
// in that list. dequeue() is serialized by a mutex; construction //
// and destruction are not and must not race with dequeue().      //
////////////////////////////////////////////////////////////////////
template<class T>
class queue
{
private:
    // One link of the singly linked list: the element plus the index it had
    // in the initializer list the queue was built from.
    struct queue_node
    {
        T           m_element;
        size_t      m_element_index;
        queue_node* m_next;

        queue_node(const T& element, const size_t index) :
        m_element{element},
        m_element_index{index},
        m_next{nullptr}
        {
        }
    };

    std::mutex  m_mutex;
    queue_node* m_head{nullptr};
    queue_node* m_tail{nullptr};

    // Frees every node still linked into the list. Iterative on purpose so
    // that a long list cannot exhaust the stack.
    void destroy_nodes() noexcept
    {
        while (m_head != nullptr)
        {
            queue_node* const next = m_head->m_next;
            delete m_head;
            m_head = next;
        }

        m_tail = nullptr;
    }

public:
    // Builds the queue from 'list'; the i-th element of the list is stored
    // with index i. Copies every element.
    queue(std::initializer_list<T> list)
    {
        size_t index = 0;

        try
        {
            for (const auto& element : list)
            {
                queue_node* const new_node = new queue_node(element, index++);

                if (m_head == nullptr)
                {
                    m_head = new_node;
                }
                else
                {
                    m_tail->m_next = new_node;
                }

                m_tail = new_node;
            }
        }
        catch (...)
        {
            // The destructor does not run for a half-constructed object, so
            // release the nodes built so far before propagating.
            destroy_nodes();
            throw;
        }
    }

    // The queue owns raw nodes and a mutex; copying was never possible and
    // is now stated explicitly.
    queue(const queue&)            = delete;
    queue& operator=(const queue&) = delete;

    // Releases all elements that were never dequeued.
    ~queue()
    {
        destroy_nodes();
    }

    // Removes the front element, if any.
    //
    // Returns a tuple (element, index, success). When 'success' is true,
    // 'element' is the removed element and 'index' its position in the
    // original initializer list. When the queue was empty, 'success' is
    // false and the other two fields are T() and 0.
    //
    // Emptiness test and removal happen under one lock acquisition, so no
    // two callers can obtain the same element.
    std::tuple<T, size_t, bool> dequeue()
    {
        std::unique_ptr<queue_node> node;

        {
            // Only the unlinking needs the lock; the element is moved out
            // afterwards, when the node is exclusively ours.
            std::lock_guard<std::mutex> guard(m_mutex);

            node.reset(m_head);

            if (node)
            {
                m_head = node->m_next;

                if (m_head == nullptr)
                {
                    m_tail = nullptr;
                }
            }
        }

        if (!node)
        {
            // The queue is empty.
            return std::tuple<T, size_t, bool>(T(), size_t{0}, false);
        }

        // 'node' is freed when it goes out of scope.
        return std::tuple<T, size_t, bool>(std::move(node->m_element),
                                           node->m_element_index,
                                           true);
    }
};
// Worker routine run by every thread of forp.
//
// Keeps pulling (element, index, success) triples from 'input_queue' until
// the queue reports that nothing was dequeued. Each element is passed to
// 'process' and the result is stored in 'output_vector' at the element's
// index, so 'output_vector' must already be large enough.
template<class In, class Out>
void thread_do(net::coderodde::concurrent::queue<In>& input_queue,
               Out (*process)(In in),
               std::vector<Out>& output_vector)
{
    for (;;)
    {
        std::tuple<In, size_t, bool> task = input_queue.dequeue();

        const bool got_task = std::get<2>(task);

        if (!got_task)
        {
            // Nothing left to do for this worker.
            break;
        }

        const In     argument = std::get<0>(task);
        const size_t slot     = std::get<1>(task);

        Out result = process(argument);
        output_vector[slot] = result;
    }
}
////////////////////////////////////////////////////////////////////
// This function template implements a concurrent,                //
// thread-pool-based iteration construct.                         //
//                                                                //
// input_list    - the elements to process.                       //
// process       - the function applied to every element; it is   //
//                 called from several threads at once.           //
// output_vector - receives the results; its previous contents    //
//                 are discarded. After the call it holds one     //
//                 entry per input element, in input order.       //
//                                                                //
// The call blocks until every element has been processed.        //
////////////////////////////////////////////////////////////////////
template<class In, class Out>
void forp(std::initializer_list<In>& input_list,
          Out (*process)(In in),
          std::vector<Out>& output_vector)
{
    const size_t task_count = input_list.size();

    // Pre-size the output so that each worker can write to its own slot
    // without any reallocation taking place.
    output_vector.clear();
    output_vector.resize(task_count);

    if (task_count == 0)
    {
        return;
    }

    // hardware_concurrency() is only a hint and may return 0; without a
    // fallback no worker would be started and no element processed.
    size_t thread_count = std::thread::hardware_concurrency();

    if (thread_count == 0)
    {
        thread_count = 1;
    }

    // More workers than tasks would only start threads that exit at once.
    if (thread_count > task_count)
    {
        thread_count = task_count;
    }

    net::coderodde::concurrent::queue<In> input_queue(input_list);

    std::vector<std::thread> thread_vector;
    thread_vector.reserve(thread_count);

    for (size_t i = 0; i < thread_count; ++i)
    {
        // The queue and the output vector outlive the workers because all
        // of them are joined below, so passing references is safe.
        thread_vector.emplace_back(&thread_do<In, Out>,
                                   std::ref(input_queue),
                                   process,
                                   std::ref(output_vector));
    }

    for (std::thread& worker : thread_vector)
    {
        worker.join();
    }
}
} /* namespace concurrent */
} /* namespace coderodde */
} /* namespace net */
#endif /* FORP_H */
main.cpp:
#include "concurrent.h"
#include <chrono>
#include <cstdint>
#include <initializer_list>
#include <iostream>
#include <sstream>
#include <vector>
// Small helper for measuring elapsed time.
//
// The values returned by milliseconds() are only meaningful relative to
// each other: subtract two readings to obtain a duration. A monotonic clock
// is used so that the difference can never be negative, even if the system
// time is adjusted between the two readings.
class CurrentTime {
public:
    // Returns the number of milliseconds elapsed since the epoch of
    // std::chrono::steady_clock.
    uint64_t milliseconds() const
    {
        const auto since_epoch =
            std::chrono::steady_clock::now().time_since_epoch();

        return static_cast<uint64_t>(
            std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch)
                .count());
    }
};
using net::coderodde::concurrent::forp;
using std::initializer_list;
using std::vector;
using std::cout;
using std::stringstream;
// Returns the n-th Fibonacci number (fibonacci(0) == 0, fibonacci(1) == 1).
//
// Computed by plain double recursion, so the running time grows
// exponentially with n. NOTE(review): presumably kept naive on purpose to
// serve as a CPU-bound workload for the timing comparison in main().
static uint64_t fibonacci(uint64_t n)
{
    // For n of 0 or 1 the result is n itself.
    return n < 2 ? n
                 : fibonacci(n - 1) + fibonacci(n - 2);
}
// Renders 'vec' as "[e0, e1, ..., eN]"; an empty vector yields "[]".
//
// Each element is written with operator<<, so T must be streamable.
// The vector is taken by const reference, which also admits const vectors
// and temporaries.
template<class T>
std::string to_string(const std::vector<T>& vec)
{
    std::ostringstream out;
    out << "[";

    // Empty before the first element, ", " before every later one.
    const char* separator = "";

    for (const auto& element : vec)
    {
        out << separator << element;
        separator = ", ";
    }

    out << "]";
    return out.str();
}
int main(int argc, char** argv) {
std::initializer_list<uint64_t> fibonacci_task_input_list =
{ 40, 41, 39, 33, 43, 30, 34, 40, 42, 20, 42, 40, 41 };
CurrentTime ct;
vector<uint64_t> result_vector1;
vector<uint64_t> result_vector2;
uint64_t start_time = ct.milliseconds();
for (const int i : fibonacci_task_input_list)
{
result_vector1.push_back(fibonacci(i));
}
uint64_t end_time = ct.milliseconds();
cout << "Serial processing in "
<< (end_time - start_time)
<< " milliseconds.\n";
start_time = ct.milliseconds();
net::coderodde::concurrent::forp(fibonacci_task_input_list,
fibonacci,
result_vector2);
end_time = ct.milliseconds();
cout << "Parallel processing in "
<< (end_time - start_time)
<< " milliseconds.\n";
cout << "Serial result: " << to_string(result_vector1) << "\n";
cout << "Concurrent result: " << to_string(result_vector2) << "\n";
return 0;
}
queue
If you look at the dequeue()
method of the queue
, it also returns a boolean value indicating whether an element was actually removed (it is false when the queue was already empty). I did this out of fear of the following scenario:
Suppose the queue contains only one element. Suppose also that a thread \$T_1\$ asks whether the queue is nonempty. Next, another thread \$T_2\$ asks the same question. Then, say, the thread \$T_1\$ pops the last element. As a result, \$T_2\$ still thinks that the queue is nonempty when, in fact, it is empty.
Performance figures
On a dual-core CPU I get the following figures:
Serial processing in 20024 milliseconds. Parallel processing in 10642 milliseconds. Serial result: [102334155, 165580141, 63245986, 3524578, 433494437, 832040, 5702887, 102334155, 267914296, 6765, 267914296, 102334155, 165580141] Concurrent result: [102334155, 165580141, 63245986, 3524578, 433494437, 832040, 5702887, 102334155, 267914296, 6765, 267914296, 102334155, 165580141]
Since I am not proficient in C++, please tell me anything that comes to mind.