Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/sys/contrib/openzfs/lib/libtpool/thread_pool.c
48375 views
1
// SPDX-License-Identifier: CDDL-1.0
2
/*
3
* CDDL HEADER START
4
*
5
* The contents of this file are subject to the terms of the
6
* Common Development and Distribution License (the "License").
7
* You may not use this file except in compliance with the License.
8
*
9
* You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10
* or https://opensource.org/licenses/CDDL-1.0.
11
* See the License for the specific language governing permissions
12
* and limitations under the License.
13
*
14
* When distributing Covered Code, include this CDDL HEADER in each
15
* file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16
* If applicable, add the following below this CDDL HEADER, with the
17
* fields enclosed by brackets "[]" replaced with your own identifying
18
* information: Portions Copyright [yyyy] [name of copyright owner]
19
*
20
* CDDL HEADER END
21
*/
22
23
/*
24
* Copyright 2008 Sun Microsystems, Inc. All rights reserved.
25
* Use is subject to license terms.
26
*/
27
28
#include <stdlib.h>
29
#include <signal.h>
30
#include <errno.h>
31
#include <assert.h>
32
#include <limits.h>
33
#include "thread_pool_impl.h"
34
35
static pthread_mutex_t thread_pool_lock = PTHREAD_MUTEX_INITIALIZER;
36
static tpool_t *thread_pools = NULL;
37
38
static void
39
delete_pool(tpool_t *tpool)
40
{
41
tpool_job_t *job;
42
43
ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL);
44
45
/*
46
* Unlink the pool from the global list of all pools.
47
*/
48
(void) pthread_mutex_lock(&thread_pool_lock);
49
if (thread_pools == tpool)
50
thread_pools = tpool->tp_forw;
51
if (thread_pools == tpool)
52
thread_pools = NULL;
53
else {
54
tpool->tp_back->tp_forw = tpool->tp_forw;
55
tpool->tp_forw->tp_back = tpool->tp_back;
56
}
57
pthread_mutex_unlock(&thread_pool_lock);
58
59
/*
60
* There should be no pending jobs, but just in case...
61
*/
62
for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
63
tpool->tp_head = job->tpj_next;
64
free(job);
65
}
66
(void) pthread_attr_destroy(&tpool->tp_attr);
67
free(tpool);
68
}
69
70
/*
71
* Worker thread is terminating.
72
*/
73
static void
74
worker_cleanup(void *arg)
75
{
76
tpool_t *tpool = (tpool_t *)arg;
77
78
if (--tpool->tp_current == 0 &&
79
(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
80
if (tpool->tp_flags & TP_ABANDON) {
81
pthread_mutex_unlock(&tpool->tp_mutex);
82
delete_pool(tpool);
83
return;
84
}
85
if (tpool->tp_flags & TP_DESTROY)
86
(void) pthread_cond_broadcast(&tpool->tp_busycv);
87
}
88
pthread_mutex_unlock(&tpool->tp_mutex);
89
}
90
91
static void
92
notify_waiters(tpool_t *tpool)
93
{
94
if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
95
tpool->tp_flags &= ~TP_WAIT;
96
(void) pthread_cond_broadcast(&tpool->tp_waitcv);
97
}
98
}
99
100
/*
101
* Called by a worker thread on return from a tpool_dispatch()d job.
102
*/
103
static void
104
job_cleanup(void *arg)
105
{
106
tpool_t *tpool = (tpool_t *)arg;
107
108
pthread_t my_tid = pthread_self();
109
tpool_active_t *activep;
110
tpool_active_t **activepp;
111
112
pthread_mutex_lock(&tpool->tp_mutex);
113
for (activepp = &tpool->tp_active; ; activepp = &activep->tpa_next) {
114
activep = *activepp;
115
if (activep->tpa_tid == my_tid) {
116
*activepp = activep->tpa_next;
117
break;
118
}
119
}
120
if (tpool->tp_flags & TP_WAIT)
121
notify_waiters(tpool);
122
}
123
124
static void *
125
tpool_worker(void *arg)
126
{
127
tpool_t *tpool = (tpool_t *)arg;
128
int elapsed;
129
tpool_job_t *job;
130
void (*func)(void *);
131
tpool_active_t active;
132
133
pthread_mutex_lock(&tpool->tp_mutex);
134
pthread_cleanup_push(worker_cleanup, tpool);
135
136
/*
137
* This is the worker's main loop.
138
* It will only be left if a timeout or an error has occurred.
139
*/
140
active.tpa_tid = pthread_self();
141
for (;;) {
142
elapsed = 0;
143
tpool->tp_idle++;
144
if (tpool->tp_flags & TP_WAIT)
145
notify_waiters(tpool);
146
while ((tpool->tp_head == NULL ||
147
(tpool->tp_flags & TP_SUSPEND)) &&
148
!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
149
if (tpool->tp_current <= tpool->tp_minimum ||
150
tpool->tp_linger == 0) {
151
(void) pthread_cond_wait(&tpool->tp_workcv,
152
&tpool->tp_mutex);
153
} else {
154
struct timespec ts;
155
156
clock_gettime(CLOCK_REALTIME, &ts);
157
ts.tv_sec += tpool->tp_linger;
158
159
if (pthread_cond_timedwait(&tpool->tp_workcv,
160
&tpool->tp_mutex, &ts) != 0) {
161
elapsed = 1;
162
break;
163
}
164
}
165
}
166
tpool->tp_idle--;
167
if (tpool->tp_flags & TP_DESTROY)
168
break;
169
if (tpool->tp_flags & TP_ABANDON) {
170
/* can't abandon a suspended pool */
171
if (tpool->tp_flags & TP_SUSPEND) {
172
tpool->tp_flags &= ~TP_SUSPEND;
173
(void) pthread_cond_broadcast(
174
&tpool->tp_workcv);
175
}
176
if (tpool->tp_head == NULL)
177
break;
178
}
179
if ((job = tpool->tp_head) != NULL &&
180
!(tpool->tp_flags & TP_SUSPEND)) {
181
elapsed = 0;
182
func = job->tpj_func;
183
arg = job->tpj_arg;
184
tpool->tp_head = job->tpj_next;
185
if (job == tpool->tp_tail)
186
tpool->tp_tail = NULL;
187
tpool->tp_njobs--;
188
active.tpa_next = tpool->tp_active;
189
tpool->tp_active = &active;
190
pthread_mutex_unlock(&tpool->tp_mutex);
191
pthread_cleanup_push(job_cleanup, tpool);
192
free(job);
193
194
sigset_t maskset;
195
(void) pthread_sigmask(SIG_SETMASK, NULL, &maskset);
196
197
/*
198
* Call the specified function.
199
*/
200
func(arg);
201
/*
202
* We don't know what this thread has been doing,
203
* so we reset its signal mask and cancellation
204
* state back to the values prior to calling func().
205
*/
206
(void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
207
(void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
208
NULL);
209
(void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
210
NULL);
211
pthread_cleanup_pop(1);
212
}
213
if (elapsed && tpool->tp_current > tpool->tp_minimum) {
214
/*
215
* We timed out and there is no work to be done
216
* and the number of workers exceeds the minimum.
217
* Exit now to reduce the size of the pool.
218
*/
219
break;
220
}
221
}
222
pthread_cleanup_pop(1);
223
return (arg);
224
}
225
226
/*
227
* Create a worker thread, with default signals blocked.
228
*/
229
static int
230
create_worker(tpool_t *tpool)
231
{
232
pthread_t thread;
233
sigset_t oset;
234
int error;
235
236
(void) pthread_sigmask(SIG_SETMASK, NULL, &oset);
237
error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool);
238
(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
239
return (error);
240
}
241
242
243
/*
244
* pthread_attr_clone: make a copy of a pthread_attr_t. When old_attr
245
* is NULL initialize the cloned attr using default values.
246
*/
247
static int
248
pthread_attr_clone(pthread_attr_t *attr, const pthread_attr_t *old_attr)
249
{
250
int error;
251
252
error = pthread_attr_init(attr);
253
if (error || (old_attr == NULL))
254
return (error);
255
256
#ifdef __GLIBC__
257
cpu_set_t cpuset;
258
size_t cpusetsize = sizeof (cpuset);
259
error = pthread_attr_getaffinity_np(old_attr, cpusetsize, &cpuset);
260
if (error == 0)
261
error = pthread_attr_setaffinity_np(attr, cpusetsize, &cpuset);
262
if (error)
263
goto error;
264
#endif /* __GLIBC__ */
265
266
int detachstate;
267
error = pthread_attr_getdetachstate(old_attr, &detachstate);
268
if (error == 0)
269
error = pthread_attr_setdetachstate(attr, detachstate);
270
if (error)
271
goto error;
272
273
size_t guardsize;
274
error = pthread_attr_getguardsize(old_attr, &guardsize);
275
if (error == 0)
276
error = pthread_attr_setguardsize(attr, guardsize);
277
if (error)
278
goto error;
279
280
int inheritsched;
281
error = pthread_attr_getinheritsched(old_attr, &inheritsched);
282
if (error == 0)
283
error = pthread_attr_setinheritsched(attr, inheritsched);
284
if (error)
285
goto error;
286
287
struct sched_param param;
288
error = pthread_attr_getschedparam(old_attr, &param);
289
if (error == 0)
290
error = pthread_attr_setschedparam(attr, &param);
291
if (error)
292
goto error;
293
294
int policy;
295
error = pthread_attr_getschedpolicy(old_attr, &policy);
296
if (error == 0)
297
error = pthread_attr_setschedpolicy(attr, policy);
298
if (error)
299
goto error;
300
301
int scope;
302
error = pthread_attr_getscope(old_attr, &scope);
303
if (error == 0)
304
error = pthread_attr_setscope(attr, scope);
305
if (error)
306
goto error;
307
308
void *stackaddr;
309
size_t stacksize;
310
error = pthread_attr_getstack(old_attr, &stackaddr, &stacksize);
311
if (error == 0)
312
error = pthread_attr_setstack(attr, stackaddr, stacksize);
313
if (error)
314
goto error;
315
316
return (0);
317
error:
318
pthread_attr_destroy(attr);
319
return (error);
320
}
321
322
tpool_t *
323
tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
324
pthread_attr_t *attr)
325
{
326
tpool_t *tpool;
327
void *stackaddr;
328
size_t stacksize;
329
size_t minstack;
330
int error;
331
332
if (min_threads > max_threads || max_threads < 1) {
333
errno = EINVAL;
334
return (NULL);
335
}
336
if (attr != NULL) {
337
if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) {
338
errno = EINVAL;
339
return (NULL);
340
}
341
/*
342
* Allow only one thread in the pool with a specified stack.
343
* Require threads to have at least the minimum stack size.
344
*/
345
minstack = PTHREAD_STACK_MIN;
346
if (stackaddr != NULL) {
347
if (stacksize < minstack || max_threads != 1) {
348
errno = EINVAL;
349
return (NULL);
350
}
351
} else if (stacksize != 0 && stacksize < minstack) {
352
errno = EINVAL;
353
return (NULL);
354
}
355
}
356
357
tpool = calloc(1, sizeof (*tpool));
358
if (tpool == NULL) {
359
errno = ENOMEM;
360
return (NULL);
361
}
362
(void) pthread_mutex_init(&tpool->tp_mutex, NULL);
363
(void) pthread_cond_init(&tpool->tp_busycv, NULL);
364
(void) pthread_cond_init(&tpool->tp_workcv, NULL);
365
(void) pthread_cond_init(&tpool->tp_waitcv, NULL);
366
tpool->tp_minimum = min_threads;
367
tpool->tp_maximum = max_threads;
368
tpool->tp_linger = linger;
369
370
/*
371
* We cannot just copy the attribute pointer.
372
* We need to initialize a new pthread_attr_t structure
373
* with the values from the user-supplied pthread_attr_t.
374
* If the attribute pointer is NULL, we need to initialize
375
* the new pthread_attr_t structure with default values.
376
*/
377
error = pthread_attr_clone(&tpool->tp_attr, attr);
378
if (error) {
379
free(tpool);
380
errno = error;
381
return (NULL);
382
}
383
384
/* make all pool threads be detached daemon threads */
385
(void) pthread_attr_setdetachstate(&tpool->tp_attr,
386
PTHREAD_CREATE_DETACHED);
387
388
/* insert into the global list of all thread pools */
389
pthread_mutex_lock(&thread_pool_lock);
390
if (thread_pools == NULL) {
391
tpool->tp_forw = tpool;
392
tpool->tp_back = tpool;
393
thread_pools = tpool;
394
} else {
395
thread_pools->tp_back->tp_forw = tpool;
396
tpool->tp_forw = thread_pools;
397
tpool->tp_back = thread_pools->tp_back;
398
thread_pools->tp_back = tpool;
399
}
400
pthread_mutex_unlock(&thread_pool_lock);
401
402
return (tpool);
403
}
404
405
/*
406
* Dispatch a work request to the thread pool.
407
* If there are idle workers, awaken one.
408
* Else, if the maximum number of workers has
409
* not been reached, spawn a new worker thread.
410
* Else just return with the job added to the queue.
411
*/
412
int
413
tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
414
{
415
tpool_job_t *job;
416
417
ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
418
419
if ((job = calloc(1, sizeof (*job))) == NULL)
420
return (-1);
421
job->tpj_next = NULL;
422
job->tpj_func = func;
423
job->tpj_arg = arg;
424
425
pthread_mutex_lock(&tpool->tp_mutex);
426
427
if (!(tpool->tp_flags & TP_SUSPEND)) {
428
if (tpool->tp_idle > 0)
429
(void) pthread_cond_signal(&tpool->tp_workcv);
430
else if (tpool->tp_current >= tpool->tp_maximum) {
431
/* At worker limit. Leave task on queue */
432
} else {
433
if (create_worker(tpool) == 0) {
434
/* Started a new worker thread */
435
tpool->tp_current++;
436
} else if (tpool->tp_current > 0) {
437
/* Leave task on queue */
438
} else {
439
/* Cannot start a single worker! */
440
pthread_mutex_unlock(&tpool->tp_mutex);
441
free(job);
442
return (-1);
443
}
444
}
445
}
446
447
if (tpool->tp_head == NULL)
448
tpool->tp_head = job;
449
else
450
tpool->tp_tail->tpj_next = job;
451
tpool->tp_tail = job;
452
tpool->tp_njobs++;
453
454
pthread_mutex_unlock(&tpool->tp_mutex);
455
return (0);
456
}
457
458
static void
459
tpool_cleanup(void *arg)
460
{
461
tpool_t *tpool = (tpool_t *)arg;
462
463
pthread_mutex_unlock(&tpool->tp_mutex);
464
}
465
466
/*
467
* Assumes: by the time tpool_destroy() is called no one will use this
468
* thread pool in any way and no one will try to dispatch entries to it.
469
* Calling tpool_destroy() from a job in the pool will cause deadlock.
470
*/
471
void
472
tpool_destroy(tpool_t *tpool)
473
{
474
tpool_active_t *activep;
475
476
ASSERT(!tpool_member(tpool));
477
ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
478
479
pthread_mutex_lock(&tpool->tp_mutex);
480
pthread_cleanup_push(tpool_cleanup, tpool);
481
482
/* mark the pool as being destroyed; wakeup idle workers */
483
tpool->tp_flags |= TP_DESTROY;
484
tpool->tp_flags &= ~TP_SUSPEND;
485
(void) pthread_cond_broadcast(&tpool->tp_workcv);
486
487
/* cancel all active workers */
488
for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
489
(void) pthread_cancel(activep->tpa_tid);
490
491
/* wait for all active workers to finish */
492
while (tpool->tp_active != NULL) {
493
tpool->tp_flags |= TP_WAIT;
494
(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
495
}
496
497
/* the last worker to terminate will wake us up */
498
while (tpool->tp_current != 0)
499
(void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);
500
501
pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */
502
delete_pool(tpool);
503
}
504
505
/*
506
* Like tpool_destroy(), but don't cancel workers or wait for them to finish.
507
* The last worker to terminate will delete the pool.
508
*/
509
void
510
tpool_abandon(tpool_t *tpool)
511
{
512
ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
513
514
pthread_mutex_lock(&tpool->tp_mutex);
515
if (tpool->tp_current == 0) {
516
/* no workers, just delete the pool */
517
pthread_mutex_unlock(&tpool->tp_mutex);
518
delete_pool(tpool);
519
} else {
520
/* wake up all workers, last one will delete the pool */
521
tpool->tp_flags |= TP_ABANDON;
522
tpool->tp_flags &= ~TP_SUSPEND;
523
(void) pthread_cond_broadcast(&tpool->tp_workcv);
524
pthread_mutex_unlock(&tpool->tp_mutex);
525
}
526
}
527
528
/*
529
* Wait for all jobs to complete.
530
* Calling tpool_wait() from a job in the pool will cause deadlock.
531
*/
532
void
533
tpool_wait(tpool_t *tpool)
534
{
535
ASSERT(!tpool_member(tpool));
536
ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
537
538
pthread_mutex_lock(&tpool->tp_mutex);
539
pthread_cleanup_push(tpool_cleanup, tpool);
540
while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
541
tpool->tp_flags |= TP_WAIT;
542
(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
543
ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
544
}
545
pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */
546
}
547
548
void
549
tpool_suspend(tpool_t *tpool)
550
{
551
ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
552
553
pthread_mutex_lock(&tpool->tp_mutex);
554
tpool->tp_flags |= TP_SUSPEND;
555
pthread_mutex_unlock(&tpool->tp_mutex);
556
}
557
558
int
559
tpool_suspended(tpool_t *tpool)
560
{
561
int suspended;
562
563
ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
564
565
pthread_mutex_lock(&tpool->tp_mutex);
566
suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
567
pthread_mutex_unlock(&tpool->tp_mutex);
568
569
return (suspended);
570
}
571
572
void
573
tpool_resume(tpool_t *tpool)
574
{
575
int excess;
576
577
ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
578
579
pthread_mutex_lock(&tpool->tp_mutex);
580
if (!(tpool->tp_flags & TP_SUSPEND)) {
581
pthread_mutex_unlock(&tpool->tp_mutex);
582
return;
583
}
584
tpool->tp_flags &= ~TP_SUSPEND;
585
(void) pthread_cond_broadcast(&tpool->tp_workcv);
586
excess = tpool->tp_njobs - tpool->tp_idle;
587
while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
588
if (create_worker(tpool) != 0)
589
break; /* pthread_create() failed */
590
tpool->tp_current++;
591
}
592
pthread_mutex_unlock(&tpool->tp_mutex);
593
}
594
595
int
596
tpool_member(tpool_t *tpool)
597
{
598
pthread_t my_tid = pthread_self();
599
tpool_active_t *activep;
600
601
ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
602
603
pthread_mutex_lock(&tpool->tp_mutex);
604
for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
605
if (activep->tpa_tid == my_tid) {
606
pthread_mutex_unlock(&tpool->tp_mutex);
607
return (1);
608
}
609
}
610
pthread_mutex_unlock(&tpool->tp_mutex);
611
return (0);
612
}
613
614