Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Tetragramm
GitHub Repository: Tetragramm/opencv
Path: blob/master/3rdparty/openexr/IlmThread/IlmThreadPool.cpp
16337 views
1
///////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright (c) 2005, Industrial Light & Magic, a division of Lucas
4
// Digital Ltd. LLC
5
//
6
// All rights reserved.
7
//
8
// Redistribution and use in source and binary forms, with or without
9
// modification, are permitted provided that the following conditions are
10
// met:
11
// * Redistributions of source code must retain the above copyright
12
// notice, this list of conditions and the following disclaimer.
13
// * Redistributions in binary form must reproduce the above
14
// copyright notice, this list of conditions and the following disclaimer
15
// in the documentation and/or other materials provided with the
16
// distribution.
17
// * Neither the name of Industrial Light & Magic nor the names of
18
// its contributors may be used to endorse or promote products derived
19
// from this software without specific prior written permission.
20
//
21
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32
//
33
///////////////////////////////////////////////////////////////////////////
34
35
//-----------------------------------------------------------------------------
36
//
37
// class Task, class ThreadPool, class TaskGroup
38
//
39
//-----------------------------------------------------------------------------
40
41
#include "IlmThread.h"
42
#include "IlmThreadMutex.h"
43
#include "IlmThreadSemaphore.h"
44
#include "IlmThreadPool.h"
45
#include "Iex.h"
46
#include <list>
47
48
using namespace std;
49
50
namespace IlmThread {
51
namespace {
52
53
class WorkerThread: public Thread
54
{
55
public:
56
57
WorkerThread (ThreadPool::Data* data);
58
59
virtual void run ();
60
61
private:
62
63
ThreadPool::Data * _data;
64
};
65
66
} //namespace
67
68
69
struct TaskGroup::Data
70
{
71
Data ();
72
~Data ();
73
74
void addTask () ;
75
void removeTask ();
76
77
Semaphore isEmpty; // used to signal that the taskgroup is empty
78
int numPending; // number of pending tasks to still execute
79
};
80
81
82
struct ThreadPool::Data
83
{
84
Data ();
85
~Data();
86
87
void finish ();
88
bool stopped () const;
89
void stop ();
90
91
Semaphore taskSemaphore; // threads wait on this for ready tasks
92
Mutex taskMutex; // mutual exclusion for the tasks list
93
list<Task*> tasks; // the list of tasks to execute
94
size_t numTasks; // fast access to list size
95
// (list::size() can be O(n))
96
97
Semaphore threadSemaphore; // signaled when a thread starts executing
98
Mutex threadMutex; // mutual exclusion for threads list
99
list<WorkerThread*> threads; // the list of all threads
100
size_t numThreads; // fast access to list size
101
102
bool stopping; // flag indicating whether to stop threads
103
Mutex stopMutex; // mutual exclusion for stopping flag
104
};
105
106
107
108
//
109
// class WorkerThread
110
//
111
112
//
// Store the pointer to the shared pool state, then call start().
//

WorkerThread::WorkerThread (ThreadPool::Data* data)
    : _data (data)
{
    start ();
}
117
118
119
void
120
WorkerThread::run ()
121
{
122
//
123
// Signal that the thread has started executing
124
//
125
126
_data->threadSemaphore.post();
127
128
while (true)
129
{
130
//
131
// Wait for a task to become available
132
//
133
134
_data->taskSemaphore.wait();
135
136
{
137
Lock taskLock (_data->taskMutex);
138
139
//
140
// If there is a task pending, pop off the next task in the FIFO
141
//
142
143
if (_data->numTasks > 0)
144
{
145
Task* task = _data->tasks.front();
146
TaskGroup* taskGroup = task->group();
147
_data->tasks.pop_front();
148
_data->numTasks--;
149
150
taskLock.release();
151
task->execute();
152
taskLock.acquire();
153
154
delete task;
155
taskGroup->_data->removeTask();
156
}
157
else if (_data->stopped())
158
{
159
break;
160
}
161
}
162
}
163
}
164
165
166
//
167
// struct TaskGroup::Data
168
//
169
170
//
// A new task group has no pending tasks, and its isEmpty semaphore
// starts with a count of one.
//

TaskGroup::Data::Data ()
    : isEmpty (1),
      numPending (0)
{
}
174
175
176
TaskGroup::Data::~Data ()
{
    //
    // The task group behaves like an "inverted" semaphore: while tasks
    // are pending, isEmpty has been taken by addTask(), so the wait
    // below blocks until removeTask() posts it again for the last
    // pending task.  Destruction therefore does not complete before
    // the group is empty.
    //

    isEmpty.wait ();
}
186
187
188
void
189
TaskGroup::Data::addTask ()
190
{
191
//
192
// Any access to the taskgroup is protected by a mutex that is
193
// held by the threadpool. Therefore it is safe to access
194
// numPending before we wait on the semaphore.
195
//
196
197
if (numPending++ == 0)
198
isEmpty.wait ();
199
}
200
201
202
void
203
TaskGroup::Data::removeTask ()
204
{
205
if (--numPending == 0)
206
isEmpty.post ();
207
}
208
209
210
//
211
// struct ThreadPool::Data
212
//
213
214
//
// A new pool has no queued tasks, no worker threads, and is not
// stopping.
//

ThreadPool::Data::Data ()
    : numTasks (0),
      numThreads (0),
      stopping (false)
{
}
218
219
220
//
// Shut down all worker threads, holding the thread-list mutex while
// finish() runs.
//

ThreadPool::Data::~Data ()
{
    Lock threadListLock (threadMutex);

    finish ();
}
225
226
227
void
228
ThreadPool::Data::finish ()
229
{
230
stop();
231
232
//
233
// Signal enough times to allow all threads to stop.
234
//
235
// Wait until all threads have started their run functions.
236
// If we do not wait before we destroy the threads then it's
237
// possible that the threads have not yet called their run
238
// functions.
239
// If this happens then the run function will be called off
240
// of an invalid object and we will crash, most likely with
241
// an error like: "pure virtual method called"
242
//
243
244
for (size_t i = 0; i < numThreads; i++)
245
{
246
taskSemaphore.post();
247
threadSemaphore.wait();
248
}
249
250
//
251
// Join all the threads
252
//
253
254
for (list<WorkerThread*>::iterator i = threads.begin();
255
i != threads.end();
256
++i)
257
{
258
delete (*i);
259
}
260
261
Lock lock1 (taskMutex);
262
Lock lock2 (stopMutex);
263
threads.clear();
264
tasks.clear();
265
numThreads = 0;
266
numTasks = 0;
267
stopping = false;
268
}
269
270
271
bool
272
ThreadPool::Data::stopped () const
273
{
274
Lock lock (stopMutex);
275
return stopping;
276
}
277
278
279
void
280
ThreadPool::Data::stop ()
281
{
282
Lock lock (stopMutex);
283
stopping = true;
284
}
285
286
287
//
288
// class Task
289
//
290
291
//
// Create a task that belongs to task group g.
//

Task::Task (TaskGroup* g)
    : _group (g)
{
}
295
296
297
//
// Nothing to release: the destructor body is empty.
//

Task::~Task ()
{
}
301
302
303
//
// Return the task group that was passed to the constructor.
//

TaskGroup*
Task::group ()
{
    return this->_group;
}
308
309
310
//
// Allocate the group's private bookkeeping data.
//

TaskGroup::TaskGroup ()
    : _data (new Data ())
{
}
315
316
317
//
// Destroy the bookkeeping data.  The Data destructor waits on the
// isEmpty semaphore, so this blocks while tasks of the group are
// still pending.
//

TaskGroup::~TaskGroup ()
{
    delete _data;
}
321
322
323
//
324
// class ThreadPool
325
//
326
327
//
// Create a thread pool with nthreads worker threads.
//
// nthreads is passed on to setNumThreads(), which takes an int and
// throws Iex::ArgExc for a negative count; a value of nthreads that
// does not fit in an int can therefore make this constructor throw.
// Any exception from setNumThreads() is propagated to the caller
// after the pool's data has been released.
//

ThreadPool::ThreadPool (unsigned nthreads):
    _data (new Data())
{
    try
    {
        setNumThreads (nthreads);
    }
    catch (...)
    {
        //
        // The destructor of a partially constructed object is never
        // run, so without this handler _data, and any worker threads
        // that were already created, would be leaked.  Deleting
        // _data shuts those workers down.
        //

        delete _data;
        throw;
    }
}
332
333
334
//
// Destroy the pool's shared state; the Data destructor calls finish()
// to shut down the worker threads.
//

ThreadPool::~ThreadPool ()
{
    delete _data;
}
338
339
340
int
341
ThreadPool::numThreads () const
342
{
343
Lock lock (_data->threadMutex);
344
return _data->numThreads;
345
}
346
347
348
void
349
ThreadPool::setNumThreads (int count)
350
{
351
if (count < 0)
352
throw Iex::ArgExc ("Attempt to set the number of threads "
353
"in a thread pool to a negative value.");
354
355
//
356
// Lock access to thread list and size
357
//
358
359
Lock lock (_data->threadMutex);
360
361
if ((size_t)count > _data->numThreads)
362
{
363
//
364
// Add more threads
365
//
366
367
while (_data->numThreads < (size_t)count)
368
{
369
_data->threads.push_back (new WorkerThread (_data));
370
_data->numThreads++;
371
}
372
}
373
else if ((size_t)count < _data->numThreads)
374
{
375
//
376
// Wait until all existing threads are finished processing,
377
// then delete all threads.
378
//
379
380
_data->finish ();
381
382
//
383
// Add in new threads
384
//
385
386
while (_data->numThreads < (size_t)count)
387
{
388
_data->threads.push_back (new WorkerThread (_data));
389
_data->numThreads++;
390
}
391
}
392
}
393
394
395
void
396
ThreadPool::addTask (Task* task)
397
{
398
//
399
// Lock the threads, needed to access numThreads
400
//
401
402
Lock lock (_data->threadMutex);
403
404
if (_data->numThreads == 0)
405
{
406
task->execute ();
407
delete task;
408
}
409
else
410
{
411
//
412
// Get exclusive access to the tasks queue
413
//
414
415
{
416
Lock taskLock (_data->taskMutex);
417
418
//
419
// Push the new task into the FIFO
420
//
421
422
_data->tasks.push_back (task);
423
_data->numTasks++;
424
task->group()->_data->addTask();
425
}
426
427
//
428
// Signal that we have a new task to process
429
//
430
431
_data->taskSemaphore.post ();
432
}
433
}
434
435
436
//
// Return the process-wide thread pool.  The pool is a function-local
// static that is constructed with zero worker threads.
//

ThreadPool&
ThreadPool::globalThreadPool ()
{
    static ThreadPool sharedPool (0);

    return sharedPool;
}
447
448
449
void
450
ThreadPool::addGlobalTask (Task* task)
451
{
452
globalThreadPool().addTask (task);
453
}
454
455
456
} // namespace IlmThread
457
458