// Path: blob/master/cpp/linux/scheduler/ctpl_stl.h
// 644 views
/*********************************************************
*
*  Copyright (C) 2014 by Vitaliy Vitsentiy
*
*  Licensed under the Apache License, Version 2.0 (the "License");
*  you may not use this file except in compliance with the License.
*  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
*  Unless required by applicable law or agreed to in writing, software
*  distributed under the License is distributed on an "AS IS" BASIS,
*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*  See the License for the specific language governing permissions and
*  limitations under the License.
*
*********************************************************/


#ifndef __ctpl_stl_thread_pool_H__
#define __ctpl_stl_thread_pool_H__

#include <atomic>
#include <condition_variable>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>


// thread pool to run user's functors with signature
//      ret func(int id, other_params)
// where id is the index of the thread that runs the functor
// ret is some return type


namespace ctpl {

    namespace detail {
        // FIFO queue whose every operation is serialized by an internal mutex.
        // Elements are copied in and out; the pool stores raw pointers in it,
        // so ownership of the pointees is handled by the pool, not by the queue.
        template <typename T>
        class Queue {
        public:
            // appends a copy of value; always returns true
            bool push(T const & value) {
                std::lock_guard<std::mutex> lock(this->mutex);
                this->q.push(value);
                return true;
            }
            // copies the front element into v and removes it from the queue;
            // returns false (leaving v untouched) when the queue is empty
            bool pop(T & v) {
                std::lock_guard<std::mutex> lock(this->mutex);
                if (this->q.empty())
                    return false;
                v = this->q.front();
                this->q.pop();
                return true;
            }
            // true when the queue holds no elements at the moment of the call
            bool empty() {
                std::lock_guard<std::mutex> lock(this->mutex);
                return this->q.empty();
            }
        private:
            std::queue<T> q;
            std::mutex mutex;
        };
    }

    class thread_pool {

    public:

        // creates a pool without threads; use resize() to start some
        thread_pool() {}
        // creates a pool with nThreads worker threads (negative values are treated as 0)
        thread_pool(int nThreads) { this->resize(nThreads); }

        // the destructor waits for all the functions in the queue to be finished
        ~thread_pool() {
            this->stop(true);
            // stop(true) returns early when the pool had already been stopped; functors pushed
            // after that earlier stop are still queued and would leak (and their futures would
            // never become ready), so always drain the queue here
            this->clear_queue();
        }

        // get the number of running threads in the pool
        int size() const { return static_cast<int>(this->threads.size()); }

        // number of idle threads
        int n_idle() const { return this->nWaiting; }
        // direct access to the i-th worker; i is not range-checked
        std::thread & get_thread(int i) { return *this->threads[i]; }

        // change the number of threads in the pool
        // should be called from one thread, otherwise be careful to not interleave, also with this->stop()
        // nThreads should be >= 0; a negative value is clamped to 0
        // has no effect once stop() has been called
        void resize(int nThreads) {
            if (nThreads < 0)
                nThreads = 0;  // a negative count would index flags[] out of range below
            if (this->isStop || this->isDone)
                return;

            const int oldNThreads = static_cast<int>(this->threads.size());
            if (oldNThreads <= nThreads) {  // if the number of threads is increased
                this->threads.resize(nThreads);
                this->flags.resize(nThreads);

                for (int i = oldNThreads; i < nThreads; ++i) {
                    this->flags[i] = std::make_shared<std::atomic<bool>>(false);
                    this->set_thread(i);
                }
            }
            else {  // the number of threads is decreased
                // NOTE: the detached workers keep using this pool (queue, mutex, condition variable)
                // until they notice their flag, so the pool must outlive them
                for (int i = oldNThreads - 1; i >= nThreads; --i) {
                    *this->flags[i] = true;  // this thread will finish
                    this->threads[i]->detach();
                }
                {
                    // stop the detached threads that were waiting
                    std::unique_lock<std::mutex> lock(this->mutex);
                    this->cv.notify_all();
                }
                this->threads.resize(nThreads);  // safe to delete because the threads are detached
                this->flags.resize(nThreads);  // safe to delete because the threads have copies of shared_ptr of the flags, not originals
            }
        }

        // empty the queue, destroying the queued functors without running them
        void clear_queue() {
            std::function<void(int id)> * _f = nullptr;
            while (this->q.pop(_f))
                delete _f;
        }

        // pops a functional wrapper to the original function
        // returns an empty std::function when the queue is empty
        std::function<void(int)> pop() {
            std::function<void(int id)> * _f = nullptr;
            this->q.pop(_f);
            std::unique_ptr<std::function<void(int id)>> func(_f);  // at return, delete the function even if an exception occurred
            std::function<void(int)> f;
            if (_f)
                f = *_f;
            return f;
        }

