Path: blob/master/thirdparty/libwebp/src/utils/thread_utils.c
20888 views
// Copyright 2011 Google Inc. All Rights Reserved.1//2// Use of this source code is governed by a BSD-style license3// that can be found in the COPYING file in the root of the source4// tree. An additional intellectual property rights grant can be found5// in the file PATENTS. All contributing project authors may6// be found in the AUTHORS file in the root of the source tree.7// -----------------------------------------------------------------------------8//9// Multi-threaded worker10//11// Author: Skal ([email protected])1213#include <assert.h>14#include <string.h> // for memset()1516#include "src/utils/thread_utils.h"17#include "src/utils/utils.h"1819#ifdef WEBP_USE_THREAD2021#if defined(_WIN32)2223#include <windows.h>24typedef HANDLE pthread_t;25typedef CRITICAL_SECTION pthread_mutex_t;2627#if _WIN32_WINNT >= 0x0600 // Windows Vista / Server 2008 or greater28#define USE_WINDOWS_CONDITION_VARIABLE29typedef CONDITION_VARIABLE pthread_cond_t;30#else31typedef struct {32HANDLE waiting_sem;33HANDLE received_sem;34HANDLE signal_event;35} pthread_cond_t;36#endif // _WIN32_WINNT >= 0x6003738#ifndef WINAPI_FAMILY_PARTITION39#define WINAPI_PARTITION_DESKTOP 140#define WINAPI_FAMILY_PARTITION(x) x41#endif4243#if !WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP)44#define USE_CREATE_THREAD45#endif4647#else // !_WIN324849#include <pthread.h>5051#endif // _WIN325253typedef struct {54pthread_mutex_t mutex;55pthread_cond_t condition;56pthread_t thread;57} WebPWorkerImpl;5859#if defined(_WIN32)6061//------------------------------------------------------------------------------62// simplistic pthread emulation layer6364#include <process.h>6566// _beginthreadex requires __stdcall67#define THREADFN unsigned int __stdcall68#define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val)6970#if _WIN32_WINNT >= 0x0501 // Windows XP or greater71#define WaitForSingleObject(obj, timeout) \72WaitForSingleObjectEx(obj, timeout, FALSE /*bAlertable*/)73#endif7475static int pthread_create(pthread_t* const thread, const void* attr,76unsigned int (__stdcall* start)(void*), void* arg) {77(void)attr;78#ifdef USE_CREATE_THREAD79*thread = CreateThread(NULL, /* lpThreadAttributes */800, /* dwStackSize */81start,82arg,830, /* dwStackSize */84NULL); /* lpThreadId */85#else86*thread = (pthread_t)_beginthreadex(NULL, /* void *security */870, /* unsigned stack_size */88start,89arg,900, /* unsigned initflag */91NULL); /* unsigned *thrdaddr */92#endif93if (*thread == NULL) return 1;94SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL);95return 0;96}9798static int pthread_join(pthread_t thread, void** value_ptr) {99(void)value_ptr;100return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 ||101CloseHandle(thread) == 0);102}103104// Mutex105static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) {106(void)mutexattr;107#if _WIN32_WINNT >= 0x0600 // Windows Vista / Server 2008 or greater108InitializeCriticalSectionEx(mutex, 0 /*dwSpinCount*/, 0 /*Flags*/);109#else110InitializeCriticalSection(mutex);111#endif112return 0;113}114115static int pthread_mutex_lock(pthread_mutex_t* const mutex) {116EnterCriticalSection(mutex);117return 0;118}119120static int pthread_mutex_unlock(pthread_mutex_t* const mutex) {121LeaveCriticalSection(mutex);122return 0;123}124125static int pthread_mutex_destroy(pthread_mutex_t* const mutex) {126DeleteCriticalSection(mutex);127return 0;128}129130// Condition131static int pthread_cond_destroy(pthread_cond_t* const condition) {132int ok = 1;133#ifdef USE_WINDOWS_CONDITION_VARIABLE134(void)condition;135#else136ok &= (CloseHandle(condition->waiting_sem) != 0);137ok &= (CloseHandle(condition->received_sem) != 0);138ok &= (CloseHandle(condition->signal_event) != 0);139#endif140return !ok;141}142143static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) {144(void)cond_attr;145#ifdef USE_WINDOWS_CONDITION_VARIABLE146InitializeConditionVariable(condition);147#else148condition->waiting_sem = CreateSemaphore(NULL, 0, 1, NULL);149condition->received_sem = CreateSemaphore(NULL, 0, 1, NULL);150condition->signal_event = CreateEvent(NULL, FALSE, FALSE, NULL);151if (condition->waiting_sem == NULL ||152condition->received_sem == NULL ||153condition->signal_event == NULL) {154pthread_cond_destroy(condition);155return 1;156}157#endif158return 0;159}160161static int pthread_cond_signal(pthread_cond_t* const condition) {162int ok = 1;163#ifdef USE_WINDOWS_CONDITION_VARIABLE164WakeConditionVariable(condition);165#else166if (WaitForSingleObject(condition->waiting_sem, 0) == WAIT_OBJECT_0) {167// a thread is waiting in pthread_cond_wait: allow it to be notified168ok = SetEvent(condition->signal_event);169// wait until the event is consumed so the signaler cannot consume170// the event via its own pthread_cond_wait.171ok &= (WaitForSingleObject(condition->received_sem, INFINITE) !=172WAIT_OBJECT_0);173}174#endif175return !ok;176}177178static int pthread_cond_wait(pthread_cond_t* const condition,179pthread_mutex_t* const mutex) {180int ok;181#ifdef USE_WINDOWS_CONDITION_VARIABLE182ok = SleepConditionVariableCS(condition, mutex, INFINITE);183#else184// note that there is a consumer available so the signal isn't dropped in185// pthread_cond_signal186if (!ReleaseSemaphore(condition->waiting_sem, 1, NULL)) return 1;187// now unlock the mutex so pthread_cond_signal may be issued188pthread_mutex_unlock(mutex);189ok = (WaitForSingleObject(condition->signal_event, INFINITE) ==190WAIT_OBJECT_0);191ok &= ReleaseSemaphore(condition->received_sem, 1, NULL);192pthread_mutex_lock(mutex);193#endif194return !ok;195}196197#else // !_WIN32198# define THREADFN void*199# define THREAD_RETURN(val) val200#endif // _WIN32201202//------------------------------------------------------------------------------203204static THREADFN ThreadLoop(void* ptr) {205WebPWorker* const worker = (WebPWorker*)ptr;206WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl;207int done = 0;208while (!done) {209pthread_mutex_lock(&impl->mutex);210while (worker->status == OK) { // wait in idling mode211pthread_cond_wait(&impl->condition, &impl->mutex);212}213if (worker->status == WORK) {214WebPGetWorkerInterface()->Execute(worker);215worker->status = OK;216} else if (worker->status == NOT_OK) { // finish the worker217done = 1;218}219// signal to the main thread that we're done (for Sync())220// Note the associated mutex does not need to be held when signaling the221// condition. Unlocking the mutex first may improve performance in some222// implementations, avoiding the case where the waiting thread can't223// reacquire the mutex when woken.224pthread_mutex_unlock(&impl->mutex);225pthread_cond_signal(&impl->condition);226}227return THREAD_RETURN(NULL); // Thread is finished228}229230// main thread state control231static void ChangeState(WebPWorker* const worker, WebPWorkerStatus new_status) {232// No-op when attempting to change state on a thread that didn't come up.233// Checking 'status' without acquiring the lock first would result in a data234// race.235WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl;236if (impl == NULL) return;237238pthread_mutex_lock(&impl->mutex);239if (worker->status >= OK) {240// wait for the worker to finish241while (worker->status != OK) {242pthread_cond_wait(&impl->condition, &impl->mutex);243}244// assign new status and release the working thread if needed245if (new_status != OK) {246worker->status = new_status;247// Note the associated mutex does not need to be held when signaling the248// condition. Unlocking the mutex first may improve performance in some249// implementations, avoiding the case where the waiting thread can't250// reacquire the mutex when woken.251pthread_mutex_unlock(&impl->mutex);252pthread_cond_signal(&impl->condition);253return;254}255}256pthread_mutex_unlock(&impl->mutex);257}258259#endif // WEBP_USE_THREAD260261//------------------------------------------------------------------------------262263static void Init(WebPWorker* const worker) {264memset(worker, 0, sizeof(*worker));265worker->status = NOT_OK;266}267268static int Sync(WebPWorker* const worker) {269#ifdef WEBP_USE_THREAD270ChangeState(worker, OK);271#endif272assert(worker->status <= OK);273return !worker->had_error;274}275276static int Reset(WebPWorker* const worker) {277int ok = 1;278worker->had_error = 0;279if (worker->status < OK) {280#ifdef WEBP_USE_THREAD281WebPWorkerImpl* const impl =282(WebPWorkerImpl*)WebPSafeCalloc(1, sizeof(WebPWorkerImpl));283worker->impl = (void*)impl;284if (worker->impl == NULL) {285return 0;286}287if (pthread_mutex_init(&impl->mutex, NULL)) {288goto Error;289}290if (pthread_cond_init(&impl->condition, NULL)) {291pthread_mutex_destroy(&impl->mutex);292goto Error;293}294pthread_mutex_lock(&impl->mutex);295ok = !pthread_create(&impl->thread, NULL, ThreadLoop, worker);296if (ok) worker->status = OK;297pthread_mutex_unlock(&impl->mutex);298if (!ok) {299pthread_mutex_destroy(&impl->mutex);300pthread_cond_destroy(&impl->condition);301Error:302WebPSafeFree(impl);303worker->impl = NULL;304return 0;305}306#else307worker->status = OK;308#endif309} else if (worker->status > OK) {310ok = Sync(worker);311}312assert(!ok || (worker->status == OK));313return ok;314}315316static void Execute(WebPWorker* const worker) {317if (worker->hook != NULL) {318worker->had_error |= !worker->hook(worker->data1, worker->data2);319}320}321322static void Launch(WebPWorker* const worker) {323#ifdef WEBP_USE_THREAD324ChangeState(worker, WORK);325#else326Execute(worker);327#endif328}329330static void End(WebPWorker* const worker) {331#ifdef WEBP_USE_THREAD332if (worker->impl != NULL) {333WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl;334ChangeState(worker, NOT_OK);335pthread_join(impl->thread, NULL);336pthread_mutex_destroy(&impl->mutex);337pthread_cond_destroy(&impl->condition);338WebPSafeFree(impl);339worker->impl = NULL;340}341#else342worker->status = NOT_OK;343assert(worker->impl == NULL);344#endif345assert(worker->status == NOT_OK);346}347348//------------------------------------------------------------------------------349350static WebPWorkerInterface g_worker_interface = {351Init, Reset, Sync, Launch, Execute, End352};353354int WebPSetWorkerInterface(const WebPWorkerInterface* const winterface) {355if (winterface == NULL ||356winterface->Init == NULL || winterface->Reset == NULL ||357winterface->Sync == NULL || winterface->Launch == NULL ||358winterface->Execute == NULL || winterface->End == NULL) {359return 0;360}361g_worker_interface = *winterface;362return 1;363}364365const WebPWorkerInterface* WebPGetWorkerInterface(void) {366return &g_worker_interface;367}368369//------------------------------------------------------------------------------370371372