Path: blob/main/sys/contrib/openzfs/module/zfs/bqueue.c
48383 views
// SPDX-License-Identifier: CDDL-1.01/*2* CDDL HEADER START3*4* This file and its contents are supplied under the terms of the5* Common Development and Distribution License ("CDDL"), version 1.0.6* You may only use this file in accordance with the terms of version7* 1.0 of the CDDL.8*9* A full copy of the text of the CDDL should have accompanied this10* source. A copy of the CDDL is also available via the Internet at11* http://www.illumos.org/license/CDDL.12*13* CDDL HEADER END14*/15/*16* Copyright (c) 2014, 2018 by Delphix. All rights reserved.17*/1819#include <sys/bqueue.h>20#include <sys/zfs_context.h>2122static inline bqueue_node_t *23obj2node(bqueue_t *q, void *data)24{25return ((bqueue_node_t *)((char *)data + q->bq_node_offset));26}2728/*29* Initialize a blocking queue The maximum capacity of the queue is set to30* size. Types that are stored in a bqueue must contain a bqueue_node_t, and31* node_offset must be its offset from the start of the struct. fill_fraction32* is a performance tuning value; when the queue is full, any threads33* attempting to enqueue records will block. They will block until they're34* signaled, which will occur when the queue is at least 1/fill_fraction35* empty. Similar behavior occurs on dequeue; if the queue is empty, threads36* block. They will be signalled when the queue has 1/fill_fraction full.37* As a result, you must call bqueue_enqueue_flush() when you enqueue your38* final record on a thread, in case the dequeuing threads are currently39* blocked and that enqueue does not cause them to be woken. Alternatively,40* this behavior can be disabled (causing signaling to happen immediately) by41* setting fill_fraction to any value larger than size. Return 0 on success,42* or -1 on failure.43*44* Note: The caller must ensure that for a given bqueue_t, there's only a45* single call to bqueue_enqueue() running at a time (e.g. by calling only46* from a single thread, or with locking around the call). Similarly, the47* caller must ensure that there's only a single call to bqueue_dequeue()48* running at a time. However, the one call to bqueue_enqueue() may be49* invoked concurrently with the one call to bqueue_dequeue().50*/51int52bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset)53{54if (fill_fraction == 0) {55return (-1);56}57list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),58node_offset + offsetof(bqueue_node_t, bqn_node));59list_create(&q->bq_dequeuing_list, node_offset + sizeof (bqueue_node_t),60node_offset + offsetof(bqueue_node_t, bqn_node));61list_create(&q->bq_enqueuing_list, node_offset + sizeof (bqueue_node_t),62node_offset + offsetof(bqueue_node_t, bqn_node));63cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);64cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);65mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);66q->bq_node_offset = node_offset;67q->bq_size = 0;68q->bq_dequeuing_size = 0;69q->bq_enqueuing_size = 0;70q->bq_maxsize = size;71q->bq_fill_fraction = fill_fraction;72return (0);73}7475/*76* Destroy a blocking queue. This function asserts that there are no77* elements in the queue, and no one is blocked on the condition78* variables.79*/80void81bqueue_destroy(bqueue_t *q)82{83mutex_enter(&q->bq_lock);84ASSERT0(q->bq_size);85ASSERT0(q->bq_dequeuing_size);86ASSERT0(q->bq_enqueuing_size);87cv_destroy(&q->bq_add_cv);88cv_destroy(&q->bq_pop_cv);89list_destroy(&q->bq_list);90list_destroy(&q->bq_dequeuing_list);91list_destroy(&q->bq_enqueuing_list);92mutex_exit(&q->bq_lock);93mutex_destroy(&q->bq_lock);94}9596static void97bqueue_enqueue_impl(bqueue_t *q, void *data, size_t item_size, boolean_t flush)98{99ASSERT3U(item_size, >, 0);100ASSERT3U(item_size, <=, q->bq_maxsize);101102obj2node(q, data)->bqn_size = item_size;103q->bq_enqueuing_size += item_size;104list_insert_tail(&q->bq_enqueuing_list, data);105106if (flush ||107q->bq_enqueuing_size >= q->bq_maxsize / q->bq_fill_fraction) {108/* Append the enquing list to the shared list. */109mutex_enter(&q->bq_lock);110while (q->bq_size > q->bq_maxsize) {111cv_wait_sig(&q->bq_add_cv, &q->bq_lock);112}113q->bq_size += q->bq_enqueuing_size;114list_move_tail(&q->bq_list, &q->bq_enqueuing_list);115q->bq_enqueuing_size = 0;116cv_broadcast(&q->bq_pop_cv);117mutex_exit(&q->bq_lock);118}119}120121/*122* Add data to q, consuming size units of capacity. If there is insufficient123* capacity to consume size units, block until capacity exists. Asserts size is124* > 0.125*/126void127bqueue_enqueue(bqueue_t *q, void *data, size_t item_size)128{129bqueue_enqueue_impl(q, data, item_size, B_FALSE);130}131132/*133* Enqueue an entry, and then flush the queue. This forces the popping threads134* to wake up, even if we're below the fill fraction. We have this in a single135* function, rather than having a separate call, because it prevents race136* conditions between the enqueuing thread and the dequeuing thread, where the137* enqueueing thread will wake up the dequeuing thread, that thread will138* destroy the condvar before the enqueuing thread is done.139*/140void141bqueue_enqueue_flush(bqueue_t *q, void *data, size_t item_size)142{143bqueue_enqueue_impl(q, data, item_size, B_TRUE);144}145146/*147* Take the first element off of q. If there are no elements on the queue, wait148* until one is put there. Return the removed element.149*/150void *151bqueue_dequeue(bqueue_t *q)152{153void *ret = list_remove_head(&q->bq_dequeuing_list);154if (ret == NULL) {155/*156* Dequeuing list is empty. Wait for there to be something on157* the shared list, then move the entire shared list to the158* dequeuing list.159*/160mutex_enter(&q->bq_lock);161while (q->bq_size == 0) {162cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);163}164ASSERT0(q->bq_dequeuing_size);165ASSERT(list_is_empty(&q->bq_dequeuing_list));166list_move_tail(&q->bq_dequeuing_list, &q->bq_list);167q->bq_dequeuing_size = q->bq_size;168q->bq_size = 0;169cv_broadcast(&q->bq_add_cv);170mutex_exit(&q->bq_lock);171ret = list_remove_head(&q->bq_dequeuing_list);172}173q->bq_dequeuing_size -= obj2node(q, ret)->bqn_size;174return (ret);175}176177178