Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/sys/contrib/ck/include/ck_ring.h
48255 views
1
/*
2
* Copyright 2009-2015 Samy Al Bahra.
3
* All rights reserved.
4
*
5
* Redistribution and use in source and binary forms, with or without
6
* modification, are permitted provided that the following conditions
7
* are met:
8
* 1. Redistributions of source code must retain the above copyright
9
* notice, this list of conditions and the following disclaimer.
10
* 2. Redistributions in binary form must reproduce the above copyright
11
* notice, this list of conditions and the following disclaimer in the
12
* documentation and/or other materials provided with the distribution.
13
*
14
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
* SUCH DAMAGE.
25
*/
26
27
#ifndef CK_RING_H
28
#define CK_RING_H
29
30
#include <ck_cc.h>
31
#include <ck_md.h>
32
#include <ck_pr.h>
33
#include <ck_stdbool.h>
34
#include <ck_string.h>
35
36
/*
37
* Concurrent ring buffer.
38
*/
39
40
struct ck_ring {
41
unsigned int c_head;
42
char pad[CK_MD_CACHELINE - sizeof(unsigned int)];
43
unsigned int p_tail;
44
unsigned int p_head;
45
char _pad[CK_MD_CACHELINE - sizeof(unsigned int) * 2];
46
unsigned int size;
47
unsigned int mask;
48
};
49
typedef struct ck_ring ck_ring_t;
50
51
struct ck_ring_buffer {
52
void *value;
53
};
54
typedef struct ck_ring_buffer ck_ring_buffer_t;
55
56
CK_CC_INLINE static unsigned int
57
ck_ring_size(const struct ck_ring *ring)
58
{
59
unsigned int c, p;
60
61
c = ck_pr_load_uint(&ring->c_head);
62
p = ck_pr_load_uint(&ring->p_tail);
63
return (p - c) & ring->mask;
64
}
65
66
CK_CC_INLINE static unsigned int
67
ck_ring_capacity(const struct ck_ring *ring)
68
{
69
70
return ring->size;
71
}
72
73
/*
74
* This function is only safe to call when there are no concurrent operations
75
* on the ring. This is primarily meant for persistent ck_ring use-cases. The
76
* function returns true if any mutations were performed on the ring.
77
*/
78
CK_CC_INLINE static bool
79
ck_ring_repair(struct ck_ring *ring)
80
{
81
bool r = false;
82
83
if (ring->p_tail != ring->p_head) {
84
ring->p_tail = ring->p_head;
85
r = true;
86
}
87
88
return r;
89
}
90
91
/*
92
* This can be called when no concurrent updates are occurring on the ring
93
* structure to check for consistency. This is primarily meant to be used for
94
* persistent storage of the ring. If this functions returns false, the ring
95
* is in an inconsistent state.
96
*/
97
CK_CC_INLINE static bool
98
ck_ring_valid(const struct ck_ring *ring)
99
{
100
unsigned int size = ring->size;
101
unsigned int c_head = ring->c_head;
102
unsigned int p_head = ring->p_head;
103
104
/* The ring must be a power of 2. */
105
if (size & (size - 1))
106
return false;
107
108
/* The consumer counter must always be smaller than the producer. */
109
if (c_head > p_head)
110
return false;
111
112
/* The producer may only be up to size slots ahead of consumer. */
113
if (p_head - c_head >= size)
114
return false;
115
116
return true;
117
}
118
119
CK_CC_INLINE static void
120
ck_ring_init(struct ck_ring *ring, unsigned int size)
121
{
122
123
ring->size = size;
124
ring->mask = size - 1;
125
ring->p_tail = 0;
126
ring->p_head = 0;
127
ring->c_head = 0;
128
return;
129
}
130
131
/*
132
* The _ck_ring_* namespace is internal only and must not used externally.
133
*/
134
135
/*
136
* This function will return a region of memory to write for the next value
137
* for a single producer.
138
*/
139
CK_CC_FORCE_INLINE static void *
140
_ck_ring_enqueue_reserve_sp(struct ck_ring *ring,
141
void *CK_CC_RESTRICT buffer,
142
unsigned int ts,
143
unsigned int *size)
144
{
145
const unsigned int mask = ring->mask;
146
unsigned int consumer, producer, delta;
147
148
consumer = ck_pr_load_uint(&ring->c_head);
149
producer = ring->p_tail;
150
delta = producer + 1;
151
if (size != NULL)
152
*size = (producer - consumer) & mask;
153
154
if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask)))
155
return NULL;
156
157
return (char *)buffer + ts * (producer & mask);
158
}
159
160
/*
161
* This is to be called to commit and make visible a region of previously
162
* reserved with reverse_sp.
163
*/
164
CK_CC_FORCE_INLINE static void
165
_ck_ring_enqueue_commit_sp(struct ck_ring *ring)
166
{
167
168
ck_pr_fence_store();
169
ck_pr_store_uint(&ring->p_tail, ring->p_tail + 1);
170
return;
171
}
172
173
CK_CC_FORCE_INLINE static bool
174
_ck_ring_enqueue_sp(struct ck_ring *ring,
175
void *CK_CC_RESTRICT buffer,
176
const void *CK_CC_RESTRICT entry,
177
unsigned int ts,
178
unsigned int *size)
179
{
180
const unsigned int mask = ring->mask;
181
unsigned int consumer, producer, delta;
182
183
consumer = ck_pr_load_uint(&ring->c_head);
184
producer = ring->p_tail;
185
delta = producer + 1;
186
if (size != NULL)
187
*size = (producer - consumer) & mask;
188
189
if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask)))
190
return false;
191
192
buffer = (char *)buffer + ts * (producer & mask);
193
memcpy(buffer, entry, ts);
194
195
/*
196
* Make sure to update slot value before indicating
197
* that the slot is available for consumption.
198
*/
199
ck_pr_fence_store();
200
ck_pr_store_uint(&ring->p_tail, delta);
201
return true;
202
}
203
204
CK_CC_FORCE_INLINE static bool
205
_ck_ring_enqueue_sp_size(struct ck_ring *ring,
206
void *CK_CC_RESTRICT buffer,
207
const void *CK_CC_RESTRICT entry,
208
unsigned int ts,
209
unsigned int *size)
210
{
211
unsigned int sz;
212
bool r;
213
214
r = _ck_ring_enqueue_sp(ring, buffer, entry, ts, &sz);
215
*size = sz;
216
return r;
217
}
218
219
CK_CC_FORCE_INLINE static bool
220
_ck_ring_dequeue_sc(struct ck_ring *ring,
221
const void *CK_CC_RESTRICT buffer,
222
void *CK_CC_RESTRICT target,
223
unsigned int size)
224
{
225
const unsigned int mask = ring->mask;
226
unsigned int consumer, producer;
227
228
consumer = ring->c_head;
229
producer = ck_pr_load_uint(&ring->p_tail);
230
231
if (CK_CC_UNLIKELY(consumer == producer))
232
return false;
233
234
/*
235
* Make sure to serialize with respect to our snapshot
236
* of the producer counter.
237
*/
238
ck_pr_fence_load();
239
240
buffer = (const char *)buffer + size * (consumer & mask);
241
memcpy(target, buffer, size);
242
243
/*
244
* Make sure copy is completed with respect to consumer
245
* update.
246
*/
247
ck_pr_fence_store();
248
ck_pr_store_uint(&ring->c_head, consumer + 1);
249
return true;
250
}
251
252
CK_CC_FORCE_INLINE static void *
253
_ck_ring_enqueue_reserve_mp(struct ck_ring *ring,
254
void *buffer,
255
unsigned int ts,
256
unsigned int *ticket,
257
unsigned int *size)
258
{
259
const unsigned int mask = ring->mask;
260
unsigned int producer, consumer, delta;
261
262
producer = ck_pr_load_uint(&ring->p_head);
263
264
for (;;) {
265
ck_pr_fence_load();
266
consumer = ck_pr_load_uint(&ring->c_head);
267
268
delta = producer + 1;
269
270
if (CK_CC_LIKELY((producer - consumer) < mask)) {
271
if (ck_pr_cas_uint_value(&ring->p_head,
272
producer, delta, &producer) == true) {
273
break;
274
}
275
} else {
276
unsigned int new_producer;
277
278
ck_pr_fence_load();
279
new_producer = ck_pr_load_uint(&ring->p_head);
280
281
if (producer == new_producer) {
282
if (size != NULL)
283
*size = (producer - consumer) & mask;
284
285
return false;
286
}
287
288
producer = new_producer;
289
}
290
}
291
292
*ticket = producer;
293
if (size != NULL)
294
*size = (producer - consumer) & mask;
295
296
return (char *)buffer + ts * (producer & mask);
297
}
298
299
CK_CC_FORCE_INLINE static void
300
_ck_ring_enqueue_commit_mp(struct ck_ring *ring, unsigned int producer)
301
{
302
303
while (ck_pr_load_uint(&ring->p_tail) != producer)
304
ck_pr_stall();
305
306
ck_pr_fence_store();
307
ck_pr_store_uint(&ring->p_tail, producer + 1);
308
return;
309
}
310
311
CK_CC_FORCE_INLINE static bool
312
_ck_ring_enqueue_mp(struct ck_ring *ring,
313
void *buffer,
314
const void *entry,
315
unsigned int ts,
316
unsigned int *size)
317
{
318
const unsigned int mask = ring->mask;
319
unsigned int producer, consumer, delta;
320
bool r = true;
321
322
producer = ck_pr_load_uint(&ring->p_head);
323
324
for (;;) {
325
/*
326
* The snapshot of producer must be up to date with respect to
327
* consumer.
328
*/
329
ck_pr_fence_load();
330
consumer = ck_pr_load_uint(&ring->c_head);
331
332
delta = producer + 1;
333
334
/*
335
* Only try to CAS if the producer is not clearly stale (not
336
* less than consumer) and the buffer is definitely not full.
337
*/
338
if (CK_CC_LIKELY((producer - consumer) < mask)) {
339
if (ck_pr_cas_uint_value(&ring->p_head,
340
producer, delta, &producer) == true) {
341
break;
342
}
343
} else {
344
unsigned int new_producer;
345
346
/*
347
* Slow path. Either the buffer is full or we have a
348
* stale snapshot of p_head. Execute a second read of
349
* p_read that must be ordered wrt the snapshot of
350
* c_head.
351
*/
352
ck_pr_fence_load();
353
new_producer = ck_pr_load_uint(&ring->p_head);
354
355
/*
356
* Only fail if we haven't made forward progress in
357
* production: the buffer must have been full when we
358
* read new_producer (or we wrapped around UINT_MAX
359
* during this iteration).
360
*/
361
if (producer == new_producer) {
362
r = false;
363
goto leave;
364
}
365
366
/*
367
* p_head advanced during this iteration. Try again.
368
*/
369
producer = new_producer;
370
}
371
}
372
373
buffer = (char *)buffer + ts * (producer & mask);
374
memcpy(buffer, entry, ts);
375
376
/*
377
* Wait until all concurrent producers have completed writing
378
* their data into the ring buffer.
379
*/
380
while (ck_pr_load_uint(&ring->p_tail) != producer)
381
ck_pr_stall();
382
383
/*
384
* Ensure that copy is completed before updating shared producer
385
* counter.
386
*/
387
ck_pr_fence_store();
388
ck_pr_store_uint(&ring->p_tail, delta);
389
390
leave:
391
if (size != NULL)
392
*size = (producer - consumer) & mask;
393
394
return r;
395
}
396
397
CK_CC_FORCE_INLINE static bool
398
_ck_ring_enqueue_mp_size(struct ck_ring *ring,
399
void *buffer,
400
const void *entry,
401
unsigned int ts,
402
unsigned int *size)
403
{
404
unsigned int sz;
405
bool r;
406
407
r = _ck_ring_enqueue_mp(ring, buffer, entry, ts, &sz);
408
*size = sz;
409
return r;
410
}
411
412
CK_CC_FORCE_INLINE static bool
413
_ck_ring_trydequeue_mc(struct ck_ring *ring,
414
const void *buffer,
415
void *data,
416
unsigned int size)
417
{
418
const unsigned int mask = ring->mask;
419
unsigned int consumer, producer;
420
421
consumer = ck_pr_load_uint(&ring->c_head);
422
ck_pr_fence_load();
423
producer = ck_pr_load_uint(&ring->p_tail);
424
425
if (CK_CC_UNLIKELY(consumer == producer))
426
return false;
427
428
ck_pr_fence_load();
429
430
buffer = (const char *)buffer + size * (consumer & mask);
431
memcpy(data, buffer, size);
432
433
ck_pr_fence_store_atomic();
434
return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1);
435
}
436
437
CK_CC_FORCE_INLINE static bool
438
_ck_ring_dequeue_mc(struct ck_ring *ring,
439
const void *buffer,
440
void *data,
441
unsigned int ts)
442
{
443
const unsigned int mask = ring->mask;
444
unsigned int consumer, producer;
445
446
consumer = ck_pr_load_uint(&ring->c_head);
447
448
do {
449
const char *target;
450
451
/*
452
* Producer counter must represent state relative to
453
* our latest consumer snapshot.
454
*/
455
ck_pr_fence_load();
456
producer = ck_pr_load_uint(&ring->p_tail);
457
458
if (CK_CC_UNLIKELY(consumer == producer))
459
return false;
460
461
ck_pr_fence_load();
462
463
target = (const char *)buffer + ts * (consumer & mask);
464
memcpy(data, target, ts);
465
466
/* Serialize load with respect to head update. */
467
ck_pr_fence_store_atomic();
468
} while (ck_pr_cas_uint_value(&ring->c_head,
469
consumer,
470
consumer + 1,
471
&consumer) == false);
472
473
return true;
474
}
475
476
/*
477
* The ck_ring_*_spsc namespace is the public interface for interacting with a
478
* ring buffer containing pointers. Correctness is only provided if there is up
479
* to one concurrent consumer and up to one concurrent producer.
480
*/
481
CK_CC_INLINE static bool
482
ck_ring_enqueue_spsc_size(struct ck_ring *ring,
483
struct ck_ring_buffer *buffer,
484
const void *entry,
485
unsigned int *size)
486
{
487
488
return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
489
sizeof(entry), size);
490
}
491
492
CK_CC_INLINE static bool
493
ck_ring_enqueue_spsc(struct ck_ring *ring,
494
struct ck_ring_buffer *buffer,
495
const void *entry)
496
{
497
498
return _ck_ring_enqueue_sp(ring, buffer,
499
&entry, sizeof(entry), NULL);
500
}
501
502
CK_CC_INLINE static void *
503
ck_ring_enqueue_reserve_spsc_size(struct ck_ring *ring,
504
struct ck_ring_buffer *buffer,
505
unsigned int *size)
506
{
507
508
return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *),
509
size);
510
}
511
512
CK_CC_INLINE static void *
513
ck_ring_enqueue_reserve_spsc(struct ck_ring *ring,
514
struct ck_ring_buffer *buffer)
515
{
516
517
return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *),
518
NULL);
519
}
520
521
CK_CC_INLINE static void
522
ck_ring_enqueue_commit_spsc(struct ck_ring *ring)
523
{
524
525
_ck_ring_enqueue_commit_sp(ring);
526
return;
527
}
528
529
CK_CC_INLINE static bool
530
ck_ring_dequeue_spsc(struct ck_ring *ring,
531
const struct ck_ring_buffer *buffer,
532
void *data)
533
{
534
535
return _ck_ring_dequeue_sc(ring, buffer,
536
(void **)data, sizeof(void *));
537
}
538
539
/*
540
* The ck_ring_*_mpmc namespace is the public interface for interacting with a
541
* ring buffer containing pointers. Correctness is provided for any number of
542
* producers and consumers.
543
*/
544
CK_CC_INLINE static bool
545
ck_ring_enqueue_mpmc(struct ck_ring *ring,
546
struct ck_ring_buffer *buffer,
547
const void *entry)
548
{
549
550
return _ck_ring_enqueue_mp(ring, buffer, &entry, sizeof(entry), NULL);
551
}
552
553
CK_CC_INLINE static bool
554
ck_ring_enqueue_mpmc_size(struct ck_ring *ring,
555
struct ck_ring_buffer *buffer,
556
const void *entry,
557
unsigned int *size)
558
{
559
560
return _ck_ring_enqueue_mp_size(ring, buffer, &entry, sizeof(entry),
561
size);
562
}
563
564
CK_CC_INLINE static void *
565
ck_ring_enqueue_reserve_mpmc(struct ck_ring *ring,
566
struct ck_ring_buffer *buffer,
567
unsigned int *ticket)
568
{
569
570
return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *),
571
ticket, NULL);
572
}
573
574
CK_CC_INLINE static void *
575
ck_ring_enqueue_reserve_mpmc_size(struct ck_ring *ring,
576
struct ck_ring_buffer *buffer,
577
unsigned int *ticket,
578
unsigned int *size)
579
{
580
581
return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *),
582
ticket, size);
583
}
584
585
CK_CC_INLINE static void
586
ck_ring_enqueue_commit_mpmc(struct ck_ring *ring, unsigned int ticket)
587
{
588
589
_ck_ring_enqueue_commit_mp(ring, ticket);
590
return;
591
}
592
593
CK_CC_INLINE static bool
594
ck_ring_trydequeue_mpmc(struct ck_ring *ring,
595
const struct ck_ring_buffer *buffer,
596
void *data)
597
{
598
599
return _ck_ring_trydequeue_mc(ring,
600
buffer, (void **)data, sizeof(void *));
601
}
602
603
CK_CC_INLINE static bool
604
ck_ring_dequeue_mpmc(struct ck_ring *ring,
605
const struct ck_ring_buffer *buffer,
606
void *data)
607
{
608
609
return _ck_ring_dequeue_mc(ring, buffer, (void **)data,
610
sizeof(void *));
611
}
612
613
/*
614
* The ck_ring_*_spmc namespace is the public interface for interacting with a
615
* ring buffer containing pointers. Correctness is provided for any number of
616
* consumers with up to one concurrent producer.
617
*/
618
CK_CC_INLINE static void *
619
ck_ring_enqueue_reserve_spmc_size(struct ck_ring *ring,
620
struct ck_ring_buffer *buffer,
621
unsigned int *size)
622
{
623
624
return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *), size);
625
}
626
627
CK_CC_INLINE static void *
628
ck_ring_enqueue_reserve_spmc(struct ck_ring *ring,
629
struct ck_ring_buffer *buffer)
630
{
631
632
return _ck_ring_enqueue_reserve_sp(ring, buffer, sizeof(void *), NULL);
633
}
634
635
CK_CC_INLINE static void
636
ck_ring_enqueue_commit_spmc(struct ck_ring *ring)
637
{
638
639
_ck_ring_enqueue_commit_sp(ring);
640
return;
641
}
642
643
CK_CC_INLINE static bool
644
ck_ring_enqueue_spmc_size(struct ck_ring *ring,
645
struct ck_ring_buffer *buffer,
646
const void *entry,
647
unsigned int *size)
648
{
649
650
return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
651
sizeof(entry), size);
652
}
653
654
CK_CC_INLINE static bool
655
ck_ring_enqueue_spmc(struct ck_ring *ring,
656
struct ck_ring_buffer *buffer,
657
const void *entry)
658
{
659
660
return _ck_ring_enqueue_sp(ring, buffer, &entry,
661
sizeof(entry), NULL);
662
}
663
664
CK_CC_INLINE static bool
665
ck_ring_trydequeue_spmc(struct ck_ring *ring,
666
const struct ck_ring_buffer *buffer,
667
void *data)
668
{
669
670
return _ck_ring_trydequeue_mc(ring, buffer, (void **)data, sizeof(void *));
671
}
672
673
CK_CC_INLINE static bool
674
ck_ring_dequeue_spmc(struct ck_ring *ring,
675
const struct ck_ring_buffer *buffer,
676
void *data)
677
{
678
679
return _ck_ring_dequeue_mc(ring, buffer, (void **)data, sizeof(void *));
680
}
681
682
/*
683
* The ck_ring_*_mpsc namespace is the public interface for interacting with a
684
* ring buffer containing pointers. Correctness is provided for any number of
685
* producers with up to one concurrent consumers.
686
*/
687
CK_CC_INLINE static void *
688
ck_ring_enqueue_reserve_mpsc(struct ck_ring *ring,
689
struct ck_ring_buffer *buffer,
690
unsigned int *ticket)
691
{
692
693
return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *),
694
ticket, NULL);
695
}
696
697
CK_CC_INLINE static void *
698
ck_ring_enqueue_reserve_mpsc_size(struct ck_ring *ring,
699
struct ck_ring_buffer *buffer,
700
unsigned int *ticket,
701
unsigned int *size)
702
{
703
704
return _ck_ring_enqueue_reserve_mp(ring, buffer, sizeof(void *),
705
ticket, size);
706
}
707
708
CK_CC_INLINE static void
709
ck_ring_enqueue_commit_mpsc(struct ck_ring *ring, unsigned int ticket)
710
{
711
712
_ck_ring_enqueue_commit_mp(ring, ticket);
713
return;
714
}
715
716
CK_CC_INLINE static bool
717
ck_ring_enqueue_mpsc(struct ck_ring *ring,
718
struct ck_ring_buffer *buffer,
719
const void *entry)
720
{
721
722
return _ck_ring_enqueue_mp(ring, buffer, &entry,
723
sizeof(entry), NULL);
724
}
725
726
CK_CC_INLINE static bool
727
ck_ring_enqueue_mpsc_size(struct ck_ring *ring,
728
struct ck_ring_buffer *buffer,
729
const void *entry,
730
unsigned int *size)
731
{
732
733
return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
734
sizeof(entry), size);
735
}
736
737
CK_CC_INLINE static bool
738
ck_ring_dequeue_mpsc(struct ck_ring *ring,
739
const struct ck_ring_buffer *buffer,
740
void *data)
741
{
742
743
return _ck_ring_dequeue_sc(ring, buffer, (void **)data,
744
sizeof(void *));
745
}
746
747
/*
748
* CK_RING_PROTOTYPE is used to define a type-safe interface for inlining
749
* values of a particular type in the ring the buffer.
750
*/
751
#define CK_RING_PROTOTYPE(name, type) \
752
CK_CC_INLINE static struct type * \
753
ck_ring_enqueue_reserve_spsc_##name(struct ck_ring *a, \
754
struct type *b) \
755
{ \
756
\
757
return _ck_ring_enqueue_reserve_sp(a, b, \
758
sizeof(struct type), NULL); \
759
} \
760
\
761
CK_CC_INLINE static struct type * \
762
ck_ring_enqueue_reserve_spsc_size_##name(struct ck_ring *a, \
763
struct type *b, \
764
unsigned int *c) \
765
{ \
766
\
767
return _ck_ring_enqueue_reserve_sp(a, b, \
768
sizeof(struct type), c); \
769
} \
770
\
771
CK_CC_INLINE static bool \
772
ck_ring_enqueue_spsc_size_##name(struct ck_ring *a, \
773
struct type *b, \
774
struct type *c, \
775
unsigned int *d) \
776
{ \
777
\
778
return _ck_ring_enqueue_sp_size(a, b, c, \
779
sizeof(struct type), d); \
780
} \
781
\
782
CK_CC_INLINE static bool \
783
ck_ring_enqueue_spsc_##name(struct ck_ring *a, \
784
struct type *b, \
785
struct type *c) \
786
{ \
787
\
788
return _ck_ring_enqueue_sp(a, b, c, \
789
sizeof(struct type), NULL); \
790
} \
791
\
792
CK_CC_INLINE static bool \
793
ck_ring_dequeue_spsc_##name(struct ck_ring *a, \
794
struct type *b, \
795
struct type *c) \
796
{ \
797
\
798
return _ck_ring_dequeue_sc(a, b, c, \
799
sizeof(struct type)); \
800
} \
801
\
802
CK_CC_INLINE static struct type * \
803
ck_ring_enqueue_reserve_spmc_##name(struct ck_ring *a, \
804
struct type *b) \
805
{ \
806
\
807
return _ck_ring_enqueue_reserve_sp(a, b, \
808
sizeof(struct type), NULL); \
809
} \
810
\
811
CK_CC_INLINE static struct type * \
812
ck_ring_enqueue_reserve_spmc_size_##name(struct ck_ring *a, \
813
struct type *b, \
814
unsigned int *c) \
815
{ \
816
\
817
return _ck_ring_enqueue_reserve_sp(a, b, \
818
sizeof(struct type), c); \
819
} \
820
\
821
CK_CC_INLINE static bool \
822
ck_ring_enqueue_spmc_size_##name(struct ck_ring *a, \
823
struct type *b, \
824
struct type *c, \
825
unsigned int *d) \
826
{ \
827
\
828
return _ck_ring_enqueue_sp_size(a, b, c, \
829
sizeof(struct type), d); \
830
} \
831
\
832
CK_CC_INLINE static bool \
833
ck_ring_enqueue_spmc_##name(struct ck_ring *a, \
834
struct type *b, \
835
struct type *c) \
836
{ \
837
\
838
return _ck_ring_enqueue_sp(a, b, c, \
839
sizeof(struct type), NULL); \
840
} \
841
\
842
CK_CC_INLINE static bool \
843
ck_ring_trydequeue_spmc_##name(struct ck_ring *a, \
844
struct type *b, \
845
struct type *c) \
846
{ \
847
\
848
return _ck_ring_trydequeue_mc(a, \
849
b, c, sizeof(struct type)); \
850
} \
851
\
852
CK_CC_INLINE static bool \
853
ck_ring_dequeue_spmc_##name(struct ck_ring *a, \
854
struct type *b, \
855
struct type *c) \
856
{ \
857
\
858
return _ck_ring_dequeue_mc(a, b, c, \
859
sizeof(struct type)); \
860
} \
861
\
862
CK_CC_INLINE static struct type * \
863
ck_ring_enqueue_reserve_mpsc_##name(struct ck_ring *a, \
864
struct type *b, \
865
unsigned int *c) \
866
{ \
867
\
868
return _ck_ring_enqueue_reserve_mp(a, b, \
869
sizeof(struct type), c, NULL); \
870
} \
871
\
872
CK_CC_INLINE static struct type * \
873
ck_ring_enqueue_reserve_mpsc_size_##name(struct ck_ring *a, \
874
struct type *b, \
875
unsigned int *c, \
876
unsigned int *d) \
877
{ \
878
\
879
return _ck_ring_enqueue_reserve_mp(a, b, \
880
sizeof(struct type), c, d); \
881
} \
882
\
883
CK_CC_INLINE static bool \
884
ck_ring_enqueue_mpsc_##name(struct ck_ring *a, \
885
struct type *b, \
886
struct type *c) \
887
{ \
888
\
889
return _ck_ring_enqueue_mp(a, b, c, \
890
sizeof(struct type), NULL); \
891
} \
892
\
893
CK_CC_INLINE static bool \
894
ck_ring_enqueue_mpsc_size_##name(struct ck_ring *a, \
895
struct type *b, \
896
struct type *c, \
897
unsigned int *d) \
898
{ \
899
\
900
return _ck_ring_enqueue_mp_size(a, b, c, \
901
sizeof(struct type), d); \
902
} \
903
\
904
CK_CC_INLINE static bool \
905
ck_ring_dequeue_mpsc_##name(struct ck_ring *a, \
906
struct type *b, \
907
struct type *c) \
908
{ \
909
\
910
return _ck_ring_dequeue_sc(a, b, c, \
911
sizeof(struct type)); \
912
} \
913
\
914
CK_CC_INLINE static struct type * \
915
ck_ring_enqueue_reserve_mpmc_##name(struct ck_ring *a, \
916
struct type *b, \
917
unsigned int *c) \
918
{ \
919
\
920
return _ck_ring_enqueue_reserve_mp(a, b, \
921
sizeof(struct type), c, NULL); \
922
} \
923
\
924
CK_CC_INLINE static struct type * \
925
ck_ring_enqueue_reserve_mpmc_size_##name(struct ck_ring *a, \
926
struct type *b, \
927
unsigned int *c, \
928
unsigned int *d) \
929
{ \
930
\
931
return _ck_ring_enqueue_reserve_mp(a, b, \
932
sizeof(struct type), c, d); \
933
} \
934
\
935
CK_CC_INLINE static bool \
936
ck_ring_enqueue_mpmc_size_##name(struct ck_ring *a, \
937
struct type *b, \
938
struct type *c, \
939
unsigned int *d) \
940
{ \
941
\
942
return _ck_ring_enqueue_mp_size(a, b, c, \
943
sizeof(struct type), d); \
944
} \
945
\
946
CK_CC_INLINE static bool \
947
ck_ring_enqueue_mpmc_##name(struct ck_ring *a, \
948
struct type *b, \
949
struct type *c) \
950
{ \
951
\
952
return _ck_ring_enqueue_mp(a, b, c, \
953
sizeof(struct type), NULL); \
954
} \
955
\
956
CK_CC_INLINE static bool \
957
ck_ring_trydequeue_mpmc_##name(struct ck_ring *a, \
958
struct type *b, \
959
struct type *c) \
960
{ \
961
\
962
return _ck_ring_trydequeue_mc(a, \
963
b, c, sizeof(struct type)); \
964
} \
965
\
966
CK_CC_INLINE static bool \
967
ck_ring_dequeue_mpmc_##name(struct ck_ring *a, \
968
struct type *b, \
969
struct type *c) \
970
{ \
971
\
972
return _ck_ring_dequeue_mc(a, b, c, \
973
sizeof(struct type)); \
974
}
975
976
/*
977
* A single producer with one concurrent consumer.
978
*/
979
#define CK_RING_ENQUEUE_SPSC(name, a, b, c) \
980
ck_ring_enqueue_spsc_##name(a, b, c)
981
#define CK_RING_ENQUEUE_SPSC_SIZE(name, a, b, c, d) \
982
ck_ring_enqueue_spsc_size_##name(a, b, c, d)
983
#define CK_RING_ENQUEUE_RESERVE_SPSC(name, a, b, c) \
984
ck_ring_enqueue_reserve_spsc_##name(a, b, c)
985
#define CK_RING_ENQUEUE_RESERVE_SPSC_SIZE(name, a, b, c, d) \
986
ck_ring_enqueue_reserve_spsc_size_##name(a, b, c, d)
987
#define CK_RING_DEQUEUE_SPSC(name, a, b, c) \
988
ck_ring_dequeue_spsc_##name(a, b, c)
989
990
/*
991
* A single producer with any number of concurrent consumers.
992
*/
993
#define CK_RING_ENQUEUE_SPMC(name, a, b, c) \
994
ck_ring_enqueue_spmc_##name(a, b, c)
995
#define CK_RING_ENQUEUE_SPMC_SIZE(name, a, b, c, d) \
996
ck_ring_enqueue_spmc_size_##name(a, b, c, d)
997
#define CK_RING_ENQUEUE_RESERVE_SPMC(name, a, b, c) \
998
ck_ring_enqueue_reserve_spmc_##name(a, b, c)
999
#define CK_RING_ENQUEUE_RESERVE_SPMC_SIZE(name, a, b, c, d) \
1000
ck_ring_enqueue_reserve_spmc_size_##name(a, b, c, d)
1001
#define CK_RING_TRYDEQUEUE_SPMC(name, a, b, c) \
1002
ck_ring_trydequeue_spmc_##name(a, b, c)
1003
#define CK_RING_DEQUEUE_SPMC(name, a, b, c) \
1004
ck_ring_dequeue_spmc_##name(a, b, c)
1005
1006
/*
1007
* Any number of concurrent producers with up to one
1008
* concurrent consumer.
1009
*/
1010
#define CK_RING_ENQUEUE_MPSC(name, a, b, c) \
1011
ck_ring_enqueue_mpsc_##name(a, b, c)
1012
#define CK_RING_ENQUEUE_MPSC_SIZE(name, a, b, c, d) \
1013
ck_ring_enqueue_mpsc_size_##name(a, b, c, d)
1014
#define CK_RING_ENQUEUE_RESERVE_MPSC(name, a, b, c) \
1015
ck_ring_enqueue_reserve_mpsc_##name(a, b, c)
1016
#define CK_RING_ENQUEUE_RESERVE_MPSC_SIZE(name, a, b, c, d) \
1017
ck_ring_enqueue_reserve_mpsc_size_##name(a, b, c, d)
1018
#define CK_RING_DEQUEUE_MPSC(name, a, b, c) \
1019
ck_ring_dequeue_mpsc_##name(a, b, c)
1020
1021
/*
1022
* Any number of concurrent producers and consumers.
1023
*/
1024
#define CK_RING_ENQUEUE_MPMC(name, a, b, c) \
1025
ck_ring_enqueue_mpmc_##name(a, b, c)
1026
#define CK_RING_ENQUEUE_MPMC_SIZE(name, a, b, c, d) \
1027
ck_ring_enqueue_mpmc_size_##name(a, b, c, d)
1028
#define CK_RING_ENQUEUE_RESERVE_MPMC(name, a, b, c) \
1029
ck_ring_enqueue_reserve_mpmc_##name(a, b, c)
1030
#define CK_RING_ENQUEUE_RESERVE_MPMC_SIZE(name, a, b, c, d) \
1031
ck_ring_enqueue_reserve_mpmc_size_##name(a, b, c, d)
1032
#define CK_RING_TRYDEQUEUE_MPMC(name, a, b, c) \
1033
ck_ring_trydequeue_mpmc_##name(a, b, c)
1034
#define CK_RING_DEQUEUE_MPMC(name, a, b, c) \
1035
ck_ring_dequeue_mpmc_##name(a, b, c)
1036
1037
#endif /* CK_RING_H */
1038
1039