/*1* Copyright 2016 Jakub Klama <[email protected]>2* All rights reserved3*4* Redistribution and use in source and binary forms, with or without5* modification, are permitted providing that the following conditions6* are met:7* 1. Redistributions of source code must retain the above copyright8* notice, this list of conditions and the following disclaimer.9* 2. Redistributions in binary form must reproduce the above copyright10* notice, this list of conditions and the following disclaimer in the11* documentation and/or other materials provided with the distribution.12*13* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR14* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED15* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE16* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY17* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL18* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS19* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)20* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,21* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING22* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE23* POSSIBILITY OF SUCH DAMAGE.24*25*/2627#include <errno.h>28#include <stdlib.h>29#include <pthread.h>30#if defined(__FreeBSD__)31#include <pthread_np.h>32#endif33#include <sys/queue.h>34#include "lib9p.h"35#include "threadpool.h"3637static void l9p_threadpool_rflush(struct l9p_threadpool *tp,38struct l9p_request *req);3940static void *41l9p_responder(void *arg)42{43struct l9p_threadpool *tp;44struct l9p_worker *worker = arg;45struct l9p_request *req;4647tp = worker->ltw_tp;48for (;;) {49/* get next reply to send */50pthread_mutex_lock(&tp->ltp_mtx);51while (STAILQ_EMPTY(&tp->ltp_replyq) && !worker->ltw_exiting)52pthread_cond_wait(&tp->ltp_reply_cv, &tp->ltp_mtx);53if (worker->ltw_exiting) {54pthread_mutex_unlock(&tp->ltp_mtx);55break;56}5758/* off reply queue */59req = STAILQ_FIRST(&tp->ltp_replyq);60STAILQ_REMOVE_HEAD(&tp->ltp_replyq, lr_worklink);6162/* request is now in final glide path, can't be Tflush-ed */63req->lr_workstate = L9P_WS_REPLYING;6465/* any flushers waiting for this request can go now */66if (req->lr_flushstate != L9P_FLUSH_NONE)67l9p_threadpool_rflush(tp, req);6869pthread_mutex_unlock(&tp->ltp_mtx);7071/* send response */72l9p_respond(req, false, true);73}74return (NULL);75}7677static void *78l9p_worker(void *arg)79{80struct l9p_threadpool *tp;81struct l9p_worker *worker = arg;82struct l9p_request *req;8384tp = worker->ltw_tp;85pthread_mutex_lock(&tp->ltp_mtx);86for (;;) {87while (STAILQ_EMPTY(&tp->ltp_workq) && !worker->ltw_exiting)88pthread_cond_wait(&tp->ltp_work_cv, &tp->ltp_mtx);89if (worker->ltw_exiting)90break;9192/* off work queue; now work-in-progress, by us */93req = STAILQ_FIRST(&tp->ltp_workq);94STAILQ_REMOVE_HEAD(&tp->ltp_workq, lr_worklink);95req->lr_workstate = L9P_WS_INPROGRESS;96req->lr_worker = worker;97pthread_mutex_unlock(&tp->ltp_mtx);9899/* actually try the request */100req->lr_error = l9p_dispatch_request(req);101102/* move to responder queue, updating work-state */103pthread_mutex_lock(&tp->ltp_mtx);104req->lr_workstate = L9P_WS_RESPQUEUED;105req->lr_worker = NULL;106STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);107108/* signal the responder */109pthread_cond_signal(&tp->ltp_reply_cv);110}111pthread_mutex_unlock(&tp->ltp_mtx);112return (NULL);113}114115/*116* Just before finally replying to a request that got touched by117* a Tflush request, we enqueue its flushers (requests of type118* Tflush, which are now on the flushee's lr_flushq) onto the119* response queue.120*/121static void122l9p_threadpool_rflush(struct l9p_threadpool *tp, struct l9p_request *req)123{124struct l9p_request *flusher;125126/*127* https://swtch.com/plan9port/man/man9/flush.html says:128*129* "Should multiple Tflushes be received for a pending130* request, they must be answered in order. A Rflush for131* any of the multiple Tflushes implies an answer for all132* previous ones. Therefore, should a server receive a133* request and then multiple flushes for that request, it134* need respond only to the last flush." This means135* we could march through the queue of flushers here,136* marking all but the last one as "to be dropped" rather137* than "to be replied-to".138*139* However, we'll leave that for later, if ever -- it140* should be harmless to respond to each, in order.141*/142STAILQ_FOREACH(flusher, &req->lr_flushq, lr_flushlink) {143flusher->lr_workstate = L9P_WS_RESPQUEUED;144#ifdef notdef145if (not the last) {146flusher->lr_flushstate = L9P_FLUSH_NOT_RUN;147/* or, flusher->lr_drop = true ? */148}149#endif150STAILQ_INSERT_TAIL(&tp->ltp_replyq, flusher, lr_worklink);151}152}153154int155l9p_threadpool_init(struct l9p_threadpool *tp, int size)156{157struct l9p_worker *worker;158#if defined(__FreeBSD__)159char threadname[16];160#endif161int error;162int i, nworkers, nresponders;163164if (size <= 0)165return (EINVAL);166error = pthread_mutex_init(&tp->ltp_mtx, NULL);167if (error)168return (error);169error = pthread_cond_init(&tp->ltp_work_cv, NULL);170if (error)171goto fail_work_cv;172error = pthread_cond_init(&tp->ltp_reply_cv, NULL);173if (error)174goto fail_reply_cv;175176STAILQ_INIT(&tp->ltp_workq);177STAILQ_INIT(&tp->ltp_replyq);178LIST_INIT(&tp->ltp_workers);179180nresponders = 0;181nworkers = 0;182for (i = 0; i <= size; i++) {183worker = calloc(1, sizeof(struct l9p_worker));184worker->ltw_tp = tp;185worker->ltw_responder = i == 0;186error = pthread_create(&worker->ltw_thread, NULL,187worker->ltw_responder ? l9p_responder : l9p_worker,188(void *)worker);189if (error) {190free(worker);191break;192}193if (worker->ltw_responder)194nresponders++;195else196nworkers++;197198#if defined(__FreeBSD__)199if (worker->ltw_responder) {200pthread_set_name_np(worker->ltw_thread, "9p-responder");201} else {202sprintf(threadname, "9p-worker:%d", i - 1);203pthread_set_name_np(worker->ltw_thread, threadname);204}205#endif206207LIST_INSERT_HEAD(&tp->ltp_workers, worker, ltw_link);208}209if (nresponders == 0 || nworkers == 0) {210/* need the one responder, and at least one worker */211l9p_threadpool_shutdown(tp);212return (error);213}214return (0);215216/*217* We could avoid these labels by having multiple destroy218* paths (one for each error case), or by having booleans219* for which variables were initialized. Neither is very220* appealing...221*/222fail_reply_cv:223pthread_cond_destroy(&tp->ltp_work_cv);224fail_work_cv:225pthread_mutex_destroy(&tp->ltp_mtx);226227return (error);228}229230/*231* Run a request, usually by queueing it.232*/233void234l9p_threadpool_run(struct l9p_threadpool *tp, struct l9p_request *req)235{236237/*238* Flush requests must be handled specially, since they239* can cancel / kill off regular requests. (But we can240* run them through the regular dispatch mechanism.)241*/242if (req->lr_req.hdr.type == L9P_TFLUSH) {243/* not on a work queue yet so we can touch state */244req->lr_workstate = L9P_WS_IMMEDIATE;245(void) l9p_dispatch_request(req);246} else {247pthread_mutex_lock(&tp->ltp_mtx);248req->lr_workstate = L9P_WS_NOTSTARTED;249STAILQ_INSERT_TAIL(&tp->ltp_workq, req, lr_worklink);250pthread_cond_signal(&tp->ltp_work_cv);251pthread_mutex_unlock(&tp->ltp_mtx);252}253}254255/*256* Run a Tflush request. Called via l9p_dispatch_request() since257* it has some debug code in it, but not called from worker thread.258*/259int260l9p_threadpool_tflush(struct l9p_request *req)261{262struct l9p_connection *conn;263struct l9p_threadpool *tp;264struct l9p_request *flushee;265uint16_t oldtag;266enum l9p_flushstate nstate;267268/*269* Find what we're supposed to flush (the flushee, as it were).270*/271req->lr_error = 0; /* Tflush always succeeds */272conn = req->lr_conn;273tp = &conn->lc_tp;274oldtag = req->lr_req.tflush.oldtag;275ht_wrlock(&conn->lc_requests);276flushee = ht_find_locked(&conn->lc_requests, oldtag);277if (flushee == NULL) {278/*279* Nothing to flush! The old request must have280* been done and gone already. Just queue this281* Tflush for a success reply.282*/283ht_unlock(&conn->lc_requests);284pthread_mutex_lock(&tp->ltp_mtx);285goto done;286}287288/*289* Found the original request. We'll need to inspect its290* work-state to figure out what to do.291*/292pthread_mutex_lock(&tp->ltp_mtx);293ht_unlock(&conn->lc_requests);294295switch (flushee->lr_workstate) {296297case L9P_WS_NOTSTARTED:298/*299* Flushee is on work queue, but not yet being300* handled by a worker.301*302* The documentation -- see303* http://ericvh.github.io/9p-rfc/rfc9p2000.html304* https://swtch.com/plan9port/man/man9/flush.html305* -- says that "the server should answer the306* flush message immediately". However, Linux307* sends flush requests for operations that308* must finish, such as Tclunk, and it's not309* possible to *answer* the flush request until310* it has been handled (if necessary) or aborted311* (if allowed).312*313* We therefore now just the original request314* and let the request-handler do whatever is315* appropriate. NOTE: we could have a table of316* "requests that can be aborted without being317* run" vs "requests that must be run to be318* aborted", but for now that seems like an319* unnecessary complication.320*/321nstate = L9P_FLUSH_REQUESTED_PRE_START;322break;323324case L9P_WS_IMMEDIATE:325/*326* This state only applies to Tflush requests, and327* flushing a Tflush is illegal. But we'll do nothing328* special here, which will make us act like a flush329* request for the flushee that arrived too late to330* do anything about the flushee.331*/332nstate = L9P_FLUSH_REQUESTED_POST_START;333break;334335case L9P_WS_INPROGRESS:336/*337* Worker thread flushee->lr_worker is working on it.338* Kick it to get it out of blocking system calls.339* (This requires that it carefully set up some340* signal handlers, and may be FreeBSD-dependent,341* it probably cannot be handled this way on MacOS.)342*/343#ifdef notyet344pthread_kill(...);345#endif346nstate = L9P_FLUSH_REQUESTED_POST_START;347break;348349case L9P_WS_RESPQUEUED:350/*351* The flushee is already in the response queue.352* We'll just mark it as having had some flush353* action applied.354*/355nstate = L9P_FLUSH_TOOLATE;356break;357358case L9P_WS_REPLYING:359/*360* Although we found the flushee, it's too late to361* make us depend on it: it's already heading out362* the door as a reply.363*364* We don't want to do anything to the flushee.365* Instead, we want to work the same way as if366* we had never found the tag.367*/368goto done;369}370371/*372* Now add us to the list of Tflush-es that are waiting373* for the flushee (creating the list if needed, i.e., if374* this is the first Tflush for the flushee). We (req)375* will get queued for reply later, when the responder376* processes the flushee and calls l9p_threadpool_rflush().377*/378if (flushee->lr_flushstate == L9P_FLUSH_NONE)379STAILQ_INIT(&flushee->lr_flushq);380flushee->lr_flushstate = nstate;381STAILQ_INSERT_TAIL(&flushee->lr_flushq, req, lr_flushlink);382383pthread_mutex_unlock(&tp->ltp_mtx);384385return (0);386387done:388/*389* This immediate op is ready to be replied-to now, so just390* stick it onto the reply queue.391*/392req->lr_workstate = L9P_WS_RESPQUEUED;393STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);394pthread_mutex_unlock(&tp->ltp_mtx);395pthread_cond_signal(&tp->ltp_reply_cv);396return (0);397}398399int400l9p_threadpool_shutdown(struct l9p_threadpool *tp)401{402struct l9p_worker *worker, *tmp;403404LIST_FOREACH_SAFE(worker, &tp->ltp_workers, ltw_link, tmp) {405pthread_mutex_lock(&tp->ltp_mtx);406worker->ltw_exiting = true;407if (worker->ltw_responder)408pthread_cond_signal(&tp->ltp_reply_cv);409else410pthread_cond_broadcast(&tp->ltp_work_cv);411pthread_mutex_unlock(&tp->ltp_mtx);412pthread_join(worker->ltw_thread, NULL);413LIST_REMOVE(worker, ltw_link);414free(worker);415}416pthread_cond_destroy(&tp->ltp_reply_cv);417pthread_cond_destroy(&tp->ltp_work_cv);418pthread_mutex_destroy(&tp->ltp_mtx);419420return (0);421}422423424