GitHub Repository: parkpow/deep-license-plate-recognition
Path: blob/master/cpp/linux/scheduler/ctpl.h
/*********************************************************
 *
 *  Copyright (C) 2014 by Vitaliy Vitsentiy
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 *********************************************************/


#ifndef __ctpl_thread_pool_H__
#define __ctpl_thread_pool_H__

#include <functional>
#include <thread>
#include <atomic>
#include <vector>
#include <memory>
#include <exception>
#include <future>
#include <mutex>
#include <condition_variable>
#include <boost/lockfree/queue.hpp>


#ifndef _ctplThreadPoolLength_
#define _ctplThreadPoolLength_ 100
#endif


// thread pool to run user's functors with signature
//      ret func(int id, other_params)
// where id is the index of the thread that runs the functor
// ret is some return type

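// A minimal usage sketch (not part of the original header), assuming the pool is built
// against Boost.Lockfree as included above; the names below are illustrative only:
//
//     ctpl::thread_pool pool(4);                        // 4 worker threads
//     auto fut = pool.push([](int id, int x) {          // id = index of the worker thread
//         return x * 2;
//     }, 21);
//     int result = fut.get();                           // 42; get() also rethrows exceptions
//     pool.stop(true);                                  // wait for everything still queued
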
namespace ctpl {

    class thread_pool {

    public:

        thread_pool() : q(_ctplThreadPoolLength_) { this->init(); }
        thread_pool(int nThreads, int queueSize = _ctplThreadPoolLength_) : q(queueSize) { this->init(); this->resize(nThreads); }

        // the destructor waits for all the functions in the queue to be finished
        ~thread_pool() {
            this->stop(true);
        }

        // get the number of running threads in the pool
        int size() { return static_cast<int>(this->threads.size()); }

        // number of idle threads
        int n_idle() { return this->nWaiting; }
        std::thread & get_thread(int i) { return *this->threads[i]; }

        // change the number of threads in the pool
        // should be called from one thread; otherwise be careful not to interleave calls, including with this->stop()
        // nThreads must be >= 0
        void resize(int nThreads) {
            if (!this->isStop && !this->isDone) {
                int oldNThreads = static_cast<int>(this->threads.size());
                if (oldNThreads <= nThreads) {  // the number of threads is increased
                    this->threads.resize(nThreads);
                    this->flags.resize(nThreads);

                    for (int i = oldNThreads; i < nThreads; ++i) {
                        this->flags[i] = std::make_shared<std::atomic<bool>>(false);
                        this->set_thread(i);
                    }
                }
                else {  // the number of threads is decreased
                    for (int i = oldNThreads - 1; i >= nThreads; --i) {
                        *this->flags[i] = true;  // this thread will finish
                        this->threads[i]->detach();
                    }
                    {
                        // stop the detached threads that were waiting
                        std::unique_lock<std::mutex> lock(this->mutex);
                        this->cv.notify_all();
                    }
                    this->threads.resize(nThreads);  // safe to delete because the threads are detached
                    this->flags.resize(nThreads);  // safe to delete because the threads have copies of shared_ptr of the flags, not originals
                }
            }
        }
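
        // A minimal sketch (not part of the original header) of resizing from the owning thread:
        //
        //     ctpl::thread_pool pool(2);
        //     pool.resize(8);   // grow: 6 new workers are started
        //     pool.resize(4);   // shrink: 4 workers are flagged to finish and detached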

        // empty the queue
        void clear_queue() {
            std::function<void(int id)> * _f;
            while (this->q.pop(_f))
                delete _f;  // empty the queue
        }

        // pops a functional wrapper around the original function
        std::function<void(int)> pop() {
            std::function<void(int id)> * _f = nullptr;
            this->q.pop(_f);
            std::unique_ptr<std::function<void(int id)>> func(_f);  // at return, delete the function even if an exception occurred

            std::function<void(int)> f;
            if (_f)
                f = *_f;
            return f;
        }
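
        // A minimal sketch (not part of the original header): a task obtained via pop() has been
        // removed from the queue and can be run on the caller's thread instead of a worker:
        //
        //     std::function<void(int)> task = pool.pop();
        //     if (task) task(-1);   // -1 is an arbitrary caller-chosen id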

        // wait for all computing threads to finish and stop all threads
        // may be called asynchronously so as not to pause the calling thread while waiting
        // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
        void stop(bool isWait = false) {
            if (!isWait) {
                if (this->isStop)
                    return;
                this->isStop = true;
                for (int i = 0, n = this->size(); i < n; ++i) {
                    *this->flags[i] = true;  // command the threads to stop
                }
                this->clear_queue();  // empty the queue
            }
            else {
                if (this->isDone || this->isStop)
                    return;
                this->isDone = true;  // give the waiting threads a command to finish
            }
            {
                std::unique_lock<std::mutex> lock(this->mutex);
                this->cv.notify_all();  // stop all waiting threads
            }
            for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) {  // wait for the computing threads to finish
                if (this->threads[i]->joinable())
                    this->threads[i]->join();
            }
            // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
            // therefore delete them here
            this->clear_queue();
            this->threads.clear();
            this->flags.clear();
        }
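
        // A minimal sketch (not part of the original header) of the two stop modes
        // (the destructor is equivalent to stop(true)):
        //
        //     pool.stop(true);    // drain: run everything still queued, then join the workers
        //     pool.stop(false);   // abandon: discard queued tasks and join the workers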

        template<typename F, typename... Rest>
        auto push(F && f, Rest&&... rest) -> std::future<decltype(f(0, rest...))> {
            auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
                std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
            );

            auto _f = new std::function<void(int id)>([pck](int id) {
                (*pck)(id);
            });
            this->q.push(_f);

            std::unique_lock<std::mutex> lock(this->mutex);
            this->cv.notify_one();

            return pck->get_future();
        }

        // run the user's function that accepts an int argument - the id of the running thread; the returned value is templatized
        // push() returns std::future, where the user can get the result and rethrow any caught exceptions
        template<typename F>
        auto push(F && f) -> std::future<decltype(f(0))> {
            auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));

            auto _f = new std::function<void(int id)>([pck](int id) {
                (*pck)(id);
            });
            this->q.push(_f);

            std::unique_lock<std::mutex> lock(this->mutex);
            this->cv.notify_one();

            return pck->get_future();
        }
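
        // A minimal sketch (not part of the original header) of retrieving results and exceptions
        // through the returned std::future:
        //
        //     auto fut = pool.push([](int id) -> int { throw std::runtime_error("boom"); });
        //     try { fut.get(); }                          // rethrows the exception captured by the packaged_task
        //     catch (const std::exception & e) { /* handle the failure */ }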

    private:

        // deleted
        thread_pool(const thread_pool &);  // = delete;
        thread_pool(thread_pool &&);  // = delete;
        thread_pool & operator=(const thread_pool &);  // = delete;
        thread_pool & operator=(thread_pool &&);  // = delete;

        void set_thread(int i) {
            std::shared_ptr<std::atomic<bool>> flag(this->flags[i]);  // a copy of the shared ptr to the flag
            auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
                std::atomic<bool> & _flag = *flag;
                std::function<void(int id)> * _f;
                bool isPop = this->q.pop(_f);
                while (true) {
                    while (isPop) {  // if there is anything in the queue
                        std::unique_ptr<std::function<void(int id)>> func(_f);  // at return, delete the function even if an exception occurred
                        (*_f)(i);

                        if (_flag)
                            return;  // the thread was asked to stop; return even if the queue is not empty yet
                        else
                            isPop = this->q.pop(_f);
                    }

                    // the queue is empty here, wait for the next command
                    std::unique_lock<std::mutex> lock(this->mutex);
                    ++this->nWaiting;
                    this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
                    --this->nWaiting;

                    if (!isPop)
                        return;  // if the queue is empty and this->isDone == true or *flag then return
                }
            };
            this->threads[i].reset(new std::thread(f));  // the compiler may not support std::make_unique()
        }

        void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }

        std::vector<std::unique_ptr<std::thread>> threads;
        std::vector<std::shared_ptr<std::atomic<bool>>> flags;
        mutable boost::lockfree::queue<std::function<void(int id)> *> q;
        std::atomic<bool> isDone;
        std::atomic<bool> isStop;
        std::atomic<int> nWaiting;  // how many threads are waiting

        std::mutex mutex;
        std::condition_variable cv;
    };

}

#endif // __ctpl_thread_pool_H__