GitHub Repository: emscripten-core/emscripten
Path: blob/main/system/lib/pthread/em_task_queue.c
/*
 * Copyright 2021 The Emscripten Authors. All rights reserved.
 * Emscripten is available under two separate licenses, the MIT license and the
 * University of Illinois/NCSA Open Source License. Both these licenses can be
 * found in the LICENSE file.
 */

#include <assert.h>
#include <emscripten/threading.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <string.h>

#include "em_task_queue.h"
#include "proxying_notification_state.h"
#include "thread_mailbox.h"

#define EM_TASK_QUEUE_INITIAL_CAPACITY 128

// Task Queue Lifetime Management
// -------------------------------
//
// When tasks are added to a task queue, the Worker running the target thread
// receives an event that will cause it to execute the queue when it next
// returns to its event loop. In some cases the queue will already have been
// executed before then, but the event is still received and the queue is still
// executed. These events contain references to the queue so that the target
// thread will know which queue to execute.
//
// To avoid use-after-free bugs, we cannot free a task queue immediately when
// `em_task_queue_destroy` is called; instead, we must defer freeing the queue
// until all of its outstanding notifications have been processed. We defer
// freeing the queue using an atomic flag. Each time a notification containing a
// reference to a task queue is generated, we set the flag on that task queue.
// Each time that task queue is processed, we clear the flag as long as another
// notification for the queue has not been generated in the meantime. The
// proxying queue can only be freed once `em_task_queue_destroy` has been called
// and its notification flag has been cleared.
//
// But an extra complication is that the target thread may have died by the time
// it gets back to its event loop to process its notifications. In that case the
// thread's Worker will still receive a notification and have to clear the
// notification flag without a live runtime. Without a live runtime, there is no
// stack, so the Worker cannot safely free the queue at this point even if the
// notification flag is cleared. We need a separate thread with a live runtime
// to perform the free.
//
// To ensure that queues are eventually freed, we place destroyed queues in a
// global "zombie list" where they wait for their notification flags to be
// cleared. The zombie list is scanned and zombie queues without outstanding
// notifications are freed whenever a new queue is constructed. In principle the
// zombie list could be scanned at any time, but the queue constructor is a nice
// place to do it because scanning there is sufficient to keep the number of
// zombie queues from growing without bound; creating a new zombie ultimately
// requires creating a new queue.
//
// -------------------------------
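
// As a summary of the notification life cycle described above (see
// proxying_notification_state.h for the states), the flag on each queue
// moves through this cycle:
//
//   NOTIFICATION_NONE     --em_task_queue_send-----> NOTIFICATION_PENDING
//   NOTIFICATION_PENDING  --receive_notification---> NOTIFICATION_RECEIVED
//   NOTIFICATION_RECEIVED --queue fully executed---> NOTIFICATION_NONE
//
// The last transition is a compare-and-swap (see `receive_notification`), so
// a send racing with it leaves the flag at NOTIFICATION_PENDING rather than
// losing the new notification.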

// The head of the zombie list. Its mutex protects access to the list and its
// other fields are not used.
static em_task_queue zombie_list_head = {.mutex = PTHREAD_MUTEX_INITIALIZER,
                                         .zombie_prev = &zombie_list_head,
                                         .zombie_next = &zombie_list_head};

static void em_task_queue_free(em_task_queue* queue) {
  pthread_mutex_destroy(&queue->mutex);
  free(queue->tasks);
  free(queue);
}
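
// Walk the zombie list and free every queue whose notification flag has been
// cleared. Uses `pthread_mutex_trylock` so a queue constructor never blocks
// on another thread that is already culling.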
static void cull_zombies() {
  if (pthread_mutex_trylock(&zombie_list_head.mutex) != 0) {
    // Some other thread is already culling. In principle there may be new
    // cullable zombies after it finishes, but it's not worth waiting to find
    // out.
    return;
  }
  em_task_queue* curr = zombie_list_head.zombie_next;
  while (curr != &zombie_list_head) {
    em_task_queue* next = curr->zombie_next;
    if (curr->notification == NOTIFICATION_NONE) {
      // Remove the zombie from the list and free it.
      curr->zombie_prev->zombie_next = curr->zombie_next;
      curr->zombie_next->zombie_prev = curr->zombie_prev;
      em_task_queue_free(curr);
    }
    curr = next;
  }
  pthread_mutex_unlock(&zombie_list_head.mutex);
}

em_task_queue* em_task_queue_create(pthread_t thread) {
  // Free any queue that has been destroyed and is safe to free.
  cull_zombies();

  em_task_queue* queue = malloc(sizeof(em_task_queue));
  if (queue == NULL) {
    return NULL;
  }
  task* tasks = malloc(sizeof(task) * EM_TASK_QUEUE_INITIAL_CAPACITY);
  if (tasks == NULL) {
    free(queue);
    return NULL;
  }
  *queue = (em_task_queue){.notification = NOTIFICATION_NONE,
                           .mutex = PTHREAD_MUTEX_INITIALIZER,
                           .thread = thread,
                           .processing = 0,
                           .tasks = tasks,
                           .capacity = EM_TASK_QUEUE_INITIAL_CAPACITY,
                           .head = 0,
                           .tail = 0,
                           .zombie_prev = NULL,
                           .zombie_next = NULL};
  return queue;
}

void em_task_queue_destroy(em_task_queue* queue) {
  assert(queue->zombie_next == NULL && queue->zombie_prev == NULL);
  if (queue->notification == NOTIFICATION_NONE) {
    // No outstanding references to the queue, so we can go ahead and free it.
    em_task_queue_free(queue);
    return;
  }
  // Otherwise add the queue to the zombie list so that it will eventually be
  // freed safely.
  pthread_mutex_lock(&zombie_list_head.mutex);
  queue->zombie_next = &zombie_list_head;
  queue->zombie_prev = zombie_list_head.zombie_prev;
  queue->zombie_next->zombie_prev = queue;
  queue->zombie_prev->zombie_next = queue;
  pthread_mutex_unlock(&zombie_list_head.mutex);
}
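
// A concrete example of the wrap case in `em_task_queue_grow` below: with
// `capacity == 4`, `head == 3`, and `tail == 1`, one task sits at index 3 and
// one at index 0. The copy moves `first_queued == 1` task from the end of the
// old buffer and `last_queued == 1` task from its beginning, leaving
// `head == 0` and `tail == 2` in the doubled buffer.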

// Not thread safe. Returns 1 on success and 0 on failure.
static int em_task_queue_grow(em_task_queue* queue) {
  // Allocate a larger task queue.
  int new_capacity = queue->capacity * 2;
  task* new_tasks = malloc(sizeof(task) * new_capacity);
  if (new_tasks == NULL) {
    return 0;
  }
  // Copy the tasks such that the head of the queue is at the beginning of the
  // buffer. There are two cases to handle: either the queue wraps around the
  // end of the old buffer or it does not.
  int queued_tasks;
  if (queue->head <= queue->tail) {
    // No wrap. Copy the tasks in one chunk.
    queued_tasks = queue->tail - queue->head;
    memcpy(new_tasks, &queue->tasks[queue->head], sizeof(task) * queued_tasks);
  } else {
    // Wrap. Copy `first_queued` tasks up to the end of the old buffer and
    // `last_queued` tasks at the beginning of the old buffer.
    int first_queued = queue->capacity - queue->head;
    int last_queued = queue->tail;
    queued_tasks = first_queued + last_queued;
    memcpy(new_tasks, &queue->tasks[queue->head], sizeof(task) * first_queued);
    memcpy(new_tasks + first_queued, queue->tasks, sizeof(task) * last_queued);
  }
  free(queue->tasks);
  queue->tasks = new_tasks;
  queue->capacity = new_capacity;
  queue->head = 0;
  queue->tail = queued_tasks;
  return 1;
}
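
// Drain the queue on the calling thread. The lock is released while each task
// runs, so other threads can continue to enqueue work and anything added
// during the drain is executed in the same pass. `processing` marks the queue
// as actively executing for the duration.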
void em_task_queue_execute(em_task_queue* queue) {
  queue->processing = 1;
  pthread_mutex_lock(&queue->mutex);
  while (!em_task_queue_is_empty(queue)) {
    task t = em_task_queue_dequeue(queue);
    // Unlock while the task is running to allow more work to be queued in
    // parallel.
    pthread_mutex_unlock(&queue->mutex);
    t.func(t.arg);
    pthread_mutex_lock(&queue->mutex);
  }
  pthread_mutex_unlock(&queue->mutex);
  queue->processing = 0;
}

void em_task_queue_cancel(em_task_queue* queue) {
  pthread_mutex_lock(&queue->mutex);
  while (!em_task_queue_is_empty(queue)) {
    task t = em_task_queue_dequeue(queue);
    if (t.cancel) {
      t.cancel(t.arg);
    }
  }
  pthread_mutex_unlock(&queue->mutex);
  // Any subsequent messages to this queue (for example if a pthread struct is
  // reused for a future thread, potentially on a different Worker) will require
  // a new notification. Clearing the flag is safe here because in both the
  // proxying queue and mailbox cases, there are no more outstanding references
  // to the queue after thread shutdown.
  queue->notification = NOTIFICATION_NONE;
}
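
// Ring-buffer operations: indices wrap modulo `capacity`. Callers serialize
// access with `queue->mutex` (see `em_task_queue_send`,
// `em_task_queue_execute`, and `em_task_queue_cancel`).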
int em_task_queue_enqueue(em_task_queue* queue, task t) {
  if (em_task_queue_is_full(queue) && !em_task_queue_grow(queue)) {
    return 0;
  }
  queue->tasks[queue->tail] = t;
  queue->tail = (queue->tail + 1) % queue->capacity;
  return 1;
}

task em_task_queue_dequeue(em_task_queue* queue) {
  task t = queue->tasks[queue->head];
  queue->head = (queue->head + 1) % queue->capacity;
  return t;
}
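
// Runs on the target thread when its mailbox delivers the notification. After
// draining the queue, the flag is cleared with a compare-and-swap so that if
// a concurrent `em_task_queue_send` has already moved it back to
// NOTIFICATION_PENDING, the newer notification is not lost.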
static void receive_notification(void* arg) {
  em_task_queue* tasks = arg;
  tasks->notification = NOTIFICATION_RECEIVED;
  em_task_queue_execute(tasks);
  notification_state expected = NOTIFICATION_RECEIVED;
  atomic_compare_exchange_strong(
    &tasks->notification, &expected, NOTIFICATION_NONE);
}
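
// Runs if the notification cannot be delivered, for example because the
// target thread exited before returning to its event loop.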
static void cancel_notification(void* arg) {
  em_task_queue* tasks = arg;
  em_task_queue_cancel(tasks);
}

int em_task_queue_send(em_task_queue* queue, task t) {
  // Ensure the target mailbox will remain open or detect that it is already
  // closed.
  if (!emscripten_thread_mailbox_ref(queue->thread)) {
    return 0;
  }

  pthread_mutex_lock(&queue->mutex);
  int enqueued = em_task_queue_enqueue(queue, t);
  pthread_mutex_unlock(&queue->mutex);
  if (!enqueued) {
    emscripten_thread_mailbox_unref(queue->thread);
    return 0;
  }

  // We're done if there is already a pending notification for this task queue.
  // Otherwise, we will send one.
  notification_state previous =
    atomic_exchange(&queue->notification, NOTIFICATION_PENDING);
  if (previous == NOTIFICATION_PENDING) {
    emscripten_thread_mailbox_unref(queue->thread);
    return 1;
  }

  emscripten_thread_mailbox_send(queue->thread,
                                 (task){.func = receive_notification,
                                        .cancel = cancel_notification,
                                        .arg = queue});
  emscripten_thread_mailbox_unref(queue->thread);
  return 1;
}
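
// Minimal usage sketch (illustrative only; `target_thread` and `say_hello`
// are hypothetical and error handling is omitted):
//
//   static void say_hello(void* arg) { puts((const char*)arg); }
//
//   em_task_queue* q = em_task_queue_create(target_thread);
//   em_task_queue_send(q, (task){.func = say_hello, .arg = "hello"});
//   ...
//   em_task_queue_destroy(q);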