Path: blob/main/sys/contrib/openzfs/module/zstd/lib/common/pool.c
48775 views
// SPDX-License-Identifier: BSD-3-Clause OR GPL-2.0-only1/*2* Copyright (c) 2016-2020, Yann Collet, Facebook, Inc.3* All rights reserved.4*5* This source code is licensed under both the BSD-style license (found in the6* LICENSE file in the root directory of this source tree) and the GPLv2 (found7* in the COPYING file in the root directory of this source tree).8* You may select, at your option, one of the above-listed licenses.9*/101112/* ====== Dependencies ======= */13#include <stddef.h> /* size_t */14#include "debug.h" /* assert */15#include "zstd_internal.h" /* ZSTD_malloc, ZSTD_free */16#include "pool.h"1718/* ====== Compiler specifics ====== */19#if defined(_MSC_VER)20# pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */21#endif222324#ifdef ZSTD_MULTITHREAD2526#include "threading.h" /* pthread adaptation */2728/* A job is a function and an opaque argument */29typedef struct POOL_job_s {30POOL_function function;31void *opaque;32} POOL_job;3334struct POOL_ctx_s {35ZSTD_customMem customMem;36/* Keep track of the threads */37ZSTD_pthread_t* threads;38size_t threadCapacity;39size_t threadLimit;4041/* The queue is a circular buffer */42POOL_job *queue;43size_t queueHead;44size_t queueTail;45size_t queueSize;4647/* The number of threads working on jobs */48size_t numThreadsBusy;49/* Indicates if the queue is empty */50int queueEmpty;5152/* The mutex protects the queue */53ZSTD_pthread_mutex_t queueMutex;54/* Condition variable for pushers to wait on when the queue is full */55ZSTD_pthread_cond_t queuePushCond;56/* Condition variables for poppers to wait on when the queue is empty */57ZSTD_pthread_cond_t queuePopCond;58/* Indicates if the queue is shutting down */59int shutdown;60};6162/* POOL_thread() :63* Work thread for the thread pool.64* Waits for jobs and executes them.65* @returns : NULL on failure else non-null.66*/67static void* POOL_thread(void* opaque) {68POOL_ctx* const ctx = (POOL_ctx*)opaque;69if (!ctx) { return NULL; }70for (;;) {71/* Lock the mutex and wait for a non-empty queue or until shutdown */72ZSTD_pthread_mutex_lock(&ctx->queueMutex);7374while ( ctx->queueEmpty75|| (ctx->numThreadsBusy >= ctx->threadLimit) ) {76if (ctx->shutdown) {77/* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit),78* a few threads will be shutdown while !queueEmpty,79* but enough threads will remain active to finish the queue */80ZSTD_pthread_mutex_unlock(&ctx->queueMutex);81return opaque;82}83ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);84}85/* Pop a job off the queue */86{ POOL_job const job = ctx->queue[ctx->queueHead];87ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;88ctx->numThreadsBusy++;89ctx->queueEmpty = ctx->queueHead == ctx->queueTail;90/* Unlock the mutex, signal a pusher, and run the job */91ZSTD_pthread_cond_signal(&ctx->queuePushCond);92ZSTD_pthread_mutex_unlock(&ctx->queueMutex);9394job.function(job.opaque);9596/* If the intended queue size was 0, signal after finishing job */97ZSTD_pthread_mutex_lock(&ctx->queueMutex);98ctx->numThreadsBusy--;99if (ctx->queueSize == 1) {100ZSTD_pthread_cond_signal(&ctx->queuePushCond);101}102ZSTD_pthread_mutex_unlock(&ctx->queueMutex);103}104} /* for (;;) */105assert(0); /* Unreachable */106}107108POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {109return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);110}111112POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,113ZSTD_customMem customMem) {114POOL_ctx* ctx;115/* Check parameters */116if (!numThreads) { return NULL; }117/* Allocate the context and zero initialize */118ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem);119if (!ctx) { return NULL; }120/* Initialize the job queue.121* It needs one extra space since one space is wasted to differentiate122* empty and full queues.123*/124ctx->queueSize = queueSize + 1;125ctx->queue = (POOL_job*)ZSTD_malloc(ctx->queueSize * sizeof(POOL_job), customMem);126ctx->queueHead = 0;127ctx->queueTail = 0;128ctx->numThreadsBusy = 0;129ctx->queueEmpty = 1;130{131int error = 0;132error |= ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL);133error |= ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL);134error |= ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);135if (error) { POOL_free(ctx); return NULL; }136}137ctx->shutdown = 0;138/* Allocate space for the thread handles */139ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem);140ctx->threadCapacity = 0;141ctx->customMem = customMem;142/* Check for errors */143if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }144/* Initialize the threads */145{ size_t i;146for (i = 0; i < numThreads; ++i) {147if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {148ctx->threadCapacity = i;149POOL_free(ctx);150return NULL;151} }152ctx->threadCapacity = numThreads;153ctx->threadLimit = numThreads;154}155return ctx;156}157158/*! POOL_join() :159Shutdown the queue, wake any sleeping threads, and join all of the threads.160*/161static void POOL_join(POOL_ctx* ctx) {162/* Shut down the queue */163ZSTD_pthread_mutex_lock(&ctx->queueMutex);164ctx->shutdown = 1;165ZSTD_pthread_mutex_unlock(&ctx->queueMutex);166/* Wake up sleeping threads */167ZSTD_pthread_cond_broadcast(&ctx->queuePushCond);168ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);169/* Join all of the threads */170{ size_t i;171for (i = 0; i < ctx->threadCapacity; ++i) {172ZSTD_pthread_join(ctx->threads[i], NULL); /* note : could fail */173} }174}175176void POOL_free(POOL_ctx *ctx) {177if (!ctx) { return; }178POOL_join(ctx);179ZSTD_pthread_mutex_destroy(&ctx->queueMutex);180ZSTD_pthread_cond_destroy(&ctx->queuePushCond);181ZSTD_pthread_cond_destroy(&ctx->queuePopCond);182ZSTD_free(ctx->queue, ctx->customMem);183ZSTD_free(ctx->threads, ctx->customMem);184ZSTD_free(ctx, ctx->customMem);185}186187188189size_t POOL_sizeof(POOL_ctx *ctx) {190if (ctx==NULL) return 0; /* supports sizeof NULL */191return sizeof(*ctx)192+ ctx->queueSize * sizeof(POOL_job)193+ ctx->threadCapacity * sizeof(ZSTD_pthread_t);194}195196197/* @return : 0 on success, 1 on error */198static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)199{200if (numThreads <= ctx->threadCapacity) {201if (!numThreads) return 1;202ctx->threadLimit = numThreads;203return 0;204}205/* numThreads > threadCapacity */206{ ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem);207if (!threadPool) return 1;208/* replace existing thread pool */209memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(*threadPool));210ZSTD_free(ctx->threads, ctx->customMem);211ctx->threads = threadPool;212/* Initialize additional threads */213{ size_t threadId;214for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) {215if (ZSTD_pthread_create(&threadPool[threadId], NULL, &POOL_thread, ctx)) {216ctx->threadCapacity = threadId;217return 1;218} }219} }220/* successfully expanded */221ctx->threadCapacity = numThreads;222ctx->threadLimit = numThreads;223return 0;224}225226/* @return : 0 on success, 1 on error */227int POOL_resize(POOL_ctx* ctx, size_t numThreads)228{229int result;230if (ctx==NULL) return 1;231ZSTD_pthread_mutex_lock(&ctx->queueMutex);232result = POOL_resize_internal(ctx, numThreads);233ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);234ZSTD_pthread_mutex_unlock(&ctx->queueMutex);235return result;236}237238/**239* Returns 1 if the queue is full and 0 otherwise.240*241* When queueSize is 1 (pool was created with an intended queueSize of 0),242* then a queue is empty if there is a thread free _and_ no job is waiting.243*/244static int isQueueFull(POOL_ctx const* ctx) {245if (ctx->queueSize > 1) {246return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);247} else {248return (ctx->numThreadsBusy == ctx->threadLimit) ||249!ctx->queueEmpty;250}251}252253254static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)255{256POOL_job const job = {function, opaque};257assert(ctx != NULL);258if (ctx->shutdown) return;259260ctx->queueEmpty = 0;261ctx->queue[ctx->queueTail] = job;262ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;263ZSTD_pthread_cond_signal(&ctx->queuePopCond);264}265266void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque)267{268assert(ctx != NULL);269ZSTD_pthread_mutex_lock(&ctx->queueMutex);270/* Wait until there is space in the queue for the new job */271while (isQueueFull(ctx) && (!ctx->shutdown)) {272ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);273}274POOL_add_internal(ctx, function, opaque);275ZSTD_pthread_mutex_unlock(&ctx->queueMutex);276}277278279int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)280{281assert(ctx != NULL);282ZSTD_pthread_mutex_lock(&ctx->queueMutex);283if (isQueueFull(ctx)) {284ZSTD_pthread_mutex_unlock(&ctx->queueMutex);285return 0;286}287POOL_add_internal(ctx, function, opaque);288ZSTD_pthread_mutex_unlock(&ctx->queueMutex);289return 1;290}291292293#else /* ZSTD_MULTITHREAD not defined */294295/* ========================== */296/* No multi-threading support */297/* ========================== */298299300/* We don't need any data, but if it is empty, malloc() might return NULL. */301struct POOL_ctx_s {302int dummy;303};304static POOL_ctx g_ctx;305306POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {307return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);308}309310POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) {311(void)numThreads;312(void)queueSize;313(void)customMem;314return &g_ctx;315}316317void POOL_free(POOL_ctx* ctx) {318assert(!ctx || ctx == &g_ctx);319(void)ctx;320}321322int POOL_resize(POOL_ctx* ctx, size_t numThreads) {323(void)ctx; (void)numThreads;324return 0;325}326327void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {328(void)ctx;329function(opaque);330}331332int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) {333(void)ctx;334function(opaque);335return 1;336}337338size_t POOL_sizeof(POOL_ctx* ctx) {339if (ctx==NULL) return 0; /* supports sizeof NULL */340assert(ctx == &g_ctx);341return sizeof(*ctx);342}343344#endif /* ZSTD_MULTITHREAD */345346347