Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/sys/contrib/openzfs/module/zstd/lib/common/pool.c
48775 views
1
// SPDX-License-Identifier: BSD-3-Clause OR GPL-2.0-only
2
/*
3
* Copyright (c) 2016-2020, Yann Collet, Facebook, Inc.
4
* All rights reserved.
5
*
6
* This source code is licensed under both the BSD-style license (found in the
7
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
8
* in the COPYING file in the root directory of this source tree).
9
* You may select, at your option, one of the above-listed licenses.
10
*/
11
12
13
/* ====== Dependencies ======= */
14
#include <stddef.h> /* size_t */
15
#include "debug.h" /* assert */
16
#include "zstd_internal.h" /* ZSTD_malloc, ZSTD_free */
17
#include "pool.h"
18
19
/* ====== Compiler specifics ====== */
20
#if defined(_MSC_VER)
21
# pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */
22
#endif
23
24
25
#ifdef ZSTD_MULTITHREAD
26
27
#include "threading.h" /* pthread adaptation */
28
29
/* A job is a function and an opaque argument */
30
typedef struct POOL_job_s {
31
POOL_function function;
32
void *opaque;
33
} POOL_job;
34
35
struct POOL_ctx_s {
36
ZSTD_customMem customMem;
37
/* Keep track of the threads */
38
ZSTD_pthread_t* threads;
39
size_t threadCapacity;
40
size_t threadLimit;
41
42
/* The queue is a circular buffer */
43
POOL_job *queue;
44
size_t queueHead;
45
size_t queueTail;
46
size_t queueSize;
47
48
/* The number of threads working on jobs */
49
size_t numThreadsBusy;
50
/* Indicates if the queue is empty */
51
int queueEmpty;
52
53
/* The mutex protects the queue */
54
ZSTD_pthread_mutex_t queueMutex;
55
/* Condition variable for pushers to wait on when the queue is full */
56
ZSTD_pthread_cond_t queuePushCond;
57
/* Condition variables for poppers to wait on when the queue is empty */
58
ZSTD_pthread_cond_t queuePopCond;
59
/* Indicates if the queue is shutting down */
60
int shutdown;
61
};
62
63
/* POOL_thread() :
64
* Work thread for the thread pool.
65
* Waits for jobs and executes them.
66
* @returns : NULL on failure else non-null.
67
*/
68
static void* POOL_thread(void* opaque) {
69
POOL_ctx* const ctx = (POOL_ctx*)opaque;
70
if (!ctx) { return NULL; }
71
for (;;) {
72
/* Lock the mutex and wait for a non-empty queue or until shutdown */
73
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
74
75
while ( ctx->queueEmpty
76
|| (ctx->numThreadsBusy >= ctx->threadLimit) ) {
77
if (ctx->shutdown) {
78
/* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit),
79
* a few threads will be shutdown while !queueEmpty,
80
* but enough threads will remain active to finish the queue */
81
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
82
return opaque;
83
}
84
ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
85
}
86
/* Pop a job off the queue */
87
{ POOL_job const job = ctx->queue[ctx->queueHead];
88
ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
89
ctx->numThreadsBusy++;
90
ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
91
/* Unlock the mutex, signal a pusher, and run the job */
92
ZSTD_pthread_cond_signal(&ctx->queuePushCond);
93
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
94
95
job.function(job.opaque);
96
97
/* If the intended queue size was 0, signal after finishing job */
98
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
99
ctx->numThreadsBusy--;
100
if (ctx->queueSize == 1) {
101
ZSTD_pthread_cond_signal(&ctx->queuePushCond);
102
}
103
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
104
}
105
} /* for (;;) */
106
assert(0); /* Unreachable */
107
}
108
109
POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
110
return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
111
}
112
113
POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
114
ZSTD_customMem customMem) {
115
POOL_ctx* ctx;
116
/* Check parameters */
117
if (!numThreads) { return NULL; }
118
/* Allocate the context and zero initialize */
119
ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem);
120
if (!ctx) { return NULL; }
121
/* Initialize the job queue.
122
* It needs one extra space since one space is wasted to differentiate
123
* empty and full queues.
124
*/
125
ctx->queueSize = queueSize + 1;
126
ctx->queue = (POOL_job*)ZSTD_malloc(ctx->queueSize * sizeof(POOL_job), customMem);
127
ctx->queueHead = 0;
128
ctx->queueTail = 0;
129
ctx->numThreadsBusy = 0;
130
ctx->queueEmpty = 1;
131
{
132
int error = 0;
133
error |= ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL);
134
error |= ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL);
135
error |= ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);
136
if (error) { POOL_free(ctx); return NULL; }
137
}
138
ctx->shutdown = 0;
139
/* Allocate space for the thread handles */
140
ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
141
ctx->threadCapacity = 0;
142
ctx->customMem = customMem;
143
/* Check for errors */
144
if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
145
/* Initialize the threads */
146
{ size_t i;
147
for (i = 0; i < numThreads; ++i) {
148
if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
149
ctx->threadCapacity = i;
150
POOL_free(ctx);
151
return NULL;
152
} }
153
ctx->threadCapacity = numThreads;
154
ctx->threadLimit = numThreads;
155
}
156
return ctx;
157
}
158
159
/*! POOL_join() :
160
Shutdown the queue, wake any sleeping threads, and join all of the threads.
161
*/
162
static void POOL_join(POOL_ctx* ctx) {
163
/* Shut down the queue */
164
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
165
ctx->shutdown = 1;
166
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
167
/* Wake up sleeping threads */
168
ZSTD_pthread_cond_broadcast(&ctx->queuePushCond);
169
ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
170
/* Join all of the threads */
171
{ size_t i;
172
for (i = 0; i < ctx->threadCapacity; ++i) {
173
ZSTD_pthread_join(ctx->threads[i], NULL); /* note : could fail */
174
} }
175
}
176
177
/* POOL_free() :
 * Joins the worker threads, destroys the mutex and condition variables,
 * then releases the queue, the thread array and the context itself
 * through the pool's allocator. Accepts NULL (no-op).
 */
