Path: blob/master/3rdparty/openexr/IlmThread/IlmThreadPool.cpp
16337 views
///////////////////////////////////////////////////////////////////////////1//2// Copyright (c) 2005, Industrial Light & Magic, a division of Lucas3// Digital Ltd. LLC4//5// All rights reserved.6//7// Redistribution and use in source and binary forms, with or without8// modification, are permitted provided that the following conditions are9// met:10// * Redistributions of source code must retain the above copyright11// notice, this list of conditions and the following disclaimer.12// * Redistributions in binary form must reproduce the above13// copyright notice, this list of conditions and the following disclaimer14// in the documentation and/or other materials provided with the15// distribution.16// * Neither the name of Industrial Light & Magic nor the names of17// its contributors may be used to endorse or promote products derived18// from this software without specific prior written permission.19//20// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS21// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT22// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR23// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT24// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,25// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT26// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,27// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY28// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT29// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE30// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.31//32///////////////////////////////////////////////////////////////////////////3334//-----------------------------------------------------------------------------35//36// class Task, class ThreadPool, class TaskGroup37//38//-----------------------------------------------------------------------------3940#include "IlmThread.h"41#include "IlmThreadMutex.h"42#include "IlmThreadSemaphore.h"43#include "IlmThreadPool.h"44#include "Iex.h"45#include <list>4647using namespace std;4849namespace IlmThread {50namespace {5152class WorkerThread: public Thread53{54public:5556WorkerThread (ThreadPool::Data* data);5758virtual void run ();5960private:6162ThreadPool::Data * _data;63};6465} //namespace666768struct TaskGroup::Data69{70Data ();71~Data ();7273void addTask () ;74void removeTask ();7576Semaphore isEmpty; // used to signal that the taskgroup is empty77int numPending; // number of pending tasks to still execute78};798081struct ThreadPool::Data82{83Data ();84~Data();8586void finish ();87bool stopped () const;88void stop ();8990Semaphore taskSemaphore; // threads wait on this for ready tasks91Mutex taskMutex; // mutual exclusion for the tasks list92list<Task*> tasks; // the list of tasks to execute93size_t numTasks; // fast access to list size94// (list::size() can be O(n))9596Semaphore threadSemaphore; // signaled when a thread starts executing97Mutex threadMutex; // mutual exclusion for threads list98list<WorkerThread*> threads; // the list of all threads99size_t numThreads; // fast access to list size100101bool stopping; // flag indicating whether to stop threads102Mutex stopMutex; // mutual exclusion for stopping flag103};104105106107//108// class WorkerThread109//110111WorkerThread::WorkerThread (ThreadPool::Data* data):112_data (data)113{114start();115}116117118void119WorkerThread::run ()120{121//122// Signal that the thread has started executing123//124125_data->threadSemaphore.post();126127while (true)128{129//130// Wait for a task to become available131//132133_data->taskSemaphore.wait();134135{136Lock taskLock (_data->taskMutex);137138//139// If there is a task pending, pop off the next task in the FIFO140//141142if (_data->numTasks > 0)143{144Task* task = _data->tasks.front();145TaskGroup* taskGroup = task->group();146_data->tasks.pop_front();147_data->numTasks--;148149taskLock.release();150task->execute();151taskLock.acquire();152153delete task;154taskGroup->_data->removeTask();155}156else if (_data->stopped())157{158break;159}160}161}162}163164165//166// struct TaskGroup::Data167//168169TaskGroup::Data::Data (): isEmpty (1), numPending (0)170{171// empty172}173174175TaskGroup::Data::~Data ()176{177//178// A TaskGroup acts like an "inverted" semaphore: if the count179// is above 0 then waiting on the taskgroup will block. This180// destructor waits until the taskgroup is empty before returning.181//182183isEmpty.wait ();184}185186187void188TaskGroup::Data::addTask ()189{190//191// Any access to the taskgroup is protected by a mutex that is192// held by the threadpool. Therefore it is safe to access193// numPending before we wait on the semaphore.194//195196if (numPending++ == 0)197isEmpty.wait ();198}199200201void202TaskGroup::Data::removeTask ()203{204if (--numPending == 0)205isEmpty.post ();206}207208209//210// struct ThreadPool::Data211//212213ThreadPool::Data::Data (): numTasks (0), numThreads (0), stopping (false)214{215// empty216}217218219ThreadPool::Data::~Data()220{221Lock lock (threadMutex);222finish ();223}224225226void227ThreadPool::Data::finish ()228{229stop();230231//232// Signal enough times to allow all threads to stop.233//234// Wait until all threads have started their run functions.235// If we do not wait before we destroy the threads then it's236// possible that the threads have not yet called their run237// functions.238// If this happens then the run function will be called off239// of an invalid object and we will crash, most likely with240// an error like: "pure virtual method called"241//242243for (size_t i = 0; i < numThreads; i++)244{245taskSemaphore.post();246threadSemaphore.wait();247}248249//250// Join all the threads251//252253for (list<WorkerThread*>::iterator i = threads.begin();254i != threads.end();255++i)256{257delete (*i);258}259260Lock lock1 (taskMutex);261Lock lock2 (stopMutex);262threads.clear();263tasks.clear();264numThreads = 0;265numTasks = 0;266stopping = false;267}268269270bool271ThreadPool::Data::stopped () const272{273Lock lock (stopMutex);274return stopping;275}276277278void279ThreadPool::Data::stop ()280{281Lock lock (stopMutex);282stopping = true;283}284285286//287// class Task288//289290Task::Task (TaskGroup* g): _group(g)291{292// empty293}294295296Task::~Task()297{298// empty299}300301302TaskGroup*303Task::group ()304{305return _group;306}307308309TaskGroup::TaskGroup ():310_data (new Data())311{312// empty313}314315316TaskGroup::~TaskGroup ()317{318delete _data;319}320321322//323// class ThreadPool324//325326ThreadPool::ThreadPool (unsigned nthreads):327_data (new Data())328{329setNumThreads (nthreads);330}331332333ThreadPool::~ThreadPool ()334{335delete _data;336}337338339int340ThreadPool::numThreads () const341{342Lock lock (_data->threadMutex);343return _data->numThreads;344}345346347void348ThreadPool::setNumThreads (int count)349{350if (count < 0)351throw Iex::ArgExc ("Attempt to set the number of threads "352"in a thread pool to a negative value.");353354//355// Lock access to thread list and size356//357358Lock lock (_data->threadMutex);359360if ((size_t)count > _data->numThreads)361{362//363// Add more threads364//365366while (_data->numThreads < (size_t)count)367{368_data->threads.push_back (new WorkerThread (_data));369_data->numThreads++;370}371}372else if ((size_t)count < _data->numThreads)373{374//375// Wait until all existing threads are finished processing,376// then delete all threads.377//378379_data->finish ();380381//382// Add in new threads383//384385while (_data->numThreads < (size_t)count)386{387_data->threads.push_back (new WorkerThread (_data));388_data->numThreads++;389}390}391}392393394void395ThreadPool::addTask (Task* task)396{397//398// Lock the threads, needed to access numThreads399//400401Lock lock (_data->threadMutex);402403if (_data->numThreads == 0)404{405task->execute ();406delete task;407}408else409{410//411// Get exclusive access to the tasks queue412//413414{415Lock taskLock (_data->taskMutex);416417//418// Push the new task into the FIFO419//420421_data->tasks.push_back (task);422_data->numTasks++;423task->group()->_data->addTask();424}425426//427// Signal that we have a new task to process428//429430_data->taskSemaphore.post ();431}432}433434435ThreadPool&436ThreadPool::globalThreadPool ()437{438//439// The global thread pool440//441442static ThreadPool gThreadPool (0);443444return gThreadPool;445}446447448void449ThreadPool::addGlobalTask (Task* task)450{451globalThreadPool().addTask (task);452}453454455} // namespace IlmThread456457458