Thread pooling in C++11
Solution 1:
This is copied from my answer to another very similar post:
- Start with the number of concurrent threads the hardware supports (the returned value is only a hint and may be 0 if it cannot be determined):
int num_threads = std::thread::hardware_concurrency();
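Because std::thread::hardware_concurrency() may return 0, it can help to guard the value before sizing the pool. A minimal sketch follows; the helper names pick_thread_count and default_thread_count and the fallback of 2 are assumptions made for illustration, not part of the original answer:

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper: choose a worker count from a detected value.
// `detected` is expected to come from std::thread::hardware_concurrency(),
// which returns 0 when the count cannot be determined.
unsigned pick_thread_count(unsigned detected, unsigned fallback = 2)
{
    assert(fallback > 0); // the fallback itself must be usable
    return detected != 0 ? detected : fallback;
}

// Hypothetical convenience wrapper that queries the runtime.
unsigned default_thread_count()
{
    return pick_thread_count(std::thread::hardware_concurrency());
}
```

The result of default_thread_count() could then be used in place of num_threads when creating the threads.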
- For an efficient thread pool implementation, once threads have been created according to num_threads, it's better not to create new ones or destroy old ones (by joining). Doing so carries a performance penalty and might even make your application run slower than the serial version.
Each C++11 thread should run a function containing an infinite loop, constantly waiting for new tasks to grab and run.
Here is how to attach such a function to the thread pool:
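int num_threads = std::thread::hardware_concurrency();
std::vector<std::thread> threads;
for (int i = 0; i < num_threads; i++)
{
    // Infinite_loop_function is a member of Pool, so this is assumed to run
    // inside a Pool member function, where `this` is available.
    threads.push_back(std::thread(&Pool::Infinite_loop_function, this));
}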
- The infinite loop function. This is a while (true) loop waiting on the task queue.
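void Pool::Infinite_loop_function()
{
    while (true)
    {
        std::function<void()> Job;
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            condition.wait(lock, [this]() {
                return !queue.empty() || terminate_pool;
            });
            // Woken up for shutdown with nothing left to run: leave the loop.
            if (terminate_pool && queue.empty())
            {
                return;
            }
            Job = queue.front();
            queue.pop();
        }
        Job(); // function<void()> type
    }
}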
- Make a function to add a job to your queue:
void Pool::Add_Job(std::function<void()> New_Job)
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        queue.push(New_Job);
    }
    condition.notify_one();
}
- Bind an arbitrary function to your queue
Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));
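To illustrate the binding step in isolation, here is a small sketch of how std::bind (or an equivalent lambda) turns a member function, with or without arguments, into the std::function<void()> that the queue stores. The body of Some_Class, the Add method, and bind_demo are hypothetical and exist only for this example:

```cpp
#include <cassert>
#include <functional>

// Hypothetical class standing in for Some_Class from the answer.
struct Some_Class
{
    int total = 0;
    void Some_Method() { total += 1; }
    void Add(int amount) { total += amount; }
};

int bind_demo()
{
    Some_Class Some_object;

    // Member function without arguments, bound to an object pointer.
    std::function<void()> a = std::bind(&Some_Class::Some_Method, &Some_object);
    // Member function with an argument fixed at bind time.
    std::function<void()> b = std::bind(&Some_Class::Add, &Some_object, 5);
    // An equivalent lambda; often easier to read than std::bind.
    std::function<void()> c = [&Some_object]() { Some_object.Add(10); };

    assert(a && b && c); // all three hold a callable target

    // A pool worker would invoke the jobs like this.
    a();
    b();
    c();
    return Some_object.total;
}
```

Note that the bound object must outlive the job: both the pointer passed to std::bind and the reference captured by the lambda refer to Some_object without copying it.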
Once you integrate these ingredients, you have your own dynamic thread pool. These threads always run, waiting for jobs to do.
I apologize if there are some syntax errors; I typed this code and I have a bad memory. Sorry that I cannot provide you the complete thread pool code; that would violate my job integrity.
Edit: to terminate the pool, call the shutdown() method:
void Pool::shutdown()
{
    {
        // Must be the same mutex the workers use in condition.wait.
        std::unique_lock<std::mutex> lock(queue_mutex);
        terminate_pool = true; // use this flag in condition.wait
    }
    condition.notify_all(); // wake up all threads.
    // Join all threads.
    for (std::thread &th : threads)
    {
        th.join();
    }
    threads.clear();
    stopped = true; // use this flag in destructor, if not set, call shutdown()
}
Note: the anonymous code blocks are used so that when they are exited, the std::unique_lock variables created within them go out of scope, unlocking the mutex.
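For reference, the pieces of this answer could be assembled roughly as follows. This is a minimal sketch and not the author's complete pool: the class layout, the constructor that starts the threads, the destructor, and the run_demo helper are assumptions added for illustration, while the member names follow the snippets in this answer.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

class Pool
{
public:
    // Assumption: the worker threads are started in the constructor.
    explicit Pool(unsigned num_threads)
    {
        assert(num_threads > 0);
        for (unsigned i = 0; i < num_threads; ++i)
            threads.emplace_back(&Pool::Infinite_loop_function, this);
    }

    ~Pool() { if (!stopped) shutdown(); }

    Pool(const Pool&) = delete;
    Pool& operator=(const Pool&) = delete;

    void Add_Job(std::function<void()> New_Job)
    {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            queue.push(std::move(New_Job));
        }
        condition.notify_one();
    }

    // Lets the workers finish the queued jobs, then joins them.
    void shutdown()
    {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            terminate_pool = true;
        }
        condition.notify_all();
        for (std::thread& th : threads)
            th.join();
        threads.clear();
        stopped = true;
    }

private:
    void Infinite_loop_function()
    {
        while (true)
        {
            std::function<void()> Job;
            {
                std::unique_lock<std::mutex> lock(queue_mutex);
                condition.wait(lock, [this]() { return !queue.empty() || terminate_pool; });
                if (queue.empty())
                    return; // only reachable once terminate_pool is set
                Job = std::move(queue.front());
                queue.pop();
            }
            Job(); // run outside the lock
        }
    }

    std::mutex queue_mutex;
    std::condition_variable condition;
    std::queue<std::function<void()>> queue;
    std::vector<std::thread> threads;
    bool terminate_pool = false;
    bool stopped = false;
};

// Hypothetical demo: run `jobs` increments on `workers` threads.
int run_demo(unsigned workers, int jobs)
{
    std::atomic<int> counter{0};
    Pool pool(workers);
    for (int i = 0; i < jobs; ++i)
        pool.Add_Job([&counter]() { ++counter; });
    pool.shutdown();
    return counter.load();
}
```

In this sketch shutdown() drains the queue before the workers exit. A pool that should drop pending jobs on shutdown would return as soon as terminate_pool is set instead.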
Solution 2:
You can use C++ Thread Pool Library, https://github.com/vit-vit/ctpl.
Then the code you wrote can be replaced with the following:
#include <ctpl.h> // or <ctpl_stl.h> if you do not have the Boost library
#include <algorithm>
#include <future>
#include <vector>

int main (int argc, char *argv[]) {
    ctpl::thread_pool p(2 /* two threads in the pool */);
    int arr[4] = {0};
    std::vector<std::future<void>> results(4);
    for (int i = 0; i < 8; ++i) { // for 8 iterations,
        for (int j = 0; j < 4; ++j) {
            results[j] = p.push([&arr, j](int){ arr[j] += 2; });
        }
        for (int j = 0; j < 4; ++j) {
            results[j].get();
        }
        // arr[4] would be out of bounds, and std::min_element returns an
        // iterator, so dereference it into a local variable instead.
        int smallest = *std::min_element(arr, arr + 4);
        (void)smallest;
    }
}
You will get the desired number of threads and will not create and delete them over and over again on the iterations.
Solution 3:
A pool of threads means that all your threads are running, all the time – in other words, the thread function never returns. To give the threads something meaningful to do, you have to design a system of inter-thread communication, both for the purpose of telling the thread that there's something to do, as well as for communicating the actual work data.
Typically this will involve some kind of concurrent data structure, and each thread would presumably sleep on some kind of condition variable, which would be notified when there's work to do. Upon receiving the notification, one or several of the threads wake up, recover a task from the concurrent data structure, process it, and store the result in an analogous fashion.
The thread would then go on to check whether there's even more work to do, and if not go back to sleep.
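The scheme described above can be sketched with a small blocking queue built on a mutex and a condition variable. Everything here (Blocking_queue, its close() method, and the sum_of_squares demo) is a hypothetical illustration under the assumption that tasks are plain integers; a real pool would define its own notion of "work":

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical concurrent queue: threads sleep in pop() until work arrives.
template <typename T>
class Blocking_queue
{
public:
    void push(T value)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            assert(!m_closed); // pushing after close() is a usage error
            m_items.push(std::move(value));
        }
        m_ready.notify_one(); // wake one sleeping thread
    }

    // Blocks until an item is available.
    // Returns false once the queue has been closed and drained.
    bool pop(T& out)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_ready.wait(lock, [this]() { return !m_items.empty() || m_closed; });
        if (m_items.empty())
            return false;
        out = std::move(m_items.front());
        m_items.pop();
        return true;
    }

    void close()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_closed = true;
        }
        m_ready.notify_all(); // wake every sleeping thread
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_ready;
    std::queue<T> m_items;
    bool m_closed = false;
};

// Hypothetical demo: workers square each input and store the result
// in a second queue, "in an analogous fashion".
long sum_of_squares(int n, unsigned workers)
{
    Blocking_queue<int> tasks;
    Blocking_queue<long> results;
    std::vector<std::thread> pool;
    for (unsigned i = 0; i < workers; ++i)
        pool.emplace_back([&tasks, &results]() {
            int x = 0;
            while (tasks.pop(x)) // sleeps when there is nothing to do
                results.push(static_cast<long>(x) * x);
        });
    for (int i = 1; i <= n; ++i)
        tasks.push(i);
    tasks.close();
    for (std::thread& t : pool)
        t.join();
    results.close();
    long total = 0;
    long value = 0;
    while (results.pop(value))
        total += value;
    return total;
}
```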
The upshot is that you have to design all this yourself, since there isn't a natural notion of "work" that's universally applicable. It's quite a bit of work, and there are some subtle issues you have to get right. (You can program in Go if you like a system which takes care of thread management for you behind the scenes.)
Solution 4:
A threadpool is at core a set of threads all bound to a function working as an event loop. These threads will endlessly wait for a task to be executed, or their own termination.
The thread pool's job is to provide an interface to submit jobs, define (and perhaps modify) the policy of running these jobs (scheduling rules, thread instantiation, size of the pool), and monitor the status of the threads and related resources.
So for a versatile pool, one must start by defining what a task is, how it is launched, interrupted, what is the result (see the notion of promise and future for that question), what sort of events the threads will have to respond to, how they will handle them, how these events shall be discriminated from the ones handled by the tasks. This can become quite complicated as you can see, and impose restrictions on how the threads will work, as the solution becomes more and more involved.
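As one possible answer to the "what is the result" question, a task can be wrapped in a std::packaged_task so that the submitter receives a std::future. The sketch below assumes a pool that stores std::function<void()> jobs; make_job and run_on_worker are hypothetical names, and a plain std::thread stands in for a pool worker:

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <memory>
#include <thread>
#include <utility>

// Hypothetical helper: wrap a callable so that a pool storing
// std::function<void()> jobs can still hand a result back to the caller.
template <typename F>
auto make_job(F f, std::function<void()>& job_out) -> std::future<decltype(f())>
{
    using R = decltype(f());
    // std::packaged_task is move-only, but std::function needs a copyable
    // callable, so the task is held through a shared_ptr.
    auto task = std::make_shared<std::packaged_task<R()>>(std::move(f));
    std::future<R> result = task->get_future();
    job_out = [task]() { (*task)(); };
    return result;
}

// Demo: a plain std::thread stands in for a pool worker.
int run_on_worker()
{
    std::function<void()> job;
    std::future<int> answer = make_job([]() { return 6 * 7; }, job);
    assert(answer.valid());
    std::thread worker(job);
    int value = answer.get(); // blocks until the worker has run the job
    worker.join();
    return value;
}
```

An exception thrown by the task is stored in the shared state as well and is rethrown from get(), which covers part of the error-handling question raised above.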
The current tooling for handling events is fairly barebones(*): primitives like mutexes, condition variables, and a few abstractions on top of that (locks, barriers). But in some cases, these abstractions may turn out to be unfit (see this related question), and one must revert to using the primitives.
Other problems have to be managed too:
- signal
- i/o
- hardware (processor affinity, heterogeneous setup)
How would these play out in your setting?
This answer to a similar question points to an existing implementation meant for boost and the stl.
I offered a very crude implementation of a thread pool for another question, which doesn't address many of the problems outlined above. You might want to build on it. You might also want to have a look at existing frameworks in other languages to find inspiration.
(*) I don't see that as a problem, quite to the contrary. I think it's the very spirit of C++ inherited from C.
Solution 5:
Following [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece)'s suggestion, I implemented the thread pool:
function_pool.h
#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>

class Function_pool
{
private:
    std::queue<std::function<void()>> m_function_queue;
    std::mutex m_lock;
    std::condition_variable m_data_condition;
    std::atomic<bool> m_accept_functions;

public:
    Function_pool();
    ~Function_pool();
    void push(std::function<void()> func);
    void done();
    void infinite_loop_func();
};
function_pool.cpp
#include "function_pool.h"
Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}

Function_pool::~Function_pool()
{
}

void Function_pool::push(std::function<void()> func)
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_function_queue.push(func);
    // the woken consumer will try to get the lock, so unlock before notifying
    lock.unlock();
    m_data_condition.notify_one();
}

void Function_pool::done()
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_accept_functions = false;
    // the woken consumers will try to get the lock, so unlock before notifying
    lock.unlock();
    // notify all waiting threads.
    m_data_condition.notify_all();
}

void Function_pool::infinite_loop_func()
{
    std::function<void()> func;
    while (true)
    {
        {
            std::unique_lock<std::mutex> lock(m_lock);
            m_data_condition.wait(lock, [this]() { return !m_function_queue.empty() || !m_accept_functions; });
            if (!m_accept_functions && m_function_queue.empty())
            {
                // lock will be released automatically.
                // finish the thread loop and let it join in the main thread.
                return;
            }
            func = m_function_queue.front();
            m_function_queue.pop();
            // release the lock
        }
        func();
    }
}
main.cpp
#include "function_pool.h"
#include <exception>
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>

Function_pool func_pool;

class quit_worker_exception : public std::exception {};

void example_function()
{
    std::cout << "bla" << std::endl;
}

int main()
{
    std::cout << "starting operation" << std::endl;
    int num_threads = std::thread::hardware_concurrency();
    std::cout << "number of threads = " << num_threads << std::endl;
    std::vector<std::thread> thread_pool;
    for (int i = 0; i < num_threads; i++)
    {
        thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
    }

    // here we should send our functions
    for (int i = 0; i < 50; i++)
    {
        func_pool.push(example_function);
    }
    func_pool.done();
    for (unsigned int i = 0; i < thread_pool.size(); i++)
    {
        thread_pool.at(i).join();
    }
}