Changeset View
Changeset View
Standalone View
Standalone View
contrib/lib9p/threadpool.c
- This file was added.
/* | |||||
* Copyright 2016 Jakub Klama <jceel@FreeBSD.org> | |||||
* All rights reserved | |||||
* | |||||
* Redistribution and use in source and binary forms, with or without | |||||
* modification, are permitted providing that the following conditions | |||||
* are met: | |||||
* 1. Redistributions of source code must retain the above copyright | |||||
* notice, this list of conditions and the following disclaimer. | |||||
* 2. Redistributions in binary form must reproduce the above copyright | |||||
* notice, this list of conditions and the following disclaimer in the | |||||
* documentation and/or other materials provided with the distribution. | |||||
* | |||||
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR | |||||
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED | |||||
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |||||
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY | |||||
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | |||||
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS | |||||
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) | |||||
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, | |||||
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING | |||||
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |||||
* POSSIBILITY OF SUCH DAMAGE. | |||||
* | |||||
*/ | |||||
#include <errno.h> | |||||
#include <stdlib.h> | |||||
#include <pthread.h> | |||||
#if defined(__FreeBSD__) | |||||
#include <pthread_np.h> | |||||
#endif | |||||
#include <sys/queue.h> | |||||
#include "lib9p.h" | |||||
#include "threadpool.h" | |||||
static void l9p_threadpool_rflush(struct l9p_threadpool *tp, | |||||
struct l9p_request *req); | |||||
static void * | |||||
l9p_responder(void *arg) | |||||
{ | |||||
struct l9p_threadpool *tp; | |||||
struct l9p_worker *worker = arg; | |||||
struct l9p_request *req; | |||||
tp = worker->ltw_tp; | |||||
for (;;) { | |||||
/* get next reply to send */ | |||||
pthread_mutex_lock(&tp->ltp_mtx); | |||||
while (STAILQ_EMPTY(&tp->ltp_replyq) && !worker->ltw_exiting) | |||||
pthread_cond_wait(&tp->ltp_reply_cv, &tp->ltp_mtx); | |||||
if (worker->ltw_exiting) { | |||||
pthread_mutex_unlock(&tp->ltp_mtx); | |||||
break; | |||||
} | |||||
/* off reply queue */ | |||||
req = STAILQ_FIRST(&tp->ltp_replyq); | |||||
STAILQ_REMOVE_HEAD(&tp->ltp_replyq, lr_worklink); | |||||
/* request is now in final glide path, can't be Tflush-ed */ | |||||
req->lr_workstate = L9P_WS_REPLYING; | |||||
/* any flushers waiting for this request can go now */ | |||||
if (req->lr_flushstate != L9P_FLUSH_NONE) | |||||
l9p_threadpool_rflush(tp, req); | |||||
pthread_mutex_unlock(&tp->ltp_mtx); | |||||
/* send response */ | |||||
l9p_respond(req, false, true); | |||||
} | |||||
return (NULL); | |||||
} | |||||
static void * | |||||
l9p_worker(void *arg) | |||||
{ | |||||
struct l9p_threadpool *tp; | |||||
struct l9p_worker *worker = arg; | |||||
struct l9p_request *req; | |||||
tp = worker->ltw_tp; | |||||
pthread_mutex_lock(&tp->ltp_mtx); | |||||
for (;;) { | |||||
while (STAILQ_EMPTY(&tp->ltp_workq) && !worker->ltw_exiting) | |||||
pthread_cond_wait(&tp->ltp_work_cv, &tp->ltp_mtx); | |||||
if (worker->ltw_exiting) | |||||
break; | |||||
/* off work queue; now work-in-progress, by us */ | |||||
req = STAILQ_FIRST(&tp->ltp_workq); | |||||
STAILQ_REMOVE_HEAD(&tp->ltp_workq, lr_worklink); | |||||
req->lr_workstate = L9P_WS_INPROGRESS; | |||||
req->lr_worker = worker; | |||||
pthread_mutex_unlock(&tp->ltp_mtx); | |||||
/* actually try the request */ | |||||
req->lr_error = l9p_dispatch_request(req); | |||||
/* move to responder queue, updating work-state */ | |||||
pthread_mutex_lock(&tp->ltp_mtx); | |||||
req->lr_workstate = L9P_WS_RESPQUEUED; | |||||
req->lr_worker = NULL; | |||||
STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink); | |||||
/* signal the responder */ | |||||
pthread_cond_signal(&tp->ltp_reply_cv); | |||||
} | |||||
pthread_mutex_unlock(&tp->ltp_mtx); | |||||
return (NULL); | |||||
} | |||||
/* | |||||
* Just before finally replying to a request that got touched by | |||||
* a Tflush request, we enqueue its flushers (requests of type | |||||
* Tflush, which are now on the flushee's lr_flushq) onto the | |||||
* response queue. | |||||
*/ | |||||
static void | |||||
l9p_threadpool_rflush(struct l9p_threadpool *tp, struct l9p_request *req) | |||||
{ | |||||
struct l9p_request *flusher; | |||||
/* | |||||
* https://swtch.com/plan9port/man/man9/flush.html says: | |||||
* | |||||
* "Should multiple Tflushes be received for a pending | |||||
* request, they must be answered in order. A Rflush for | |||||
* any of the multiple Tflushes implies an answer for all | |||||
* previous ones. Therefore, should a server receive a | |||||
* request and then multiple flushes for that request, it | |||||
* need respond only to the last flush." This means | |||||
* we could march through the queue of flushers here, | |||||
* marking all but the last one as "to be dropped" rather | |||||
* than "to be replied-to". | |||||
* | |||||
* However, we'll leave that for later, if ever -- it | |||||
* should be harmless to respond to each, in order. | |||||
*/ | |||||
STAILQ_FOREACH(flusher, &req->lr_flushq, lr_flushlink) { | |||||
flusher->lr_workstate = L9P_WS_RESPQUEUED; | |||||
#ifdef notdef | |||||
if (not the last) { | |||||
flusher->lr_flushstate = L9P_FLUSH_NOT_RUN; | |||||
/* or, flusher->lr_drop = true ? */ | |||||
} | |||||
#endif | |||||
STAILQ_INSERT_TAIL(&tp->ltp_replyq, flusher, lr_worklink); | |||||
} | |||||
} | |||||
int | |||||
l9p_threadpool_init(struct l9p_threadpool *tp, int size) | |||||
{ | |||||
struct l9p_worker *worker; | |||||
#if defined(__FreeBSD__) | |||||
char threadname[16]; | |||||
#endif | |||||
int error; | |||||
int i, nworkers, nresponders; | |||||
if (size <= 0) | |||||
return (EINVAL); | |||||
error = pthread_mutex_init(&tp->ltp_mtx, NULL); | |||||
if (error) | |||||
return (error); | |||||
error = pthread_cond_init(&tp->ltp_work_cv, NULL); | |||||
if (error) | |||||
goto fail_work_cv; | |||||
error = pthread_cond_init(&tp->ltp_reply_cv, NULL); | |||||
if (error) | |||||
goto fail_reply_cv; | |||||
STAILQ_INIT(&tp->ltp_workq); | |||||
STAILQ_INIT(&tp->ltp_replyq); | |||||
LIST_INIT(&tp->ltp_workers); | |||||
nresponders = 0; | |||||
nworkers = 0; | |||||
for (i = 0; i <= size; i++) { | |||||
worker = calloc(1, sizeof(struct l9p_worker)); | |||||
worker->ltw_tp = tp; | |||||
worker->ltw_responder = i == 0; | |||||
error = pthread_create(&worker->ltw_thread, NULL, | |||||
worker->ltw_responder ? l9p_responder : l9p_worker, | |||||
(void *)worker); | |||||
if (error) { | |||||
free(worker); | |||||
break; | |||||
} | |||||
if (worker->ltw_responder) | |||||
nresponders++; | |||||
else | |||||
nworkers++; | |||||
#if defined(__FreeBSD__) | |||||
if (worker->ltw_responder) { | |||||
pthread_set_name_np(worker->ltw_thread, "9p-responder"); | |||||
} else { | |||||
sprintf(threadname, "9p-worker:%d", i - 1); | |||||
pthread_set_name_np(worker->ltw_thread, threadname); | |||||
} | |||||
#endif | |||||
LIST_INSERT_HEAD(&tp->ltp_workers, worker, ltw_link); | |||||
} | |||||
if (nresponders == 0 || nworkers == 0) { | |||||
/* need the one responder, and at least one worker */ | |||||
l9p_threadpool_shutdown(tp); | |||||
return (error); | |||||
} | |||||
return (0); | |||||
/* | |||||
* We could avoid these labels by having multiple destroy | |||||
* paths (one for each error case), or by having booleans | |||||
* for which variables were initialized. Neither is very | |||||
* appealing... | |||||
*/ | |||||
fail_reply_cv: | |||||
pthread_cond_destroy(&tp->ltp_work_cv); | |||||
fail_work_cv: | |||||
pthread_mutex_destroy(&tp->ltp_mtx); | |||||
return (error); | |||||
} | |||||
/* | |||||
* Run a request, usually by queueing it. | |||||
*/ | |||||
void | |||||
l9p_threadpool_run(struct l9p_threadpool *tp, struct l9p_request *req) | |||||
{ | |||||
/* | |||||
* Flush requests must be handled specially, since they | |||||
* can cancel / kill off regular requests. (But we can | |||||
* run them through the regular dispatch mechanism.) | |||||
*/ | |||||
if (req->lr_req.hdr.type == L9P_TFLUSH) { | |||||
/* not on a work queue yet so we can touch state */ | |||||
req->lr_workstate = L9P_WS_IMMEDIATE; | |||||
(void) l9p_dispatch_request(req); | |||||
} else { | |||||
pthread_mutex_lock(&tp->ltp_mtx); | |||||
req->lr_workstate = L9P_WS_NOTSTARTED; | |||||
STAILQ_INSERT_TAIL(&tp->ltp_workq, req, lr_worklink); | |||||
pthread_cond_signal(&tp->ltp_work_cv); | |||||
pthread_mutex_unlock(&tp->ltp_mtx); | |||||
} | |||||
} | |||||
/* | |||||
* Run a Tflush request. Called via l9p_dispatch_request() since | |||||
* it has some debug code in it, but not called from worker thread. | |||||
*/ | |||||
int | |||||
l9p_threadpool_tflush(struct l9p_request *req) | |||||
{ | |||||
struct l9p_connection *conn; | |||||
struct l9p_threadpool *tp; | |||||
struct l9p_request *flushee; | |||||
uint16_t oldtag; | |||||
enum l9p_flushstate nstate; | |||||
/* | |||||
* Find what we're supposed to flush (the flushee, as it were). | |||||
*/ | |||||
req->lr_error = 0; /* Tflush always succeeds */ | |||||
conn = req->lr_conn; | |||||
tp = &conn->lc_tp; | |||||
oldtag = req->lr_req.tflush.oldtag; | |||||
ht_wrlock(&conn->lc_requests); | |||||
flushee = ht_find_locked(&conn->lc_requests, oldtag); | |||||
if (flushee == NULL) { | |||||
/* | |||||
* Nothing to flush! The old request must have | |||||
* been done and gone already. Just queue this | |||||
* Tflush for a success reply. | |||||
*/ | |||||
ht_unlock(&conn->lc_requests); | |||||
pthread_mutex_lock(&tp->ltp_mtx); | |||||
goto done; | |||||
} | |||||
/* | |||||
* Found the original request. We'll need to inspect its | |||||
* work-state to figure out what to do. | |||||
*/ | |||||
pthread_mutex_lock(&tp->ltp_mtx); | |||||
ht_unlock(&conn->lc_requests); | |||||
switch (flushee->lr_workstate) { | |||||
case L9P_WS_NOTSTARTED: | |||||
/* | |||||
* Flushee is on work queue, but not yet being | |||||
* handled by a worker. | |||||
* | |||||
* The documentation -- see | |||||
* http://ericvh.github.io/9p-rfc/rfc9p2000.html | |||||
* https://swtch.com/plan9port/man/man9/flush.html | |||||
* -- says that "the server should answer the | |||||
* flush message immediately". However, Linux | |||||
* sends flush requests for operations that | |||||
* must finish, such as Tclunk, and it's not | |||||
* possible to *answer* the flush request until | |||||
* it has been handled (if necessary) or aborted | |||||
* (if allowed). | |||||
* | |||||
* We therefore now just the original request | |||||
* and let the request-handler do whatever is | |||||
* appropriate. NOTE: we could have a table of | |||||
* "requests that can be aborted without being | |||||
* run" vs "requests that must be run to be | |||||
* aborted", but for now that seems like an | |||||
* unnecessary complication. | |||||
*/ | |||||
nstate = L9P_FLUSH_REQUESTED_PRE_START; | |||||
break; | |||||
case L9P_WS_IMMEDIATE: | |||||
/* | |||||
* This state only applies to Tflush requests, and | |||||
* flushing a Tflush is illegal. But we'll do nothing | |||||
* special here, which will make us act like a flush | |||||
* request for the flushee that arrived too late to | |||||
* do anything about the flushee. | |||||
*/ | |||||
nstate = L9P_FLUSH_REQUESTED_POST_START; | |||||
break; | |||||
case L9P_WS_INPROGRESS: | |||||
/* | |||||
* Worker thread flushee->lr_worker is working on it. | |||||
* Kick it to get it out of blocking system calls. | |||||
* (This requires that it carefully set up some | |||||
* signal handlers, and may be FreeBSD-dependent, | |||||
* it probably cannot be handled this way on MacOS.) | |||||
*/ | |||||
#ifdef notyet | |||||
pthread_kill(...); | |||||
#endif | |||||
nstate = L9P_FLUSH_REQUESTED_POST_START; | |||||
break; | |||||
case L9P_WS_RESPQUEUED: | |||||
/* | |||||
* The flushee is already in the response queue. | |||||
* We'll just mark it as having had some flush | |||||
* action applied. | |||||
*/ | |||||
nstate = L9P_FLUSH_TOOLATE; | |||||
break; | |||||
case L9P_WS_REPLYING: | |||||
/* | |||||
* Although we found the flushee, it's too late to | |||||
* make us depend on it: it's already heading out | |||||
* the door as a reply. | |||||
* | |||||
* We don't want to do anything to the flushee. | |||||
* Instead, we want to work the same way as if | |||||
* we had never found the tag. | |||||
*/ | |||||
goto done; | |||||
} | |||||
/* | |||||
* Now add us to the list of Tflush-es that are waiting | |||||
* for the flushee (creating the list if needed, i.e., if | |||||
* this is the first Tflush for the flushee). We (req) | |||||
* will get queued for reply later, when the responder | |||||
* processes the flushee and calls l9p_threadpool_rflush(). | |||||
*/ | |||||
if (flushee->lr_flushstate == L9P_FLUSH_NONE) | |||||
STAILQ_INIT(&flushee->lr_flushq); | |||||
flushee->lr_flushstate = nstate; | |||||
STAILQ_INSERT_TAIL(&flushee->lr_flushq, req, lr_flushlink); | |||||
pthread_mutex_unlock(&tp->ltp_mtx); | |||||
return (0); | |||||
done: | |||||
/* | |||||
* This immediate op is ready to be replied-to now, so just | |||||
* stick it onto the reply queue. | |||||
*/ | |||||
req->lr_workstate = L9P_WS_RESPQUEUED; | |||||
STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink); | |||||
pthread_mutex_unlock(&tp->ltp_mtx); | |||||
pthread_cond_signal(&tp->ltp_reply_cv); | |||||
return (0); | |||||
} | |||||
int | |||||
l9p_threadpool_shutdown(struct l9p_threadpool *tp) | |||||
{ | |||||
struct l9p_worker *worker, *tmp; | |||||
LIST_FOREACH_SAFE(worker, &tp->ltp_workers, ltw_link, tmp) { | |||||
pthread_mutex_lock(&tp->ltp_mtx); | |||||
worker->ltw_exiting = true; | |||||
if (worker->ltw_responder) | |||||
pthread_cond_signal(&tp->ltp_reply_cv); | |||||
else | |||||
pthread_cond_broadcast(&tp->ltp_work_cv); | |||||
pthread_mutex_unlock(&tp->ltp_mtx); | |||||
pthread_join(worker->ltw_thread, NULL); | |||||
LIST_REMOVE(worker, ltw_link); | |||||
free(worker); | |||||
} | |||||
pthread_cond_destroy(&tp->ltp_reply_cv); | |||||
pthread_cond_destroy(&tp->ltp_work_cv); | |||||
pthread_mutex_destroy(&tp->ltp_mtx); | |||||
return (0); | |||||
} |