// Path: blob/master/cpp/linux/scheduler/ctpl.h
// 644 views
1/*********************************************************2*3* Copyright (C) 2014 by Vitaliy Vitsentiy4*5* Licensed under the Apache License, Version 2.0 (the "License");6* you may not use this file except in compliance with the License.7* You may obtain a copy of the License at8*9* http://www.apache.org/licenses/LICENSE-2.010*11* Unless required by applicable law or agreed to in writing, software12* distributed under the License is distributed on an "AS IS" BASIS,13* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14* See the License for the specific language governing permissions and15* limitations under the License.16*17*********************************************************/181920#ifndef __ctpl_thread_pool_H__21#define __ctpl_thread_pool_H__2223#include <functional>24#include <thread>25#include <atomic>26#include <vector>27#include <memory>28#include <exception>29#include <future>30#include <mutex>31#include <boost/lockfree/queue.hpp>323334#ifndef _ctplThreadPoolLength_35#define _ctplThreadPoolLength_ 10036#endif373839// thread pool to run user's functors with signature40// ret func(int id, other_params)41// where id is the index of the thread that runs the functor42// ret is some return type434445namespace ctpl {4647class thread_pool {4849public:5051thread_pool() : q(_ctplThreadPoolLength_) { this->init(); }52thread_pool(int nThreads, int queueSize = _ctplThreadPoolLength_) : q(queueSize) { this->init(); this->resize(nThreads); }5354// the destructor waits for all the functions in the queue to be finished55~thread_pool() {56this->stop(true);57}5859// get the number of running threads in the pool60int size() { return static_cast<int>(this->threads.size()); }6162// number of idle threads63int n_idle() { return this->nWaiting; }64std::thread & get_thread(int i) { return *this->threads[i]; }6566// change the number of threads in the pool67// should be called from one thread, otherwise be careful to not interleave, also with 
this->stop()68// nThreads must be >= 069void resize(int nThreads) {70if (!this->isStop && !this->isDone) {71int oldNThreads = static_cast<int>(this->threads.size());72if (oldNThreads <= nThreads) { // if the number of threads is increased73this->threads.resize(nThreads);74this->flags.resize(nThreads);7576for (int i = oldNThreads; i < nThreads; ++i) {77this->flags[i] = std::make_shared<std::atomic<bool>>(false);78this->set_thread(i);79}80}81else { // the number of threads is decreased82for (int i = oldNThreads - 1; i >= nThreads; --i) {83*this->flags[i] = true; // this thread will finish84this->threads[i]->detach();85}86{87// stop the detached threads that were waiting88std::unique_lock<std::mutex> lock(this->mutex);89this->cv.notify_all();90}91this->threads.resize(nThreads); // safe to delete because the threads are detached92this->flags.resize(nThreads); // safe to delete because the threads have copies of shared_ptr of the flags, not originals93}94}95}9697// empty the queue98void clear_queue() {99std::function<void(int id)> * _f;100while (this->q.pop(_f))101delete _f; // empty the queue102}103104// pops a functional wraper to the original function105std::function<void(int)> pop() {106std::function<void(int id)> * _f = nullptr;107this->q.pop(_f);108std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred109110std::function<void(int)> f;111if (_f)112f = *_f;113return f;114}115116117// wait for all computing threads to finish and stop all threads118// may be called asyncronously to not pause the calling thread while waiting119// if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions120void stop(bool isWait = false) {121if (!isWait) {122if (this->isStop)123return;124this->isStop = true;125for (int i = 0, n = this->size(); i < n; ++i) {126*this->flags[i] = true; // command the threads to stop127}128this->clear_queue(); // empty the 
queue129}130else {131if (this->isDone || this->isStop)132return;133this->isDone = true; // give the waiting threads a command to finish134}135{136std::unique_lock<std::mutex> lock(this->mutex);137this->cv.notify_all(); // stop all waiting threads138}139for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) { // wait for the computing threads to finish140if (this->threads[i]->joinable())141this->threads[i]->join();142}143// if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads144// therefore delete them here145this->clear_queue();146this->threads.clear();147this->flags.clear();148}149150template<typename F, typename... Rest>151auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {152auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(153std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)154);155156auto _f = new std::function<void(int id)>([pck](int id) {157(*pck)(id);158});159this->q.push(_f);160161std::unique_lock<std::mutex> lock(this->mutex);162this->cv.notify_one();163164return pck->get_future();165}166167// run the user's function that excepts argument int - id of the running thread. 
returned value is templatized168// operator returns std::future, where the user can get the result and rethrow the catched exceptins169template<typename F>170auto push(F && f) ->std::future<decltype(f(0))> {171auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));172173auto _f = new std::function<void(int id)>([pck](int id) {174(*pck)(id);175});176this->q.push(_f);177178std::unique_lock<std::mutex> lock(this->mutex);179this->cv.notify_one();180181return pck->get_future();182}183184185private:186187// deleted188thread_pool(const thread_pool &);// = delete;189thread_pool(thread_pool &&);// = delete;190thread_pool & operator=(const thread_pool &);// = delete;191thread_pool & operator=(thread_pool &&);// = delete;192193void set_thread(int i) {194std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag195auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {196std::atomic<bool> & _flag = *flag;197std::function<void(int id)> * _f;198bool isPop = this->q.pop(_f);199while (true) {200while (isPop) { // if there is anything in the queue201std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred202(*_f)(i);203204if (_flag)205return; // the thread is wanted to stop, return even if the queue is not empty yet206else207isPop = this->q.pop(_f);208}209210// the queue is empty here, wait for the next command211std::unique_lock<std::mutex> lock(this->mutex);212++this->nWaiting;213this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });214--this->nWaiting;215216if (!isPop)217return; // if the queue is empty and this->isDone == true or *flag then return218}219};220this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()221}222223void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; 
}224225std::vector<std::unique_ptr<std::thread>> threads;226std::vector<std::shared_ptr<std::atomic<bool>>> flags;227mutable boost::lockfree::queue<std::function<void(int id)> *> q;228std::atomic<bool> isDone;229std::atomic<bool> isStop;230std::atomic<int> nWaiting; // how many threads are waiting231232std::mutex mutex;233std::condition_variable cv;234};235236}237238#endif // __ctpl_thread_pool_H__239240241242