Path: blob/main/system/lib/pthread/em_task_queue.c
/*
 * Copyright 2021 The Emscripten Authors. All rights reserved.
 * Emscripten is available under two separate licenses, the MIT license and the
 * University of Illinois/NCSA Open Source License. Both these licenses can be
 * found in the LICENSE file.
 */

#include <assert.h>
#include <emscripten/threading.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <string.h>

#include "em_task_queue.h"
#include "proxying_notification_state.h"
#include "thread_mailbox.h"

#define EM_TASK_QUEUE_INITIAL_CAPACITY 128

// Task Queue Lifetime Management
// -------------------------------
//
// When tasks are added to a task queue, the Worker running the target thread
// receives an event that will cause it to execute the queue when it next
// returns to its event loop. In some cases the queue will already have been
// executed before then, but the event is still received and the queue is still
// executed. These events contain references to the queue so that the target
// thread will know which queue to execute.
//
// To avoid use-after-free bugs, we cannot free a task queue immediately when
// `em_task_queue_destroy` is called; instead, we must defer freeing the queue
// until all of its outstanding notifications have been processed. We defer
// freeing the queue using an atomic flag. Each time a notification containing a
// reference to a task queue is generated, we set the flag on that task queue.
// Each time that task queue is processed, we clear the flag as long as another
// notification for the queue has not been generated in the mean time. The
// proxying queue can only be freed once `em_task_queue_destroy` has been called
// and its notification flag has been cleared.
//
// But an extra complication is that the target thread may have died by the time
// it gets back to its event loop to process its notifications. In that case the
// thread's Worker will still receive a notification and have to clear the
// notification flag without a live runtime. Without a live runtime, there is no
// stack, so the worker cannot safely free the queue at this point even if the
// notification flag is cleared. We need a separate thread with a live runtime
// to perform the free.
//
// To ensure that queues are eventually freed, we place destroyed queues in a
// global "zombie list" where they wait for their notification flags to be
// cleared. The zombie list is scanned and zombie queues without outstanding
// notifications are freed whenever a new queue is constructed. In principle the
// zombie list could be scanned at any time, but the queue constructor is a nice
// place to do it because scanning there is sufficient to keep the number of
// zombie queues from growing without bound; creating a new zombie ultimately
// requires creating a new queue.
//
// -------------------------------

// The head of the zombie list. Its mutex protects access to the list and its
// other fields are not used.
static em_task_queue zombie_list_head = {.mutex = PTHREAD_MUTEX_INITIALIZER,
                                         .zombie_prev = &zombie_list_head,
                                         .zombie_next = &zombie_list_head};

static void em_task_queue_free(em_task_queue* queue) {
  pthread_mutex_destroy(&queue->mutex);
  free(queue->tasks);
  free(queue);
}

static void cull_zombies() {
  if (pthread_mutex_trylock(&zombie_list_head.mutex) != 0) {
    // Some other thread is already culling. In principle there may be new
    // cullable zombies after it finishes, but it's not worth waiting to find
    // out.
    return;
  }
  em_task_queue* curr = zombie_list_head.zombie_next;
  while (curr != &zombie_list_head) {
    em_task_queue* next = curr->zombie_next;
    if (curr->notification == NOTIFICATION_NONE) {
      // Remove the zombie from the list and free it.
      curr->zombie_prev->zombie_next = curr->zombie_next;
      curr->zombie_next->zombie_prev = curr->zombie_prev;
      em_task_queue_free(curr);
    }
    curr = next;
  }
  pthread_mutex_unlock(&zombie_list_head.mutex);
}

em_task_queue* em_task_queue_create(pthread_t thread) {
  // Free any queue that has been destroyed and is safe to free.
  cull_zombies();

  em_task_queue* queue = malloc(sizeof(em_task_queue));
  if (queue == NULL) {
    return NULL;
  }
  task* tasks = malloc(sizeof(task) * EM_TASK_QUEUE_INITIAL_CAPACITY);
  if (tasks == NULL) {
    free(queue);
    return NULL;
  }
  *queue = (em_task_queue){.notification = NOTIFICATION_NONE,
                           .mutex = PTHREAD_MUTEX_INITIALIZER,
                           .thread = thread,
                           .processing = 0,
                           .tasks = tasks,
                           .capacity = EM_TASK_QUEUE_INITIAL_CAPACITY,
                           .head = 0,
                           .tail = 0,
                           .zombie_prev = NULL,
                           .zombie_next = NULL};
  return queue;
}

void em_task_queue_destroy(em_task_queue* queue) {
  assert(queue->zombie_next == NULL && queue->zombie_prev == NULL);
  if (queue->notification == NOTIFICATION_NONE) {
    // No outstanding references to the queue, so we can go ahead and free it.
    em_task_queue_free(queue);
    return;
  }
  // Otherwise add the queue to the zombie list so that it will eventually be
  // freed safely.
  pthread_mutex_lock(&zombie_list_head.mutex);
  queue->zombie_next = &zombie_list_head;
  queue->zombie_prev = zombie_list_head.zombie_prev;
  queue->zombie_next->zombie_prev = queue;
  queue->zombie_prev->zombie_next = queue;
  pthread_mutex_unlock(&zombie_list_head.mutex);
}

// Not thread safe. Returns 1 on success and 0 on failure.
static int em_task_queue_grow(em_task_queue* queue) {
  // Allocate a larger task queue.
  int new_capacity = queue->capacity * 2;
  task* new_tasks = malloc(sizeof(task) * new_capacity);
  if (new_tasks == NULL) {
    return 0;
  }
  // Copy the tasks such that the head of the queue is at the beginning of the
  // buffer. There are two cases to handle: either the queue wraps around the
  // end of the old buffer or it does not.
  int queued_tasks;
  if (queue->head <= queue->tail) {
    // No wrap. Copy the tasks in one chunk.
    queued_tasks = queue->tail - queue->head;
    memcpy(new_tasks, &queue->tasks[queue->head], sizeof(task) * queued_tasks);
  } else {
    // Wrap. Copy `first_queued` tasks up to the end of the old buffer and
    // `last_queued` tasks at the beginning of the old buffer.
    int first_queued = queue->capacity - queue->head;
    int last_queued = queue->tail;
    queued_tasks = first_queued + last_queued;
    memcpy(new_tasks, &queue->tasks[queue->head], sizeof(task) * first_queued);
    memcpy(new_tasks + first_queued, queue->tasks, sizeof(task) * last_queued);
  }
  free(queue->tasks);
  queue->tasks = new_tasks;
  queue->capacity = new_capacity;
  queue->head = 0;
  queue->tail = queued_tasks;
  return 1;
}

void em_task_queue_execute(em_task_queue* queue) {
  queue->processing = 1;
  pthread_mutex_lock(&queue->mutex);
  while (!em_task_queue_is_empty(queue)) {
    task t = em_task_queue_dequeue(queue);
    // Unlock while the task is running to allow more work to be queued in
    // parallel.
    pthread_mutex_unlock(&queue->mutex);
    t.func(t.arg);
    pthread_mutex_lock(&queue->mutex);
  }
  pthread_mutex_unlock(&queue->mutex);
  queue->processing = 0;
}

void em_task_queue_cancel(em_task_queue* queue) {
  pthread_mutex_lock(&queue->mutex);
  while (!em_task_queue_is_empty(queue)) {
    task t = em_task_queue_dequeue(queue);
    if (t.cancel) {
      t.cancel(t.arg);
    }
  }
  pthread_mutex_unlock(&queue->mutex);
  // Any subsequent messages to this queue (for example if a pthread struct is
  // reused for a future thread, potentially on a different worker) will require
  // a new notification. Clearing the flag is safe here because in both the
  // proxying queue and mailbox cases, there are no more outstanding references
  // to the queue after thread shutdown.
  queue->notification = NOTIFICATION_NONE;
}

int em_task_queue_enqueue(em_task_queue* queue, task t) {
  if (em_task_queue_is_full(queue) && !em_task_queue_grow(queue)) {
    return 0;
  }
  queue->tasks[queue->tail] = t;
  queue->tail = (queue->tail + 1) % queue->capacity;
  return 1;
}

task em_task_queue_dequeue(em_task_queue* queue) {
  task t = queue->tasks[queue->head];
  queue->head = (queue->head + 1) % queue->capacity;
  return t;
}

static void receive_notification(void* arg) {
  em_task_queue* tasks = arg;
  tasks->notification = NOTIFICATION_RECEIVED;
  em_task_queue_execute(tasks);
  notification_state expected = NOTIFICATION_RECEIVED;
  atomic_compare_exchange_strong(
    &tasks->notification, &expected, NOTIFICATION_NONE);
}

static void cancel_notification(void* arg) {
  em_task_queue* tasks = arg;
  em_task_queue_cancel(tasks);
}

int em_task_queue_send(em_task_queue* queue, task t) {
  // Ensure the target mailbox will remain open or detect that it is already
  // closed.
  if (!emscripten_thread_mailbox_ref(queue->thread)) {
    return 0;
  }

  pthread_mutex_lock(&queue->mutex);
  int enqueued = em_task_queue_enqueue(queue, t);
  pthread_mutex_unlock(&queue->mutex);
  if (!enqueued) {
    emscripten_thread_mailbox_unref(queue->thread);
    return 0;
  }

  // We're done if there is already a pending notification for this task queue.
  // Otherwise, we will send one.
  notification_state previous =
    atomic_exchange(&queue->notification, NOTIFICATION_PENDING);
  if (previous == NOTIFICATION_PENDING) {
    emscripten_thread_mailbox_unref(queue->thread);
    return 1;
  }

  emscripten_thread_mailbox_send(queue->thread,
                                 (task){.func = receive_notification,
                                        .cancel = cancel_notification,
                                        .arg = queue});
  emscripten_thread_mailbox_unref(queue->thread);
  return 1;
}
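
For orientation, here is a minimal usage sketch of the API defined above; it is not part of the file. It assumes `em_task_queue.h` declares the `task` struct (with `func`, `cancel`, and `arg` members, as used throughout this file) and the queue functions defined here. The callbacks `print_message` and `drop_message` and the function `example` are hypothetical, invented for illustration; in the real runtime these queues are normally driven through the proxying and mailbox layers rather than used directly like this.

#include <pthread.h>
#include <stdio.h>

#include "em_task_queue.h"

// Hypothetical work item: runs on the target thread when the queue executes.
static void print_message(void* arg) {
  printf("task ran: %s\n", (const char*)arg);
}

// Hypothetical cancel handler: runs if the queue is cancelled (for example
// because the target thread shut down) before the task executes.
static void drop_message(void* arg) {
  fprintf(stderr, "task dropped: %s\n", (const char*)arg);
}

void example(pthread_t target) {
  em_task_queue* queue = em_task_queue_create(target);
  if (queue == NULL) {
    return;
  }
  // Enqueue a task and notify the target thread's mailbox. This returns 0 if
  // the mailbox is already closed or the queue could not be grown.
  task t = {.func = print_message, .cancel = drop_message, .arg = "hello"};
  if (!em_task_queue_send(queue, t)) {
    em_task_queue_destroy(queue);
    return;
  }
  // The target thread's mailbox later runs receive_notification, which
  // executes the queue and clears the notification flag. Destroying the queue
  // now is still safe: per the lifetime notes above, it is parked on the
  // zombie list and freed only once that flag has been cleared.
  em_task_queue_destroy(queue);
}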