Path: blob/master/thirdparty/libwebp/src/utils/thread_utils.c
9912 views
// Copyright 2011 Google Inc. All Rights Reserved.1//2// Use of this source code is governed by a BSD-style license3// that can be found in the COPYING file in the root of the source4// tree. An additional intellectual property rights grant can be found5// in the file PATENTS. All contributing project authors may6// be found in the AUTHORS file in the root of the source tree.7// -----------------------------------------------------------------------------8//9// Multi-threaded worker10//11// Author: Skal ([email protected])1213#include <assert.h>14#include <string.h> // for memset()15#include "src/utils/thread_utils.h"16#include "src/utils/utils.h"1718#ifdef WEBP_USE_THREAD1920#if defined(_WIN32)2122#include <windows.h>23typedef HANDLE pthread_t;24typedef CRITICAL_SECTION pthread_mutex_t;2526#if _WIN32_WINNT >= 0x0600 // Windows Vista / Server 2008 or greater27#define USE_WINDOWS_CONDITION_VARIABLE28typedef CONDITION_VARIABLE pthread_cond_t;29#else30typedef struct {31HANDLE waiting_sem_;32HANDLE received_sem_;33HANDLE signal_event_;34} pthread_cond_t;35#endif // _WIN32_WINNT >= 0x6003637#ifndef WINAPI_FAMILY_PARTITION38#define WINAPI_PARTITION_DESKTOP 139#define WINAPI_FAMILY_PARTITION(x) x40#endif4142#if !WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP)43#define USE_CREATE_THREAD44#endif4546#else // !_WIN324748#include <pthread.h>4950#endif // _WIN325152typedef struct {53pthread_mutex_t mutex_;54pthread_cond_t condition_;55pthread_t thread_;56} WebPWorkerImpl;5758#if defined(_WIN32)5960//------------------------------------------------------------------------------61// simplistic pthread emulation layer6263#include <process.h>6465// _beginthreadex requires __stdcall66#define THREADFN unsigned int __stdcall67#define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val)6869#if _WIN32_WINNT >= 0x0501 // Windows XP or greater70#define WaitForSingleObject(obj, timeout) \71WaitForSingleObjectEx(obj, timeout, FALSE /*bAlertable*/)72#endif7374static int pthread_create(pthread_t* const thread, const void* attr,75unsigned int (__stdcall* start)(void*), void* arg) {76(void)attr;77#ifdef USE_CREATE_THREAD78*thread = CreateThread(NULL, /* lpThreadAttributes */790, /* dwStackSize */80start,81arg,820, /* dwStackSize */83NULL); /* lpThreadId */84#else85*thread = (pthread_t)_beginthreadex(NULL, /* void *security */860, /* unsigned stack_size */87start,88arg,890, /* unsigned initflag */90NULL); /* unsigned *thrdaddr */91#endif92if (*thread == NULL) return 1;93SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL);94return 0;95}9697static int pthread_join(pthread_t thread, void** value_ptr) {98(void)value_ptr;99return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 ||100CloseHandle(thread) == 0);101}102103// Mutex104static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) {105(void)mutexattr;106#if _WIN32_WINNT >= 0x0600 // Windows Vista / Server 2008 or greater107InitializeCriticalSectionEx(mutex, 0 /*dwSpinCount*/, 0 /*Flags*/);108#else109InitializeCriticalSection(mutex);110#endif111return 0;112}113114static int pthread_mutex_lock(pthread_mutex_t* const mutex) {115EnterCriticalSection(mutex);116return 0;117}118119static int pthread_mutex_unlock(pthread_mutex_t* const mutex) {120LeaveCriticalSection(mutex);121return 0;122}123124static int pthread_mutex_destroy(pthread_mutex_t* const mutex) {125DeleteCriticalSection(mutex);126return 0;127}128129// Condition130static int pthread_cond_destroy(pthread_cond_t* const condition) {131int ok = 1;132#ifdef USE_WINDOWS_CONDITION_VARIABLE133(void)condition;134#else135ok &= (CloseHandle(condition->waiting_sem_) != 0);136ok &= (CloseHandle(condition->received_sem_) != 0);137ok &= (CloseHandle(condition->signal_event_) != 0);138#endif139return !ok;140}141142static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) {143(void)cond_attr;144#ifdef USE_WINDOWS_CONDITION_VARIABLE145InitializeConditionVariable(condition);146#else147condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL);148condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL);149condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL);150if (condition->waiting_sem_ == NULL ||151condition->received_sem_ == NULL ||152condition->signal_event_ == NULL) {153pthread_cond_destroy(condition);154return 1;155}156#endif157return 0;158}159160static int pthread_cond_signal(pthread_cond_t* const condition) {161int ok = 1;162#ifdef USE_WINDOWS_CONDITION_VARIABLE163WakeConditionVariable(condition);164#else165if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) {166// a thread is waiting in pthread_cond_wait: allow it to be notified167ok = SetEvent(condition->signal_event_);168// wait until the event is consumed so the signaler cannot consume169// the event via its own pthread_cond_wait.170ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) !=171WAIT_OBJECT_0);172}173#endif174return !ok;175}176177static int pthread_cond_wait(pthread_cond_t* const condition,178pthread_mutex_t* const mutex) {179int ok;180#ifdef USE_WINDOWS_CONDITION_VARIABLE181ok = SleepConditionVariableCS(condition, mutex, INFINITE);182#else183// note that there is a consumer available so the signal isn't dropped in184// pthread_cond_signal185if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL)) return 1;186// now unlock the mutex so pthread_cond_signal may be issued187pthread_mutex_unlock(mutex);188ok = (WaitForSingleObject(condition->signal_event_, INFINITE) ==189WAIT_OBJECT_0);190ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL);191pthread_mutex_lock(mutex);192#endif193return !ok;194}195196#else // !_WIN32197# define THREADFN void*198# define THREAD_RETURN(val) val199#endif // _WIN32200201//------------------------------------------------------------------------------202203static THREADFN ThreadLoop(void* ptr) {204WebPWorker* const worker = (WebPWorker*)ptr;205WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl_;206int done = 0;207while (!done) {208pthread_mutex_lock(&impl->mutex_);209while (worker->status_ == OK) { // wait in idling mode210pthread_cond_wait(&impl->condition_, &impl->mutex_);211}212if (worker->status_ == WORK) {213WebPGetWorkerInterface()->Execute(worker);214worker->status_ = OK;215} else if (worker->status_ == NOT_OK) { // finish the worker216done = 1;217}218// signal to the main thread that we're done (for Sync())219// Note the associated mutex does not need to be held when signaling the220// condition. Unlocking the mutex first may improve performance in some221// implementations, avoiding the case where the waiting thread can't222// reacquire the mutex when woken.223pthread_mutex_unlock(&impl->mutex_);224pthread_cond_signal(&impl->condition_);225}226return THREAD_RETURN(NULL); // Thread is finished227}228229// main thread state control230static void ChangeState(WebPWorker* const worker, WebPWorkerStatus new_status) {231// No-op when attempting to change state on a thread that didn't come up.232// Checking status_ without acquiring the lock first would result in a data233// race.234WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl_;235if (impl == NULL) return;236237pthread_mutex_lock(&impl->mutex_);238if (worker->status_ >= OK) {239// wait for the worker to finish240while (worker->status_ != OK) {241pthread_cond_wait(&impl->condition_, &impl->mutex_);242}243// assign new status and release the working thread if needed244if (new_status != OK) {245worker->status_ = new_status;246// Note the associated mutex does not need to be held when signaling the247// condition. Unlocking the mutex first may improve performance in some248// implementations, avoiding the case where the waiting thread can't249// reacquire the mutex when woken.250pthread_mutex_unlock(&impl->mutex_);251pthread_cond_signal(&impl->condition_);252return;253}254}255pthread_mutex_unlock(&impl->mutex_);256}257258#endif // WEBP_USE_THREAD259260//------------------------------------------------------------------------------261262static void Init(WebPWorker* const worker) {263memset(worker, 0, sizeof(*worker));264worker->status_ = NOT_OK;265}266267static int Sync(WebPWorker* const worker) {268#ifdef WEBP_USE_THREAD269ChangeState(worker, OK);270#endif271assert(worker->status_ <= OK);272return !worker->had_error;273}274275static int Reset(WebPWorker* const worker) {276int ok = 1;277worker->had_error = 0;278if (worker->status_ < OK) {279#ifdef WEBP_USE_THREAD280WebPWorkerImpl* const impl =281(WebPWorkerImpl*)WebPSafeCalloc(1, sizeof(WebPWorkerImpl));282worker->impl_ = (void*)impl;283if (worker->impl_ == NULL) {284return 0;285}286if (pthread_mutex_init(&impl->mutex_, NULL)) {287goto Error;288}289if (pthread_cond_init(&impl->condition_, NULL)) {290pthread_mutex_destroy(&impl->mutex_);291goto Error;292}293pthread_mutex_lock(&impl->mutex_);294ok = !pthread_create(&impl->thread_, NULL, ThreadLoop, worker);295if (ok) worker->status_ = OK;296pthread_mutex_unlock(&impl->mutex_);297if (!ok) {298pthread_mutex_destroy(&impl->mutex_);299pthread_cond_destroy(&impl->condition_);300Error:301WebPSafeFree(impl);302worker->impl_ = NULL;303return 0;304}305#else306worker->status_ = OK;307#endif308} else if (worker->status_ > OK) {309ok = Sync(worker);310}311assert(!ok || (worker->status_ == OK));312return ok;313}314315static void Execute(WebPWorker* const worker) {316if (worker->hook != NULL) {317worker->had_error |= !worker->hook(worker->data1, worker->data2);318}319}320321static void Launch(WebPWorker* const worker) {322#ifdef WEBP_USE_THREAD323ChangeState(worker, WORK);324#else325Execute(worker);326#endif327}328329static void End(WebPWorker* const worker) {330#ifdef WEBP_USE_THREAD331if (worker->impl_ != NULL) {332WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl_;333ChangeState(worker, NOT_OK);334pthread_join(impl->thread_, NULL);335pthread_mutex_destroy(&impl->mutex_);336pthread_cond_destroy(&impl->condition_);337WebPSafeFree(impl);338worker->impl_ = NULL;339}340#else341worker->status_ = NOT_OK;342assert(worker->impl_ == NULL);343#endif344assert(worker->status_ == NOT_OK);345}346347//------------------------------------------------------------------------------348349static WebPWorkerInterface g_worker_interface = {350Init, Reset, Sync, Launch, Execute, End351};352353int WebPSetWorkerInterface(const WebPWorkerInterface* const winterface) {354if (winterface == NULL ||355winterface->Init == NULL || winterface->Reset == NULL ||356winterface->Sync == NULL || winterface->Launch == NULL ||357winterface->Execute == NULL || winterface->End == NULL) {358return 0;359}360g_worker_interface = *winterface;361return 1;362}363364const WebPWorkerInterface* WebPGetWorkerInterface(void) {365return &g_worker_interface;366}367368//------------------------------------------------------------------------------369370371