void POOL_free(POOL_ctx *ctx) {
    if (ctx == NULL) return;

    POOL_join(ctx);
    ZSTD_pthread_mutex_destroy(&ctx->queueMutex);
    ZSTD_pthread_cond_destroy(&ctx->queuePushCond);
    ZSTD_pthread_cond_destroy(&ctx->queuePopCond);

    {   /* Copy the allocator out of the context, since the context is freed last */
        ZSTD_customMem const cMem = ctx->customMem;
        ZSTD_free(ctx->queue, cMem);
        ZSTD_free(ctx->threads, cMem);
        ZSTD_free(ctx, cMem);
    }
}
187
188
189
190
/* POOL_sizeof() :
 * @return : size of the context struct plus its job queue
 *           (queueSize slots) and its thread handle array
 *           (threadCapacity entries); 0 when `ctx` is NULL.
 */
size_t POOL_sizeof(POOL_ctx *ctx) {
    size_t total;
    if (ctx == NULL) return 0;  /* supports sizeof NULL */
    total  = sizeof(*ctx);
    total += ctx->queueSize * sizeof(POOL_job);
    total += ctx->threadCapacity * sizeof(ZSTD_pthread_t);
    return total;
}
196
197
198
/* @return : 0 on success, 1 on error */
199
static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
200
{
201
if (numThreads <= ctx->threadCapacity) {
202
if (!numThreads) return 1;
203
ctx->threadLimit = numThreads;
204
return 0;
205
}
206
/* numThreads > threadCapacity */
207
{ ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem);
208
if (!threadPool) return 1;
209
/* replace existing thread pool */
210
memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(*threadPool));
211
ZSTD_free(ctx->threads, ctx->customMem);
212
ctx->threads = threadPool;
213
/* Initialize additional threads */
214
{ size_t threadId;
215
for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) {
216
if (ZSTD_pthread_create(&threadPool[threadId], NULL, &POOL_thread, ctx)) {
217
ctx->threadCapacity = threadId;
218
return 1;
219
} }
220
} }
221
/* successfully expanded */
222
ctx->threadCapacity = numThreads;
223
ctx->threadLimit = numThreads;
224
return 0;
225
}
226
227
/* @return : 0 on success, 1 on error */
228
int POOL_resize(POOL_ctx* ctx, size_t numThreads)
229
{
230
int result;
231
if (ctx==NULL) return 1;
232
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
233
result = POOL_resize_internal(ctx, numThreads);
234
ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
235
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
236
return result;
237
}
238
239
/**
240
* Returns 1 if the queue is full and 0 otherwise.
241
*
242
* When queueSize is 1 (pool was created with an intended queueSize of 0),
243
* then a queue is empty if there is a thread free _and_ no job is waiting.
244
*/
245
static int isQueueFull(POOL_ctx const* ctx) {
246
if (ctx->queueSize > 1) {
247
return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
248
} else {
249
return (ctx->numThreadsBusy == ctx->threadLimit) ||
250
!ctx->queueEmpty;
251
}
252
}
253
254
255
/* POOL_add_internal() :
 * Stores the job `function(opaque)` at the tail of the queue and signals
 * one waiting worker. Does nothing once shutdown is set.
 * Performs no locking and no fullness check itself : POOL_add() and
 * POOL_tryAdd() call it with queueMutex held.
 */
