Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/sys/contrib/openzfs/module/zfs/bqueue.c
48383 views
1
// SPDX-License-Identifier: CDDL-1.0
2
/*
3
* CDDL HEADER START
4
*
5
* This file and its contents are supplied under the terms of the
6
* Common Development and Distribution License ("CDDL"), version 1.0.
7
* You may only use this file in accordance with the terms of version
8
* 1.0 of the CDDL.
9
*
10
* A full copy of the text of the CDDL should have accompanied this
11
* source. A copy of the CDDL is also available via the Internet at
12
* http://www.illumos.org/license/CDDL.
13
*
14
* CDDL HEADER END
15
*/
16
/*
17
* Copyright (c) 2014, 2018 by Delphix. All rights reserved.
18
*/
19
20
#include <sys/bqueue.h>
21
#include <sys/zfs_context.h>
22
23
static inline bqueue_node_t *
24
obj2node(bqueue_t *q, void *data)
25
{
26
return ((bqueue_node_t *)((char *)data + q->bq_node_offset));
27
}
28
29
/*
30
* Initialize a blocking queue The maximum capacity of the queue is set to
31
* size. Types that are stored in a bqueue must contain a bqueue_node_t, and
32
* node_offset must be its offset from the start of the struct. fill_fraction
33
* is a performance tuning value; when the queue is full, any threads
34
* attempting to enqueue records will block. They will block until they're
35
* signaled, which will occur when the queue is at least 1/fill_fraction
36
* empty. Similar behavior occurs on dequeue; if the queue is empty, threads
37
* block. They will be signalled when the queue has 1/fill_fraction full.
38
* As a result, you must call bqueue_enqueue_flush() when you enqueue your
39
* final record on a thread, in case the dequeuing threads are currently
40
* blocked and that enqueue does not cause them to be woken. Alternatively,
41
* this behavior can be disabled (causing signaling to happen immediately) by
42
* setting fill_fraction to any value larger than size. Return 0 on success,
43
* or -1 on failure.
44
*
45
* Note: The caller must ensure that for a given bqueue_t, there's only a
46
* single call to bqueue_enqueue() running at a time (e.g. by calling only
47
* from a single thread, or with locking around the call). Similarly, the
48
* caller must ensure that there's only a single call to bqueue_dequeue()
49
* running at a time. However, the one call to bqueue_enqueue() may be
50
* invoked concurrently with the one call to bqueue_dequeue().
51
*/
52
int
53
bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset)
54
{
55
if (fill_fraction == 0) {
56
return (-1);
57
}
58
list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
59
node_offset + offsetof(bqueue_node_t, bqn_node));
60
list_create(&q->bq_dequeuing_list, node_offset + sizeof (bqueue_node_t),
61
node_offset + offsetof(bqueue_node_t, bqn_node));
62
list_create(&q->bq_enqueuing_list, node_offset + sizeof (bqueue_node_t),
63
node_offset + offsetof(bqueue_node_t, bqn_node));
64
cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
65
cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
66
mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
67
q->bq_node_offset = node_offset;
68
q->bq_size = 0;
69
q->bq_dequeuing_size = 0;
70
q->bq_enqueuing_size = 0;
71
q->bq_maxsize = size;
72
q->bq_fill_fraction = fill_fraction;
73
return (0);
74
}
75
76
/*
77
* Destroy a blocking queue. This function asserts that there are no
78
* elements in the queue, and no one is blocked on the condition
79
* variables.
80
*/
81
void
82
bqueue_destroy(bqueue_t *q)
83
{
84
mutex_enter(&q->bq_lock);
85
ASSERT0(q->bq_size);
86
ASSERT0(q->bq_dequeuing_size);
87
ASSERT0(q->bq_enqueuing_size);
88
cv_destroy(&q->bq_add_cv);
89
cv_destroy(&q->bq_pop_cv);
90
list_destroy(&q->bq_list);
91
list_destroy(&q->bq_dequeuing_list);
92
list_destroy(&q->bq_enqueuing_list);
93
mutex_exit(&q->bq_lock);
94
mutex_destroy(&q->bq_lock);
95
}
96
97
static void
98
bqueue_enqueue_impl(bqueue_t *q, void *data, size_t item_size, boolean_t flush)
99
{
100
ASSERT3U(item_size, >, 0);
101
ASSERT3U(item_size, <=, q->bq_maxsize);
102
103
obj2node(q, data)->bqn_size = item_size;
104
q->bq_enqueuing_size += item_size;
105
list_insert_tail(&q->bq_enqueuing_list, data);
106
107
if (flush ||
108
q->bq_enqueuing_size >= q->bq_maxsize / q->bq_fill_fraction) {
109
/* Append the enquing list to the shared list. */
110
mutex_enter(&q->bq_lock);
111
while (q->bq_size > q->bq_maxsize) {
112
cv_wait_sig(&q->bq_add_cv, &q->bq_lock);
113
}
114
q->bq_size += q->bq_enqueuing_size;
115
list_move_tail(&q->bq_list, &q->bq_enqueuing_list);
116
q->bq_enqueuing_size = 0;
117
cv_broadcast(&q->bq_pop_cv);
118
mutex_exit(&q->bq_lock);
119
}
120
}
121
122
/*
123
* Add data to q, consuming size units of capacity. If there is insufficient
124
* capacity to consume size units, block until capacity exists. Asserts size is
125
* > 0.
126
*/
127
void
128
bqueue_enqueue(bqueue_t *q, void *data, size_t item_size)
129
{
130
bqueue_enqueue_impl(q, data, item_size, B_FALSE);
131
}
132
133
/*
134
* Enqueue an entry, and then flush the queue. This forces the popping threads
135
* to wake up, even if we're below the fill fraction. We have this in a single
136
* function, rather than having a separate call, because it prevents race
137
* conditions between the enqueuing thread and the dequeuing thread, where the
138
* enqueueing thread will wake up the dequeuing thread, that thread will
139
* destroy the condvar before the enqueuing thread is done.
140
*/
141
void
142
bqueue_enqueue_flush(bqueue_t *q, void *data, size_t item_size)
143
{
144
bqueue_enqueue_impl(q, data, item_size, B_TRUE);
145
}
146
147
/*
148
* Take the first element off of q. If there are no elements on the queue, wait
149
* until one is put there. Return the removed element.
150
*/
151
void *
152
bqueue_dequeue(bqueue_t *q)
153
{
154
void *ret = list_remove_head(&q->bq_dequeuing_list);
155
if (ret == NULL) {
156
/*
157
* Dequeuing list is empty. Wait for there to be something on
158
* the shared list, then move the entire shared list to the
159
* dequeuing list.
160
*/
161
mutex_enter(&q->bq_lock);
162
while (q->bq_size == 0) {
163
cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);
164
}
165
ASSERT0(q->bq_dequeuing_size);
166
ASSERT(list_is_empty(&q->bq_dequeuing_list));
167
list_move_tail(&q->bq_dequeuing_list, &q->bq_list);
168
q->bq_dequeuing_size = q->bq_size;
169
q->bq_size = 0;
170
cv_broadcast(&q->bq_add_cv);
171
mutex_exit(&q->bq_lock);
172
ret = list_remove_head(&q->bq_dequeuing_list);
173
}
174
q->bq_dequeuing_size -= obj2node(q, ret)->bqn_size;
175
return (ret);
176
}
177
178