Path: blob/main/system/lib/pthread/proxying.c
6175 views
/*1* Copyright 2021 The Emscripten Authors. All rights reserved.2* Emscripten is available under two separate licenses, the MIT license and the3* University of Illinois/NCSA Open Source License. Both these licenses can be4* found in the LICENSE file.5*/67#include <assert.h>8#include <emscripten/proxying.h>9#include <emscripten/threading.h>10#include <pthread.h>11#include <stdatomic.h>12#include <stdbool.h>13#include <stdlib.h>14#include <string.h>1516#include "em_task_queue.h"17#include "thread_mailbox.h"18#include "threading_internal.h"1920struct em_proxying_queue {21// Protects all accesses to em_task_queues, size, and capacity.22pthread_mutex_t mutex;23// `size` task queue pointers stored in an array of size `capacity`.24em_task_queue** task_queues;25int size;26int capacity;27};2829// The system proxying queue.30static em_proxying_queue system_proxying_queue = {31.mutex = PTHREAD_MUTEX_INITIALIZER,32.task_queues = NULL,33.size = 0,34.capacity = 0,35};3637static _Thread_local bool system_queue_in_use = false;3839em_proxying_queue* emscripten_proxy_get_system_queue(void) {40return &system_proxying_queue;41}4243em_proxying_queue* em_proxying_queue_create(void) {44// Allocate the new queue.45em_proxying_queue* q = malloc(sizeof(em_proxying_queue));46if (q == NULL) {47return NULL;48}49*q = (em_proxying_queue){50.mutex = PTHREAD_MUTEX_INITIALIZER,51.task_queues = NULL,52.size = 0,53.capacity = 0,54};55return q;56}5758void em_proxying_queue_destroy(em_proxying_queue* q) {59assert(q != NULL);60assert(q != &system_proxying_queue && "cannot destroy system proxying queue");6162pthread_mutex_destroy(&q->mutex);63for (int i = 0; i < q->size; i++) {64em_task_queue_destroy(q->task_queues[i]);65}66free(q->task_queues);67free(q);68}6970// Not thread safe. Returns NULL if there are no tasks for the thread.71static em_task_queue* get_tasks_for_thread(em_proxying_queue* q,72pthread_t thread) {73assert(q != NULL);74for (int i = 0; i < q->size; i++) {75if (pthread_equal(q->task_queues[i]->thread, thread)) {76return q->task_queues[i];77}78}79return NULL;80}8182// Not thread safe.83static em_task_queue* get_or_add_tasks_for_thread(em_proxying_queue* q,84pthread_t thread) {85em_task_queue* tasks = get_tasks_for_thread(q, thread);86if (tasks != NULL) {87return tasks;88}89// There were no tasks for the thread; initialize a new em_task_queue. If90// there are not enough queues, allocate more.91if (q->size == q->capacity) {92int new_capacity = q->capacity == 0 ? 1 : q->capacity * 2;93em_task_queue** new_task_queues =94realloc(q->task_queues, sizeof(em_task_queue*) * new_capacity);95if (new_task_queues == NULL) {96return NULL;97}98q->task_queues = new_task_queues;99q->capacity = new_capacity;100}101// Initialize the next available task queue.102tasks = em_task_queue_create(thread);103if (tasks == NULL) {104return NULL;105}106q->task_queues[q->size++] = tasks;107return tasks;108}109110void emscripten_proxy_execute_queue(em_proxying_queue* q) {111assert(q != NULL);112assert(pthread_self());113114// Below is a recursion and deadlock guard: The recursion guard is to avoid115// infinite recursion when we arrive here from the pthread_lock call below116// that executes the system queue. The per-task_queue recursion lock can't117// catch these recursions because it can only be checked after the lock has118// been acquired.119//120// This also guards against deadlocks when adding to the system queue. When121// the current thread is adding tasks, it locks the queue, but we can122// potentially try to execute the queue during the add (from emscripten_yield123// when malloc takes a lock). This will deadlock the thread, so only try to124// take the lock if the current thread is not using the queue. We then hope125// the queue is executed later when it is unlocked.126bool is_system_queue = q == &system_proxying_queue;127if (is_system_queue) {128if (system_queue_in_use) {129return;130}131system_queue_in_use = true;132}133134pthread_mutex_lock(&q->mutex);135em_task_queue* tasks = get_tasks_for_thread(q, pthread_self());136pthread_mutex_unlock(&q->mutex);137138if (tasks != NULL && !tasks->processing) {139// Found the task queue and it is not already being processed; process it.140em_task_queue_execute(tasks);141}142143if (is_system_queue) {144system_queue_in_use = false;145}146}147148static int do_proxy(em_proxying_queue* q, pthread_t target_thread, task t) {149assert(q != NULL);150pthread_mutex_lock(&q->mutex);151bool is_system_queue = q == &system_proxying_queue;152if (is_system_queue) {153system_queue_in_use = true;154}155em_task_queue* tasks = get_or_add_tasks_for_thread(q, target_thread);156if (is_system_queue) {157system_queue_in_use = false;158}159pthread_mutex_unlock(&q->mutex);160if (tasks == NULL) {161return 0;162}163164return em_task_queue_send(tasks, t);165}166167int emscripten_proxy_async(em_proxying_queue* q,168pthread_t target_thread,169void (*func)(void*),170void* arg) {171return do_proxy(q, target_thread, (task){func, NULL, arg});172}173174enum ctx_kind { SYNC, CALLBACK };175176enum ctx_state { PENDING, DONE, CANCELED };177178struct em_proxying_ctx {179// The user-provided function and argument.180void (*func)(em_proxying_ctx*, void*);181void* arg;182183enum ctx_kind kind;184union {185// Context for synchronous proxying.186struct {187// Update `state` and signal the condition variable once the proxied task188// is done or canceled.189enum ctx_state state;190pthread_mutex_t mutex;191pthread_cond_t cond;192} sync;193194// Context for proxying with callbacks.195struct {196em_proxying_queue* queue;197pthread_t caller_thread;198void (*callback)(void*);199void (*cancel)(void*);200} cb;201};202203// A doubly linked list of contexts associated with active work on a single204// thread. If the thread is canceled, it will traverse this list to find205// contexts that need to be canceled.206struct em_proxying_ctx* next;207struct em_proxying_ctx* prev;208};209210// The key that `cancel_active_ctxs` is bound to so that it runs when a thread211// is canceled or exits.212static pthread_key_t active_ctxs;213static pthread_once_t active_ctxs_once = PTHREAD_ONCE_INIT;214215static void cancel_ctx(void* arg);216static void cancel_active_ctxs(void* arg);217218static void init_active_ctxs(void) {219int ret = pthread_key_create(&active_ctxs, cancel_active_ctxs);220assert(ret == 0);221(void)ret;222}223224static void add_active_ctx(em_proxying_ctx* ctx) {225assert(ctx != NULL);226em_proxying_ctx* head = pthread_getspecific(active_ctxs);227if (head == NULL) {228// This is the only active context; initialize the active contexts list.229ctx->next = ctx->prev = ctx;230pthread_setspecific(active_ctxs, ctx);231} else {232// Insert this context at the tail of the list just before `head`.233ctx->next = head;234ctx->prev = head->prev;235ctx->next->prev = ctx;236ctx->prev->next = ctx;237}238}239240static void remove_active_ctx(em_proxying_ctx* ctx) {241assert(ctx != NULL);242assert(ctx->next != NULL);243assert(ctx->prev != NULL);244if (ctx->next == ctx) {245// This is the only active context; clear the active contexts list.246ctx->next = ctx->prev = NULL;247pthread_setspecific(active_ctxs, NULL);248return;249}250251// Update the list head if we are removing the current head.252em_proxying_ctx* head = pthread_getspecific(active_ctxs);253if (ctx == head) {254pthread_setspecific(active_ctxs, head->next);255}256257// Remove the context from the list.258ctx->prev->next = ctx->next;259ctx->next->prev = ctx->prev;260ctx->next = ctx->prev = NULL;261}262263static void cancel_active_ctxs(void* arg) {264pthread_setspecific(active_ctxs, NULL);265em_proxying_ctx* head = arg;266em_proxying_ctx* curr = head;267do {268em_proxying_ctx* next = curr->next;269cancel_ctx(curr);270curr = next;271} while (curr != head);272}273274static void em_proxying_ctx_init_sync(em_proxying_ctx* ctx,275void (*func)(em_proxying_ctx*, void*),276void* arg) {277pthread_once(&active_ctxs_once, init_active_ctxs);278*ctx = (em_proxying_ctx){279.func = func,280.arg = arg,281.kind = SYNC,282.sync =283{284.state = PENDING,285.mutex = PTHREAD_MUTEX_INITIALIZER,286.cond = PTHREAD_COND_INITIALIZER,287},288};289}290291static void em_proxying_ctx_init_callback(em_proxying_ctx* ctx,292em_proxying_queue* queue,293pthread_t caller_thread,294void (*func)(em_proxying_ctx*, void*),295void (*callback)(void*),296void (*cancel)(void*),297void* arg) {298pthread_once(&active_ctxs_once, init_active_ctxs);299*ctx = (em_proxying_ctx){300.func = func,301.arg = arg,302.kind = CALLBACK,303.cb =304{305.queue = queue,306.caller_thread = caller_thread,307.callback = callback,308.cancel = cancel,309},310};311}312313static void em_proxying_ctx_deinit(em_proxying_ctx* ctx) {314if (ctx->kind == SYNC) {315pthread_mutex_destroy(&ctx->sync.mutex);316pthread_cond_destroy(&ctx->sync.cond);317}318// TODO: We should probably have some kind of refcounting scheme to keep319// `queue` alive for callback ctxs.320}321322static void free_ctx(void* arg) {323em_proxying_ctx* ctx = arg;324em_proxying_ctx_deinit(ctx);325free(ctx);326}327328// Free the callback info on the same thread it was originally allocated on.329// This may be more efficient.330static void call_callback_then_free_ctx(void* arg) {331em_proxying_ctx* ctx = arg;332ctx->cb.callback(ctx->arg);333free_ctx(ctx);334}335336void emscripten_proxy_finish(em_proxying_ctx* ctx) {337if (ctx->kind == SYNC) {338pthread_mutex_lock(&ctx->sync.mutex);339ctx->sync.state = DONE;340remove_active_ctx(ctx);341pthread_mutex_unlock(&ctx->sync.mutex);342pthread_cond_signal(&ctx->sync.cond);343} else {344// Schedule the callback on the caller thread. If the caller thread has345// already died or dies before the callback is executed, then at least make346// sure the context is freed.347remove_active_ctx(ctx);348if (!do_proxy(ctx->cb.queue,349ctx->cb.caller_thread,350(task){call_callback_then_free_ctx, free_ctx, ctx})) {351free_ctx(ctx);352}353}354}355356static void call_cancel_then_free_ctx(void* arg) {357em_proxying_ctx* ctx = arg;358ctx->cb.cancel(ctx->arg);359free_ctx(ctx);360}361362static void cancel_ctx(void* arg) {363em_proxying_ctx* ctx = arg;364if (ctx->kind == SYNC) {365pthread_mutex_lock(&ctx->sync.mutex);366ctx->sync.state = CANCELED;367pthread_mutex_unlock(&ctx->sync.mutex);368pthread_cond_signal(&ctx->sync.cond);369} else {370if (ctx->cb.cancel == NULL ||371!do_proxy(ctx->cb.queue,372ctx->cb.caller_thread,373(task){call_cancel_then_free_ctx, free_ctx, ctx})) {374free_ctx(ctx);375}376}377}378379// Helper for wrapping the call with ctx as a `void (*)(void*)`.380static void call_with_ctx(void* arg) {381em_proxying_ctx* ctx = arg;382add_active_ctx(ctx);383ctx->func(ctx, ctx->arg);384}385386int emscripten_proxy_sync_with_ctx(em_proxying_queue* q,387pthread_t target_thread,388void (*func)(em_proxying_ctx*, void*),389void* arg) {390assert(!pthread_equal(target_thread, pthread_self()) &&391"Cannot synchronously wait for work proxied to the current thread");392em_proxying_ctx ctx;393em_proxying_ctx_init_sync(&ctx, func, arg);394if (!do_proxy(q, target_thread, (task){call_with_ctx, cancel_ctx, &ctx})) {395em_proxying_ctx_deinit(&ctx);396return 0;397}398pthread_mutex_lock(&ctx.sync.mutex);399while (ctx.sync.state == PENDING) {400pthread_cond_wait(&ctx.sync.cond, &ctx.sync.mutex);401}402pthread_mutex_unlock(&ctx.sync.mutex);403int ret = ctx.sync.state == DONE;404em_proxying_ctx_deinit(&ctx);405return ret;406}407408// Helper for signaling the end of the task after the user function returns.409static void call_then_finish_task(em_proxying_ctx* ctx, void* arg) {410task* t = arg;411t->func(t->arg);412emscripten_proxy_finish(ctx);413}414415int emscripten_proxy_sync(em_proxying_queue* q,416pthread_t target_thread,417void (*func)(void*),418void* arg) {419task t = {.func = func, .arg = arg};420return emscripten_proxy_sync_with_ctx(421q, target_thread, call_then_finish_task, &t);422}423424static int do_proxy_callback(em_proxying_queue* q,425pthread_t target_thread,426void (*func)(em_proxying_ctx* ctx, void*),427void (*callback)(void*),428void (*cancel)(void*),429void* arg,430em_proxying_ctx* ctx) {431em_proxying_ctx_init_callback(432ctx, q, pthread_self(), func, callback, cancel, arg);433if (!do_proxy(q, target_thread, (task){call_with_ctx, cancel_ctx, ctx})) {434free_ctx(ctx);435return 0;436}437return 1;438}439440int emscripten_proxy_callback_with_ctx(em_proxying_queue* q,441pthread_t target_thread,442void (*func)(em_proxying_ctx* ctx,443void*),444void (*callback)(void*),445void (*cancel)(void*),446void* arg) {447em_proxying_ctx* ctx = malloc(sizeof(*ctx));448if (ctx == NULL) {449return 0;450}451return do_proxy_callback(q, target_thread, func, callback, cancel, arg, ctx);452}453454typedef struct callback_ctx {455void (*func)(void*);456void (*callback)(void*);457void (*cancel)(void*);458void* arg;459} callback_ctx;460461static void call_then_finish_callback(em_proxying_ctx* ctx, void* arg) {462callback_ctx* cb_ctx = arg;463cb_ctx->func(cb_ctx->arg);464emscripten_proxy_finish(ctx);465}466467static void callback_call(void* arg) {468callback_ctx* cb_ctx = arg;469cb_ctx->callback(cb_ctx->arg);470}471472static void callback_cancel(void* arg) {473callback_ctx* cb_ctx = arg;474if (cb_ctx->cancel != NULL) {475cb_ctx->cancel(cb_ctx->arg);476}477}478479int emscripten_proxy_callback(em_proxying_queue* q,480pthread_t target_thread,481void (*func)(void*),482void (*callback)(void*),483void (*cancel)(void*),484void* arg) {485// Allocate the em_proxying_ctx and the user ctx as a single block that will486// be freed when the `em_proxying_ctx` is freed.487struct block {488em_proxying_ctx ctx;489callback_ctx cb_ctx;490};491struct block* block = malloc(sizeof(*block));492if (block == NULL) {493return 0;494}495block->cb_ctx = (callback_ctx){func, callback, cancel, arg};496return do_proxy_callback(q,497target_thread,498call_then_finish_callback,499callback_call,500callback_cancel,501&block->cb_ctx,502&block->ctx);503}504505typedef struct promise_ctx {506void (*func)(em_proxying_ctx*, void*);507void* arg;508em_promise_t promise;509} promise_ctx;510511static void promise_call(em_proxying_ctx* ctx, void* arg) {512promise_ctx* promise_ctx = arg;513promise_ctx->func(ctx, promise_ctx->arg);514}515516static void promise_fulfill(void* arg) {517promise_ctx* promise_ctx = arg;518emscripten_promise_resolve(promise_ctx->promise, EM_PROMISE_FULFILL, NULL);519emscripten_promise_destroy(promise_ctx->promise);520}521522static void promise_reject(void* arg) {523promise_ctx* promise_ctx = arg;524emscripten_promise_resolve(promise_ctx->promise, EM_PROMISE_REJECT, NULL);525emscripten_promise_destroy(promise_ctx->promise);526}527528static em_promise_t do_proxy_promise(em_proxying_queue* q,529pthread_t target_thread,530void (*func)(em_proxying_ctx*, void*),531void* arg,532em_promise_t promise,533em_proxying_ctx* ctx,534promise_ctx* promise_ctx) {535*promise_ctx = (struct promise_ctx){func, arg, promise};536if (!do_proxy_callback(q,537target_thread,538promise_call,539promise_fulfill,540promise_reject,541promise_ctx,542ctx)) {543emscripten_promise_resolve(promise, EM_PROMISE_REJECT, NULL);544return promise;545}546// Return a separate promise to ensure that the internal promise will stay547// alive until the callbacks are called.548em_promise_t ret = emscripten_promise_create();549emscripten_promise_resolve(ret, EM_PROMISE_MATCH, promise);550return ret;551}552553em_promise_t emscripten_proxy_promise_with_ctx(em_proxying_queue* q,554pthread_t target_thread,555void (*func)(em_proxying_ctx*,556void*),557void* arg) {558em_promise_t promise = emscripten_promise_create();559// Allocate the em_proxying_ctx and promise ctx as a single block that will be560// freed when the `em_proxying_ctx` is freed.561struct block {562em_proxying_ctx ctx;563promise_ctx promise_ctx;564};565struct block* block = malloc(sizeof(*block));566if (block == NULL) {567emscripten_promise_resolve(promise, EM_PROMISE_REJECT, NULL);568return promise;569}570return do_proxy_promise(571q, target_thread, func, arg, promise, &block->ctx, &block->promise_ctx);572}573574em_promise_t emscripten_proxy_promise(em_proxying_queue* q,575pthread_t target_thread,576void (*func)(void*),577void* arg) {578em_promise_t promise = emscripten_promise_create();579// Allocate the em_proxying_ctx, promise ctx, and user task as a single block580// that will be freed when the `em_proxying_ctx` is freed.581struct block {582em_proxying_ctx ctx;583promise_ctx promise_ctx;584task task;585};586struct block* block = malloc(sizeof(*block));587if (block == NULL) {588emscripten_promise_resolve(promise, EM_PROMISE_REJECT, NULL);589return promise;590}591block->task = (task){.func = func, .arg = arg};592return do_proxy_promise(q,593target_thread,594call_then_finish_task,595&block->task,596promise,597&block->ctx,598&block->promise_ctx);599}600601typedef struct proxied_js_func_t {602int funcIndex;603void* emAsmAddr;604pthread_t callingThread;605int bufSize;606double* argBuffer;607double result;608bool owned;609} proxied_js_func_t;610611static void run_js_func(void* arg) {612proxied_js_func_t* f = (proxied_js_func_t*)arg;613f->result = _emscripten_receive_on_main_thread_js(614f->funcIndex, f->emAsmAddr, f->callingThread, f->bufSize, f->argBuffer, 0, 0);615if (f->owned) {616free(f->argBuffer);617free(f);618}619}620621static void run_js_func_with_ctx(em_proxying_ctx* ctx, void* arg) {622proxied_js_func_t* f = (proxied_js_func_t*)arg;623_emscripten_receive_on_main_thread_js(624f->funcIndex, f->emAsmAddr, f->callingThread, f->bufSize, f->argBuffer, ctx, arg);625626// run_js_func_with_ctx is always synchronously proxied and therefore arg627// should never be owned on the main thread (i.e. the argument here always628// exists on the stack of the calling thread, it's never copied/malloced).629assert(!f->owned);630}631632void _emscripten_run_js_on_main_thread_done(void* ctx, void* arg, double result) {633proxied_js_func_t* f = (proxied_js_func_t*)arg;634f->result = result;635emscripten_proxy_finish(ctx);636}637638/*639* The 'proxy_mode' argument to _emscripten_run_js_on_main_thread has 3 possible640* values:641*642* - PROXY_ASYNC: Returns immediately on the calling thread, does not signal643* - PROXY_SYNC: Synchronous on the calling thread, and also on the main thread644* - PROXY_SYNC_ASYNC: Synchronous on the calling thread, but async on the main645* thread.646*647* Note: 'PROXY_SYNC_ASYNC' is only passed when a function is marked as648* both "__async" and "__proxy: 'sync'"649*/650#define PROXY_ASYNC 0651#define PROXY_SYNC 1652#define PROXY_SYNC_ASYNC 2653654double _emscripten_run_js_on_main_thread(int func_index,655void* em_asm_addr,656int buf_size,657double* buffer,658int proxyMode) {659proxied_js_func_t f = {660.funcIndex = func_index,661.emAsmAddr = em_asm_addr,662.callingThread = pthread_self(),663.bufSize = buf_size,664.argBuffer = buffer,665.owned = false,666};667668em_proxying_queue* q = emscripten_proxy_get_system_queue();669pthread_t target = emscripten_main_runtime_thread_id();670671if (proxyMode != PROXY_ASYNC) {672int rtn;673if (proxyMode == PROXY_SYNC_ASYNC) {674rtn = emscripten_proxy_sync_with_ctx(q, target, run_js_func_with_ctx, &f);675} else {676rtn = emscripten_proxy_sync(q, target, run_js_func, &f);677}678if (!rtn) {679assert(false && "emscripten_proxy_sync_with_ctx failed");680return 0;681}682return f.result;683}684685// Make a heap allocated copy of the proxied_js_func_t686proxied_js_func_t* arg = malloc(sizeof(proxied_js_func_t));687*arg = f;688arg->owned = true;689690// Also make a copy of the argBuffer.691arg->argBuffer = malloc(buf_size);692memcpy(arg->argBuffer, buffer, buf_size);693694if (!emscripten_proxy_async(q, target, run_js_func, arg)) {695assert(false && "emscripten_proxy_async failed");696}697return 0;698}699700701