        // wait for all computing threads to finish and stop all threads
        // may be called asynchronously to not pause the calling thread while waiting
        // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
        void stop(bool isWait = false) {
            if (!isWait) {
                if (this->isStop)
                    return;
                this->isStop = true;
                for (int i = 0, n = this->size(); i < n; ++i) {
                    *this->flags[i] = true;  // command the threads to stop
                }
                this->clear_queue();  // empty the queue
            }
            else {
                if (this->isDone || this->isStop)
                    return;
                this->isDone = true;  // give the waiting threads a command to finish
            }
            {
                std::unique_lock<std::mutex> lock(this->mutex);
                this->cv.notify_all();  // stop all waiting threads
            }
            for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) {  // wait for the computing threads to finish
                if (this->threads[i]->joinable())
                    this->threads[i]->join();
            }
            // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
            // therefore delete them here
            this->clear_queue();
            this->threads.clear();
            this->flags.clear();
        }

        // queue f to be called as f(id, rest...), where id is the index of the thread that runs it
        // returns a std::future for the result; an exception thrown by f is rethrown by future::get()
        template<typename F, typename... Rest>
        auto push(F && f, Rest&&... rest) -> std::future<decltype(f(0, rest...))> {
            auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
                std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
            );
            // take the future before the task is published, so that get_future()
            // never runs concurrently with a worker executing the task
            auto result = pck->get_future();
            this->enqueue([pck](int id) {
                (*pck)(id);
            });
            return result;
        }

        // run the user's function that accepts argument int - id of the running thread. returned value is templatized
        // operator returns std::future, where the user can get the result and rethrow the caught exceptions
        template<typename F>
        auto push(F && f) -> std::future<decltype(f(0))> {
            auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
            // see the variadic overload: the future is obtained before the task becomes visible to workers
            auto result = pck->get_future();
            this->enqueue([pck](int id) {
                (*pck)(id);
            });
            return result;
        }


    private:

        // the pool is neither copyable nor movable: the worker threads hold a pointer to it
        thread_pool(const thread_pool &) = delete;
        thread_pool(thread_pool &&) = delete;
        thread_pool & operator=(const thread_pool &) = delete;
        thread_pool & operator=(thread_pool &&) = delete;

        // puts a type-erased task on the queue and wakes one waiting worker
        void enqueue(std::function<void(int id)> task) {
            // own the heap copy until the queue has accepted it, so nothing leaks if the queue throws
            std::unique_ptr<std::function<void(int id)>> owner(new std::function<void(int id)>(std::move(task)));
            this->q.push(owner.get());
            owner.release();  // from here on whoever pops the pointer deletes it
            std::unique_lock<std::mutex> lock(this->mutex);
            this->cv.notify_one();
        }

        // starts worker number i; this->threads[i] and this->flags[i] must already exist
        void set_thread(int i) {
            std::shared_ptr<std::atomic<bool>> flag(this->flags[i]);  // a copy of the shared ptr to the flag
            auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
                std::atomic<bool> & _flag = *flag;
                std::function<void(int id)> * _f = nullptr;
                bool isPop = this->q.pop(_f);
                while (true) {
                    while (isPop) {  // if there is anything in the queue
                        std::unique_ptr<std::function<void(int id)>> func(_f);  // at return, delete the function even if an exception occurred
                        (*_f)(i);
                        if (_flag)
                            return;  // the thread is wanted to stop, return even if the queue is not empty yet
                        else
                            isPop = this->q.pop(_f);
                    }
                    // the queue is empty here, wait for the next command
                    std::unique_lock<std::mutex> lock(this->mutex);
                    ++this->nWaiting;
                    // the predicate itself pops the next task, so a successful wake-up already owns its work item
                    this->cv.wait(lock, [this, &_f, &isPop, &_flag]() { isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
                    --this->nWaiting;
                    if (!isPop)
                        return;  // if the queue is empty and this->isDone == true or *flag then return
                    // lock is released at the end of this iteration, before the popped task is run
                }
            };
            this->threads[i].reset(new std::thread(f));  // compiler may not support std::make_unique()
        }

        std::vector<std::unique_ptr<std::thread>> threads;
        std::vector<std::shared_ptr<std::atomic<bool>>> flags;  // per-thread "please stop" flags
        detail::Queue<std::function<void(int id)> *> q;  // owning raw pointers, deleted by whoever pops them
        std::atomic<bool> isDone{false};  // set by stop(true): finish the queue, then exit
        std::atomic<bool> isStop{false};  // set by stop(false): exit as soon as possible
        std::atomic<int> nWaiting{0};  // how many threads are waiting

        std::mutex mutex;
        std::condition_variable cv;
    };

}

#endif // __ctpl_stl_thread_pool_H__