static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)
{
    POOL_job newJob;
    assert(ctx != NULL);
    if (ctx->shutdown) return;

    newJob.function = function;
    newJob.opaque = opaque;

    ctx->queueEmpty = 0;
    ctx->queue[ctx->queueTail] = newJob;
    ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
    ZSTD_pthread_cond_signal(&ctx->queuePopCond);
}
266
267
/* POOL_add() :
 * Queues the job `function(opaque)`, blocking on queuePushCond while the
 * queue is full. The wait also ends when shutdown is set, in which case
 * POOL_add_internal() does not queue the job.
 */
void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque)
{
    assert(ctx != NULL);
    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
    /* Block until a slot is available, or the pool shuts down */
    while (!ctx->shutdown && isQueueFull(ctx)) {
        ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
    }
    POOL_add_internal(ctx, function, opaque);
    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
}
278
279
280
/* POOL_tryAdd() :
 * Non-blocking variant of POOL_add() : queues the job `function(opaque)`
 * only if the queue currently has room, and never waits.
 * @return : 1 if the job was queued,
 *           0 if the queue is full or the pool is shutting down.
 */
int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)
{
    int added = 0;
    assert(ctx != NULL);
    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
    /* POOL_add_internal() drops the job once shutdown is set,
     * so success must not be reported in that case */
    if (!ctx->shutdown && !isQueueFull(ctx)) {
        POOL_add_internal(ctx, function, opaque);
        added = 1;
    }
    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
    return added;
}
292
293
294
#else /* ZSTD_MULTITHREAD not defined */
295
296
/* ========================== */
297
/* No multi-threading support */
298
/* ========================== */
299
300
301
/* We don't need any data, but if it is empty, malloc() might return NULL. */
302
struct POOL_ctx_s {
303
int dummy;
304
};
305
static POOL_ctx g_ctx;
306
307
POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
308
return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
309
}
310
311
/* POOL_create_advanced() :
 * Single-threaded build : all arguments are ignored and nothing is allocated.
 * @return : address of the static g_ctx instance.
 */
POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) {
    /* parameters are intentionally unused */
    (void)customMem;
    (void)queueSize;
    (void)numThreads;
    return &g_ctx;
}
317
318
/* POOL_free() :
 * Single-threaded build : nothing to release.
 * Only asserts that `ctx` is NULL or the static g_ctx instance.
 */
void POOL_free(POOL_ctx* ctx) {
    assert(ctx == NULL || ctx == &g_ctx);
    (void)ctx;  /* unused when assert() is compiled out */
}
322
323
/* POOL_resize() :
 * Single-threaded build : there is nothing to resize.
 * @return : always 0.
 */
int POOL_resize(POOL_ctx* ctx, size_t numThreads) {
    (void)numThreads;
    (void)ctx;
    return 0;
}
327
328
/* POOL_add() :
 * Single-threaded build : runs `function(opaque)` immediately
 * in the calling thread; `ctx` is ignored.
 */
void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {
    (void)ctx;
    function(opaque);
}
332
333
/* POOL_tryAdd() :
 * Single-threaded build : runs `function(opaque)` immediately
 * in the calling thread; `ctx` is ignored.
 * @return : always 1.
 */
int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) {
    (void)ctx;
    function(opaque);
    return 1;
}
338
339
/* POOL_sizeof() :
 * @return : 0 when `ctx` is NULL, otherwise the size of the context struct.
 *           Asserts that a non-NULL `ctx` is the static g_ctx instance.
 */
size_t POOL_sizeof(POOL_ctx* ctx) {
    if (ctx == NULL) {
        return 0;  /* supports sizeof NULL */
    }
    assert(ctx == &g_ctx);
    return sizeof(POOL_ctx);
}
344
345
#endif /* ZSTD_MULTITHREAD */
346
347