Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
emscripten-core
GitHub Repository: emscripten-core/emscripten
Path: blob/main/system/lib/pthread/thread_mailbox.c
6171 views
1
/*
2
* Copyright 2023 The Emscripten Authors. All rights reserved.
3
* Emscripten is available under two separate licenses, the MIT license and the
4
* University of Illinois/NCSA Open Source License. Both these licenses can be
5
* found in the LICENSE file.
6
*/
7
8
#include <assert.h>
9
#include <math.h>
10
#include <pthread.h>
11
#include <stdatomic.h>
12
13
#include "em_task_queue.h"
14
#include "pthread_impl.h"
15
#include "thread_mailbox.h"
16
#include "threading_internal.h"
17
18
int emscripten_thread_mailbox_ref(pthread_t thread) {
19
// Attempt to increment the refcount, being careful not to increment it if we
20
// ever observe a 0.
21
int prev_count = thread->mailbox_refcount;
22
while (1) {
23
if (prev_count == 0) {
24
// The mailbox is already closed!
25
return 0;
26
}
27
int desired_count = prev_count + 1;
28
if (atomic_compare_exchange_weak(
29
&thread->mailbox_refcount, &prev_count, desired_count)) {
30
return 1;
31
}
32
}
33
}
34
35
// Decrement and return the refcount.
36
void emscripten_thread_mailbox_unref(pthread_t thread) {
37
int new_count = atomic_fetch_sub(&thread->mailbox_refcount, 1) - 1;
38
assert(new_count >= 0);
39
if (new_count == 0) {
40
// The count is now zero. The thread that owns this queue may be waiting to
41
// shut down. Notify the thread that it is safe to proceed now that the
42
// mailbox is closed.
43
emscripten_futex_wake(&thread->mailbox_refcount, INT_MAX);
44
}
45
}
46
47
void _emscripten_thread_mailbox_shutdown(pthread_t thread) {
48
assert(thread == pthread_self());
49
50
// Decrement the refcount and wait for it to reach zero.
51
assert(thread->mailbox_refcount > 0);
52
int count = atomic_fetch_sub(&thread->mailbox_refcount, 1) - 1;
53
54
while (count != 0) {
55
emscripten_futex_wait(&thread->mailbox_refcount, count, INFINITY);
56
count = thread->mailbox_refcount;
57
}
58
59
// The mailbox is now closed. No more messages will be enqueued. Run the
60
// shutdown handler for any message already in the queue.
61
em_task_queue_cancel(thread->mailbox);
62
63
// The mailbox is now empty and will not be accessed again after this point.
64
em_task_queue_destroy(thread->mailbox);
65
}
66
67
void _emscripten_thread_mailbox_init(pthread_t thread) {
68
thread->mailbox = em_task_queue_create(thread);
69
thread->mailbox_refcount = 1;
70
thread->waiting_async = 0;
71
}
72
73
// Internal function, called from runtime_pthread.js
74
void _emscripten_check_mailbox() {
75
// Before we attempt to execute a request from another thread make sure we
76
// are in sync with all the loaded code.
77
// For example, in PROXY_TO_PTHREAD the atexit functions are called via
78
// a proxied call, and without this call to synchronize we would crash if
79
// any atexit functions were registered from a side module.
80
assert(pthread_self());
81
em_task_queue* mailbox = pthread_self()->mailbox;
82
mailbox->notification = NOTIFICATION_RECEIVED;
83
em_task_queue_execute(pthread_self()->mailbox);
84
notification_state expected = NOTIFICATION_RECEIVED;
85
atomic_compare_exchange_strong(
86
&mailbox->notification, &expected, NOTIFICATION_NONE);
87
// After every mailbox check we call `__pthread_testcancel` in case
88
// one of the proxied functions was from pthread_kill(SIGCANCEL).
89
__pthread_testcancel();
90
}
91
92
// Enqueue task `t` on `thread`'s mailbox and, if no notification is already
// pending for that mailbox, notify the target thread that work is available.
// The mailbox refcount is asserted to be positive on entry.
void emscripten_thread_mailbox_send(pthread_t thread, task t) {
  assert(thread->mailbox_refcount > 0);

  em_task_queue* queue = thread->mailbox;

  // The enqueue is performed under the queue's mutex.
  pthread_mutex_lock(&queue->mutex);
  if (!em_task_queue_enqueue(queue, t)) {
    assert(0 && "No way to correctly recover from allocation failure");
  }
  pthread_mutex_unlock(&queue->mutex);

  // Mark the mailbox as having a pending notification and look at what state
  // it was in before. If a notification was already pending there is nothing
  // to add. If an older notification is being processed right now it may or
  // may not pick up the new task, so a fresh notification is issued to make
  // sure the task still runs.
  notification_state previous =
    atomic_exchange(&queue->notification, NOTIFICATION_PENDING);
  if (previous == NOTIFICATION_PENDING) {
    return;
  }

  if (thread->waiting_async) {
    // The target has its `waiting_async` flag set: wake waiters on the
    // thread's address with an atomic notify.
    __builtin_wasm_memory_atomic_notify((int*)thread, -1);
    return;
  }

  // Otherwise deliver the notification through the postMessage path.
  _emscripten_notify_mailbox_postmessage(thread, pthread_self());
}
115
116