GitHub Repository: freebsd/freebsd-src
Path: blob/main/contrib/lib9p/threadpool.c

/*
 * Copyright 2016 Jakub Klama <[email protected]>
 * All rights reserved
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted providing that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#if defined(__FreeBSD__)
#include <pthread_np.h>
#endif
#include <sys/queue.h>
#include "lib9p.h"
#include "threadpool.h"
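
/*
 * How requests flow through the pool: l9p_threadpool_run() places
 * ordinary requests on the work queue (ltp_workq); worker threads
 * dispatch them and move the finished requests to the reply queue
 * (ltp_replyq); a single responder thread sends the replies.
 * Tflush requests never touch the work queue -- they are dispatched
 * immediately from l9p_threadpool_run().
 */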

static void l9p_threadpool_rflush(struct l9p_threadpool *tp,
    struct l9p_request *req);

static void *
l9p_responder(void *arg)
{
	struct l9p_threadpool *tp;
	struct l9p_worker *worker = arg;
	struct l9p_request *req;

	tp = worker->ltw_tp;
	for (;;) {
		/* get next reply to send */
		pthread_mutex_lock(&tp->ltp_mtx);
		while (STAILQ_EMPTY(&tp->ltp_replyq) && !worker->ltw_exiting)
			pthread_cond_wait(&tp->ltp_reply_cv, &tp->ltp_mtx);
		if (worker->ltw_exiting) {
			pthread_mutex_unlock(&tp->ltp_mtx);
			break;
		}

		/* off reply queue */
		req = STAILQ_FIRST(&tp->ltp_replyq);
		STAILQ_REMOVE_HEAD(&tp->ltp_replyq, lr_worklink);

		/* request is now in final glide path, can't be Tflush-ed */
		req->lr_workstate = L9P_WS_REPLYING;

		/* any flushers waiting for this request can go now */
		if (req->lr_flushstate != L9P_FLUSH_NONE)
			l9p_threadpool_rflush(tp, req);

		pthread_mutex_unlock(&tp->ltp_mtx);

		/* send response */
		l9p_respond(req, false, true);
	}
	return (NULL);
}

static void *
l9p_worker(void *arg)
{
	struct l9p_threadpool *tp;
	struct l9p_worker *worker = arg;
	struct l9p_request *req;

	tp = worker->ltw_tp;
	pthread_mutex_lock(&tp->ltp_mtx);
	for (;;) {
		while (STAILQ_EMPTY(&tp->ltp_workq) && !worker->ltw_exiting)
			pthread_cond_wait(&tp->ltp_work_cv, &tp->ltp_mtx);
		if (worker->ltw_exiting)
			break;

		/* off work queue; now work-in-progress, by us */
		req = STAILQ_FIRST(&tp->ltp_workq);
		STAILQ_REMOVE_HEAD(&tp->ltp_workq, lr_worklink);
		req->lr_workstate = L9P_WS_INPROGRESS;
		req->lr_worker = worker;
		pthread_mutex_unlock(&tp->ltp_mtx);

		/* actually try the request */
		req->lr_error = l9p_dispatch_request(req);

		/* move to responder queue, updating work-state */
		pthread_mutex_lock(&tp->ltp_mtx);
		req->lr_workstate = L9P_WS_RESPQUEUED;
		req->lr_worker = NULL;
		STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);

		/* signal the responder */
		pthread_cond_signal(&tp->ltp_reply_cv);
	}
	pthread_mutex_unlock(&tp->ltp_mtx);
	return (NULL);
}

/*
 * Just before finally replying to a request that got touched by
 * a Tflush request, we enqueue its flushers (requests of type
 * Tflush, which are now on the flushee's lr_flushq) onto the
 * response queue.
 */
static void
l9p_threadpool_rflush(struct l9p_threadpool *tp, struct l9p_request *req)
{
	struct l9p_request *flusher;

	/*
	 * https://swtch.com/plan9port/man/man9/flush.html says:
	 *
	 * "Should multiple Tflushes be received for a pending
	 * request, they must be answered in order.  A Rflush for
	 * any of the multiple Tflushes implies an answer for all
	 * previous ones.  Therefore, should a server receive a
	 * request and then multiple flushes for that request, it
	 * need respond only to the last flush."  This means
	 * we could march through the queue of flushers here,
	 * marking all but the last one as "to be dropped" rather
	 * than "to be replied-to".
	 *
	 * However, we'll leave that for later, if ever -- it
	 * should be harmless to respond to each, in order.
	 */
	STAILQ_FOREACH(flusher, &req->lr_flushq, lr_flushlink) {
		flusher->lr_workstate = L9P_WS_RESPQUEUED;
#ifdef notdef
		if (STAILQ_NEXT(flusher, lr_flushlink) != NULL) {
			/* not the last flusher: drop instead of replying */
			flusher->lr_flushstate = L9P_FLUSH_NOT_RUN;
			/* or, flusher->lr_drop = true ? */
		}
#endif
		STAILQ_INSERT_TAIL(&tp->ltp_replyq, flusher, lr_worklink);
	}
}

int
l9p_threadpool_init(struct l9p_threadpool *tp, int size)
{
	struct l9p_worker *worker;
#if defined(__FreeBSD__)
	char threadname[16];
#endif
	int error;
	int i, nworkers, nresponders;

	if (size <= 0)
		return (EINVAL);
	error = pthread_mutex_init(&tp->ltp_mtx, NULL);
	if (error)
		return (error);
	error = pthread_cond_init(&tp->ltp_work_cv, NULL);
	if (error)
		goto fail_work_cv;
	error = pthread_cond_init(&tp->ltp_reply_cv, NULL);
	if (error)
		goto fail_reply_cv;

	STAILQ_INIT(&tp->ltp_workq);
	STAILQ_INIT(&tp->ltp_replyq);
	LIST_INIT(&tp->ltp_workers);

	nresponders = 0;
	nworkers = 0;
	for (i = 0; i <= size; i++) {
		worker = calloc(1, sizeof(struct l9p_worker));
		if (worker == NULL) {
			error = ENOMEM;
			break;
		}
		worker->ltw_tp = tp;
		worker->ltw_responder = i == 0;
		error = pthread_create(&worker->ltw_thread, NULL,
		    worker->ltw_responder ? l9p_responder : l9p_worker,
		    (void *)worker);
		if (error) {
			free(worker);
			break;
		}
		if (worker->ltw_responder)
			nresponders++;
		else
			nworkers++;

#if defined(__FreeBSD__)
		if (worker->ltw_responder) {
			pthread_set_name_np(worker->ltw_thread, "9p-responder");
		} else {
			snprintf(threadname, sizeof(threadname),
			    "9p-worker:%d", i - 1);
			pthread_set_name_np(worker->ltw_thread, threadname);
		}
#endif

		LIST_INSERT_HEAD(&tp->ltp_workers, worker, ltw_link);
	}
	if (nresponders == 0 || nworkers == 0) {
		/* need the one responder, and at least one worker */
		l9p_threadpool_shutdown(tp);
		return (error);
	}
	return (0);

	/*
	 * We could avoid these labels by having multiple destroy
	 * paths (one for each error case), or by having booleans
	 * for which variables were initialized.  Neither is very
	 * appealing...
	 */
fail_reply_cv:
	pthread_cond_destroy(&tp->ltp_work_cv);
fail_work_cv:
	pthread_mutex_destroy(&tp->ltp_mtx);

	return (error);
}
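
/*
 * Example of bringing a pool up and down (a minimal sketch; in
 * practice the pool lives inside a struct l9p_connection as lc_tp
 * and is managed by the connection code):
 *
 *	struct l9p_threadpool tp;
 *	int error;
 *
 *	error = l9p_threadpool_init(&tp, 4);	(one responder + 4 workers)
 *	if (error != 0)
 *		return (error);
 *	... queue requests with l9p_threadpool_run() ...
 *	error = l9p_threadpool_shutdown(&tp);
 */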

/*
 * Run a request, usually by queueing it.
 */
void
l9p_threadpool_run(struct l9p_threadpool *tp, struct l9p_request *req)
{

	/*
	 * Flush requests must be handled specially, since they
	 * can cancel / kill off regular requests.  (But we can
	 * run them through the regular dispatch mechanism.)
	 */
	if (req->lr_req.hdr.type == L9P_TFLUSH) {
		/* not on a work queue yet so we can touch state */
		req->lr_workstate = L9P_WS_IMMEDIATE;
		(void) l9p_dispatch_request(req);
	} else {
		pthread_mutex_lock(&tp->ltp_mtx);
		req->lr_workstate = L9P_WS_NOTSTARTED;
		STAILQ_INSERT_TAIL(&tp->ltp_workq, req, lr_worklink);
		pthread_cond_signal(&tp->ltp_work_cv);
		pthread_mutex_unlock(&tp->ltp_mtx);
	}
}
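
/*
 * Example caller (a sketch: assumes req has already been decoded
 * and entered in the connection's tag table by the transport code):
 *
 *	l9p_threadpool_run(&req->lr_conn->lc_tp, req);
 *
 * A Tflush is dispatched right here on the calling thread; any
 * other request type is queued for a worker.
 */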

/*
 * Run a Tflush request.  Called via l9p_dispatch_request() since
 * that has some debug code in it, but never called from a worker
 * thread.
 */
int
l9p_threadpool_tflush(struct l9p_request *req)
{
	struct l9p_connection *conn;
	struct l9p_threadpool *tp;
	struct l9p_request *flushee;
	uint16_t oldtag;
	enum l9p_flushstate nstate;

	/*
	 * Find what we're supposed to flush (the flushee, as it were).
	 */
	req->lr_error = 0;	/* Tflush always succeeds */
	conn = req->lr_conn;
	tp = &conn->lc_tp;
	oldtag = req->lr_req.tflush.oldtag;
	ht_wrlock(&conn->lc_requests);
	flushee = ht_find_locked(&conn->lc_requests, oldtag);
	if (flushee == NULL) {
		/*
		 * Nothing to flush!  The old request must have
		 * been done and gone already.  Just queue this
		 * Tflush for a success reply.
		 */
		ht_unlock(&conn->lc_requests);
		pthread_mutex_lock(&tp->ltp_mtx);
		goto done;
	}

	/*
	 * Found the original request.  We'll need to inspect its
	 * work-state to figure out what to do.
	 */
	pthread_mutex_lock(&tp->ltp_mtx);
	ht_unlock(&conn->lc_requests);

	switch (flushee->lr_workstate) {

	case L9P_WS_NOTSTARTED:
		/*
		 * Flushee is on the work queue, but not yet being
		 * handled by a worker.
		 *
		 * The documentation -- see
		 * http://ericvh.github.io/9p-rfc/rfc9p2000.html
		 * https://swtch.com/plan9port/man/man9/flush.html
		 * -- says that "the server should answer the
		 * flush message immediately".  However, Linux
		 * sends flush requests for operations that
		 * must finish, such as Tclunk, and it's not
		 * possible to *answer* the flush request until
		 * it has been handled (if necessary) or aborted
		 * (if allowed).
		 *
		 * We therefore now just mark the original request
		 * and let the request-handler do whatever is
		 * appropriate.  NOTE: we could have a table of
		 * "requests that can be aborted without being
		 * run" vs "requests that must be run to be
		 * aborted", but for now that seems like an
		 * unnecessary complication.
		 */
		nstate = L9P_FLUSH_REQUESTED_PRE_START;
		break;

	case L9P_WS_IMMEDIATE:
		/*
		 * This state only applies to Tflush requests, and
		 * flushing a Tflush is illegal.  But we'll do nothing
		 * special here, which will make us act like a flush
		 * request that arrived too late to do anything about
		 * the flushee.
		 */
		nstate = L9P_FLUSH_REQUESTED_POST_START;
		break;

	case L9P_WS_INPROGRESS:
		/*
		 * Worker thread flushee->lr_worker is working on it.
		 * Kick it to get it out of blocking system calls.
		 * (This requires that it carefully set up some
		 * signal handlers, and may be FreeBSD-dependent;
		 * it probably cannot be handled this way on macOS.)
		 */
#ifdef notyet
		pthread_kill(...);
#endif
		nstate = L9P_FLUSH_REQUESTED_POST_START;
		break;

	case L9P_WS_RESPQUEUED:
		/*
		 * The flushee is already in the response queue.
		 * We'll just mark it as having had some flush
		 * action applied.
		 */
		nstate = L9P_FLUSH_TOOLATE;
		break;

	case L9P_WS_REPLYING:
		/*
		 * Although we found the flushee, it's too late to
		 * make us depend on it: it's already heading out
		 * the door as a reply.
		 *
		 * We don't want to do anything to the flushee.
		 * Instead, we want to work the same way as if
		 * we had never found the tag.
		 */
		goto done;
	}

	/*
	 * Now add us to the list of Tflush-es that are waiting
	 * for the flushee (creating the list if needed, i.e., if
	 * this is the first Tflush for the flushee).  We (req)
	 * will get queued for reply later, when the responder
	 * processes the flushee and calls l9p_threadpool_rflush().
	 */
	if (flushee->lr_flushstate == L9P_FLUSH_NONE)
		STAILQ_INIT(&flushee->lr_flushq);
	flushee->lr_flushstate = nstate;
	STAILQ_INSERT_TAIL(&flushee->lr_flushq, req, lr_flushlink);

	pthread_mutex_unlock(&tp->ltp_mtx);

	return (0);

done:
	/*
	 * This immediate op is ready to be replied-to now, so just
	 * stick it onto the reply queue.
	 */
	req->lr_workstate = L9P_WS_RESPQUEUED;
	STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);
	pthread_mutex_unlock(&tp->ltp_mtx);
	pthread_cond_signal(&tp->ltp_reply_cv);
	return (0);
}
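
/*
 * A sketch of the signal-based kick alluded to in the
 * L9P_WS_INPROGRESS case above.  None of this exists in lib9p
 * today; the handler name and the choice of SIGUSR1 are
 * illustrative only.  Each worker would install a no-op handler
 * without SA_RESTART, so that a pending blocking system call
 * fails with EINTR when the thread is signalled:
 *
 *	static void
 *	flush_sigusr1(int sig)
 *	{
 *		(void)sig;
 *	}
 *
 *	struct sigaction sa;
 *
 *	memset(&sa, 0, sizeof(sa));
 *	sa.sa_handler = flush_sigusr1;
 *	sigaction(SIGUSR1, &sa, NULL);
 *
 * after which the L9P_WS_INPROGRESS case could do:
 *
 *	pthread_kill(flushee->lr_worker->ltw_thread, SIGUSR1);
 */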

int
l9p_threadpool_shutdown(struct l9p_threadpool *tp)
{
	struct l9p_worker *worker, *tmp;

	LIST_FOREACH_SAFE(worker, &tp->ltp_workers, ltw_link, tmp) {
		pthread_mutex_lock(&tp->ltp_mtx);
		worker->ltw_exiting = true;
		if (worker->ltw_responder)
			pthread_cond_signal(&tp->ltp_reply_cv);
		else
			pthread_cond_broadcast(&tp->ltp_work_cv);
		pthread_mutex_unlock(&tp->ltp_mtx);
		pthread_join(worker->ltw_thread, NULL);
		LIST_REMOVE(worker, ltw_link);
		free(worker);
	}
	pthread_cond_destroy(&tp->ltp_reply_cv);
	pthread_cond_destroy(&tp->ltp_work_cv);
	pthread_mutex_destroy(&tp->ltp_mtx);

	return (0);
}