Path: blob/main/system/lib/pthread/thread_mailbox.c
6171 views
/*1* Copyright 2023 The Emscripten Authors. All rights reserved.2* Emscripten is available under two separate licenses, the MIT license and the3* University of Illinois/NCSA Open Source License. Both these licenses can be4* found in the LICENSE file.5*/67#include <assert.h>8#include <math.h>9#include <pthread.h>10#include <stdatomic.h>1112#include "em_task_queue.h"13#include "pthread_impl.h"14#include "thread_mailbox.h"15#include "threading_internal.h"1617int emscripten_thread_mailbox_ref(pthread_t thread) {18// Attempt to increment the refcount, being careful not to increment it if we19// ever observe a 0.20int prev_count = thread->mailbox_refcount;21while (1) {22if (prev_count == 0) {23// The mailbox is already closed!24return 0;25}26int desired_count = prev_count + 1;27if (atomic_compare_exchange_weak(28&thread->mailbox_refcount, &prev_count, desired_count)) {29return 1;30}31}32}3334// Decrement and return the refcount.35void emscripten_thread_mailbox_unref(pthread_t thread) {36int new_count = atomic_fetch_sub(&thread->mailbox_refcount, 1) - 1;37assert(new_count >= 0);38if (new_count == 0) {39// The count is now zero. The thread that owns this queue may be waiting to40// shut down. Notify the thread that it is safe to proceed now that the41// mailbox is closed.42emscripten_futex_wake(&thread->mailbox_refcount, INT_MAX);43}44}4546void _emscripten_thread_mailbox_shutdown(pthread_t thread) {47assert(thread == pthread_self());4849// Decrement the refcount and wait for it to reach zero.50assert(thread->mailbox_refcount > 0);51int count = atomic_fetch_sub(&thread->mailbox_refcount, 1) - 1;5253while (count != 0) {54emscripten_futex_wait(&thread->mailbox_refcount, count, INFINITY);55count = thread->mailbox_refcount;56}5758// The mailbox is now closed. No more messages will be enqueued. Run the59// shutdown handler for any message already in the queue.60em_task_queue_cancel(thread->mailbox);6162// The mailbox is now empty and will not be accessed again after this point.63em_task_queue_destroy(thread->mailbox);64}6566void _emscripten_thread_mailbox_init(pthread_t thread) {67thread->mailbox = em_task_queue_create(thread);68thread->mailbox_refcount = 1;69thread->waiting_async = 0;70}7172// Internal function, called from runtime_pthread.js73void _emscripten_check_mailbox() {74// Before we attempt to execute a request from another thread make sure we75// are in sync with all the loaded code.76// For example, in PROXY_TO_PTHREAD the atexit functions are called via77// a proxied call, and without this call to synchronize we would crash if78// any atexit functions were registered from a side module.79assert(pthread_self());80em_task_queue* mailbox = pthread_self()->mailbox;81mailbox->notification = NOTIFICATION_RECEIVED;82em_task_queue_execute(pthread_self()->mailbox);83notification_state expected = NOTIFICATION_RECEIVED;84atomic_compare_exchange_strong(85&mailbox->notification, &expected, NOTIFICATION_NONE);86// After every mailbox check we call `__pthread_testcancel` in case87// one of the proxied functions was from pthread_kill(SIGCANCEL).88__pthread_testcancel();89}9091void emscripten_thread_mailbox_send(pthread_t thread, task t) {92assert(thread->mailbox_refcount > 0);9394pthread_mutex_lock(&thread->mailbox->mutex);95if (!em_task_queue_enqueue(thread->mailbox, t)) {96assert(0 && "No way to correctly recover from allocation failure");97}98pthread_mutex_unlock(&thread->mailbox->mutex);99100// If there is no pending notification for this mailbox, create one. If an old101// notification is currently being processed, it may or may not execute the102// new work. In case it does not, the new notification will ensure the work is103// still executed.104notification_state previous =105atomic_exchange(&thread->mailbox->notification, NOTIFICATION_PENDING);106if (previous != NOTIFICATION_PENDING) {107if (thread->waiting_async) {108__builtin_wasm_memory_atomic_notify((int*)thread, -1);109} else {110_emscripten_notify_mailbox_postmessage(thread, pthread_self());111}112}113}114115116