/*
 * Copyright 2018 Paul Khuong, Google LLC.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

/*
 * Overview
 * ========
 *
 * ck_ec implements 32- and 64-bit event counts. Event counts let us
 * easily integrate OS-level blocking (e.g., futexes) in lock-free
 * protocols. Waiters block conditionally, if the event count's value
 * is still equal to some old value.
 *
 * Event counts come in four variants: 32 and 64 bit (with one bit
 * stolen for internal signaling, so 31 and 63 bit counters), and
 * single or multiple producers (wakers). Waiters are always multiple
 * consumers. The 32 bit variants are smaller and more efficient,
 * especially in single producer mode. The 64 bit variants are larger,
 * but practically invulnerable to ABA.
 *
 * The 32 bit variant is always available. The 64 bit variant is only
 * available if CK supports 64-bit atomic operations. Currently,
 * specialization for single producer is only implemented for x86 and
 * x86-64, on compilers that support GCC extended inline assembly;
 * other platforms fall back to the multiple producer code path.
 *
 * A typical usage pattern is:
 *
 *  1. On the producer side:
 *
 *    - Make changes to some shared data structure, without involving
 *      the event count at all.
 *    - After each change, call ck_ec_inc on the event count. The call
 *      acts as a write-write barrier, and wakes up any consumer blocked
 *      on the event count (waiting for new changes).
 *
 *  2. On the consumer side:
 *
 *    - Snapshot ck_ec_value of the event count. The call acts as a
 *      read barrier.
 *    - Read and process the shared data structure.
 *    - Wait for new changes by calling ck_ec_wait with the snapshot value.
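 *
 * For illustration, that pattern might look like the sketch below.
 * The queue and its helpers (queue_push, queue_drain) and the
 * my_ec_ops table are placeholders for application code, not part of
 * this header; with C99, substitute the ck_ec32_ or ck_ec64_
 * functions for the type-generic macros.
 *
 *    static struct ck_ec32 ec = CK_EC_INITIALIZER;
 *    static const struct ck_ec_mode mode = {
 *        .ops = &my_ec_ops,              // platform-specific ck_ec_ops
 *        .single_producer = true
 *    };
 *
 *    // Producer
 *    queue_push(&queue, item);           // update the shared structure
 *    ck_ec_inc(&ec, &mode);              // write barrier + wakeup
 *
 *    // Consumer
 *    for (;;) {
 *        uint32_t snapshot = ck_ec_value(&ec);   // read barrier
 *
 *        queue_drain(&queue);            // read the shared structure
 *        ck_ec_wait(&ec, &mode, snapshot, NULL); // sleep until the count moves
 *    }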
 *
 * Some data structures may opt for tighter integration with their
 * event count. For example, an SPMC ring buffer or disruptor might
 * use the event count's value as the write pointer. If the buffer is
 * regularly full, it might also make sense to store the read pointer
 * in an MP event count.
 *
 * This event count implementation supports tighter integration in two
 * ways.
 *
 * Producers may opt to increment by an arbitrary value (less than
 * INT32_MAX / INT64_MAX), in order to encode, e.g., byte
 * offsets. Larger increment values make wraparound more likely, so
 * the increments should still be relatively small.
 *
 * Consumers may pass a predicate to ck_ec_wait_pred. This predicate
 * can make `ck_ec_wait_pred` return early, before the event count's
 * value changes, and can override the deadline passed to futex_wait.
 * This lets a consumer block on one event count, while optimistically
 * looking at other waking conditions.
 *
 * API Reference
 * =============
 *
 * When compiled as C11 or later, this header defines type-generic
 * macros for ck_ec32 and ck_ec64; the reference describes this
 * type-generic API.
 *
 * ck_ec needs additional OS primitives to determine the current time,
 * to wait on an address, and to wake all threads waiting on a given
 * address. These are defined with fields in a struct ck_ec_ops. Each
 * ck_ec_ops may additionally define the number of spin loop
 * iterations in the slow path, as well as the initial wait time in
 * the internal exponential backoff, the exponential scale factor, and
 * the right shift count (< 32).
 *
 * The ops, in addition to the single/multiple producer flag, are
 * encapsulated in a struct ck_ec_mode, passed to most ck_ec
 * operations.
 *
 * ec is a struct ck_ec32 *, or a struct ck_ec64 *.
 *
 * value is a uint32_t for ck_ec32, and a uint64_t for ck_ec64. It
 * never exceeds INT32_MAX and INT64_MAX respectively.
 *
 * mode is a struct ck_ec_mode *.
 *
 * deadline is either NULL, or a `const struct timespec *` that will
 * be treated as an absolute deadline.
 *
 * `void ck_ec_init(ec, value)`: initializes the event count to value.
 *
 * `value ck_ec_value(ec)`: returns the current value of the event
 * counter. This read acts as a read (acquire) barrier.
 *
 * `bool ck_ec_has_waiters(ec)`: returns whether some thread has
 * marked the event count as requiring an OS wakeup.
 *
 * `void ck_ec_inc(ec, mode)`: increments the value of the event
 * counter by one. This write acts as a write barrier. Wakes up
 * any waiting thread.
 *
 * `value ck_ec_add(ec, mode, value)`: increments the event counter by
 * `value`, and returns the event counter's previous value. This
 * write acts as a write barrier. Wakes up any waiting thread.
 *
 * `int ck_ec_deadline(struct timespec *new_deadline,
 *                     mode,
 *                     const struct timespec *timeout)`:
 * computes a deadline `timeout` away from the current time. If
 * timeout is NULL, computes a deadline in the infinite future. The
 * resulting deadline is written to `new_deadline`. Returns 0 on
 * success, and -1 if ops->gettime failed (without touching errno).
 *
 * `int ck_ec_wait(ec, mode, value, deadline)`: waits until the event
 * counter's value differs from `value`, or, if `deadline` is
 * provided and non-NULL, until the current time is after that
 * deadline. Use a deadline with tv_sec = 0 for a non-blocking
 * execution. Returns 0 if the event counter has changed, and -1 on
 * timeout. This function acts as a read (acquire) barrier.
 *
 * `int ck_ec_wait_pred(ec, mode, value, pred, data, deadline)`: waits
 * until the event counter's value differs from `value`, or until
 * `pred` returns non-zero, or, if `deadline` is provided and
 * non-NULL, until the current time is after that deadline. Use a
 * deadline with tv_sec = 0 for a non-blocking execution. Returns 0 if
 * the event counter has changed, `pred`'s return value if non-zero,
 * and -1 on timeout. This function acts as a read (acquire) barrier.
 *
 * `pred` is always called as `pred(data, iteration_deadline, now)`,
 * where `iteration_deadline` is a timespec of the deadline for this
 * exponential backoff iteration, and `now` is the current time. If
 * `pred` returns a non-zero value, that value is immediately returned
 * to the waiter. Otherwise, `pred` is free to modify
 * `iteration_deadline` (moving it further in the future is a bad
 * idea).
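 *
 * For example, a predicate that lets a waiter give up once some flag
 * is raised might look like the sketch below (the stop_requested name
 * and the -2 return value are illustrative, not part of this API);
 * the `data` argument above is available as state->data, and `now`
 * as state->now:
 *
 *    static int
 *    stop_requested(const struct ck_ec_wait_state *state,
 *                   struct timespec *iteration_deadline)
 *    {
 *        const int *stop_flag = state->data;
 *
 *        (void)iteration_deadline;       // leave the backoff deadline alone
 *        return (*stop_flag != 0) ? -2 : 0;
 *    }
 *
 * A waiter would then call ck_ec_wait_pred(ec, mode, value,
 * stop_requested, &stop_flag, deadline), and receive -2 as soon as
 * the flag is set, without waiting for the counter or the deadline.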
 *
 * Implementation notes
 * ====================
 *
 * The multiple producer implementation is a regular locked event
 * count, with a single flag bit to denote the need to wake up waiting
 * threads.
 *
 * The single producer specialization is heavily tied to
 * [x86-TSO](https://www.cl.cam.ac.uk/~pes20/weakmemory/cacm.pdf), and
 * to non-atomic read-modify-write instructions (e.g., `inc mem`);
 * these non-atomic RMW let us write to the same memory locations with
 * atomic and non-atomic instructions, without suffering from process
 * scheduling stalls.
 *
 * The reason we can mix atomic and non-atomic writes to the `counter`
 * word is that every non-atomic write obviates the need for the
 * atomically flipped flag bit: we only use non-atomic writes to
 * update the event count, and the atomic flag only informs the
 * producer that we would like a futex_wake, because of the update.
 * We only require the non-atomic RMW counter update to prevent
 * preemption from introducing arbitrarily long worst case delays.
 *
 * Correctness does not rely on the usual ordering argument: in the
 * absence of fences, there is no strict ordering between atomic and
 * non-atomic writes. The key is instead x86-TSO's guarantee that a
 * read is satisfied from the most recent buffered write in the local
 * store queue if there is one, or from memory if there is no write to
 * that address in the store queue.
 *
 * x86-TSO's constraint on reads suffices to guarantee that the
 * producer will never forget about a counter update. If the last
 * update is still queued, the new update will be based on the queued
 * value. Otherwise, the new update will be based on the value in
 * memory, which may or may not have had its flag flipped. In either
 * case, the value of the counter (modulo flag) is correct.
 *
 * When the producer forwards the counter's value from its store
 * queue, the new update might not preserve a flag flip. Any waiter
 * thus has to check from time to time whether it missed a wakeup
 * because the flag bit was silently cleared.
 *
 * In reality, the store queue in x86-TSO stands for in-flight
 * instructions in the chip's out-of-order backend. In the vast
 * majority of cases, instructions will only remain in flight for a
 * few hundred or thousand cycles. That's why ck_ec_wait spins on
 * the `counter` word for ~100 iterations after flipping its flag bit:
 * if the counter hasn't changed after that many iterations, it is
 * very likely that the producer's next counter update will observe
 * the flag flip.
 *
 * That's still not a hard guarantee of correctness. Conservatively,
 * we can expect that no instruction will remain in flight for more
 * than 1 second... if only because some interrupt will have forced
 * the chip to store its architectural state in memory, at which point
 * an instruction is either fully retired or rolled back.
 * Interrupts, particularly the pre-emption timer, are why
 * single-producer updates must happen in a single non-atomic
 * read-modify-write instruction. Having a single instruction as the
 * critical section means we only have to consider the worst-case
 * execution time for that instruction. That's easier than doing the
 * same for a pair of instructions, which an unlucky pre-emption could
 * delay for arbitrarily long.
 *
 * Thus, after a short spin loop, ck_ec_wait enters an exponential
 * backoff loop, where each "sleep" is instead a futex_wait. The
 * backoff is only necessary to handle rare cases where the flag flip
 * was overwritten after the spin loop. Eventually, more than one
 * second will have elapsed since the flag flip, and the sleep timeout
 * becomes infinite: since the flag bit has been set for much longer
 * than the time for which an instruction may remain in flight, the
 * flag will definitely be observed at the next counter update.
 *
 * The 64 bit ck_ec_wait pulls another trick: futexes only handle 32
 * bit ints, so we must treat the 64 bit counter's low 32 bits as an
 * int in futex_wait. That's a bit dodgy, but fine in practice, given
 * that the OS's futex code will always read whatever value is
 * currently in memory: even if the producer thread were to wait on
 * its own event count, the syscall and ring transition would empty
 * the store queue (the out-of-order execution backend).
 *
 * Finally, what happens when the producer is migrated to another core
 * or otherwise pre-empted? Migration must already incur a barrier, so
 * that thread always sees its own writes, so that's safe. As for
 * pre-emption, that requires storing the architectural state, which
 * means every instruction must either be executed fully or not at
 * all when pre-emption happens.
 */

#ifndef CK_EC_H
#define CK_EC_H
#include <ck_cc.h>
#include <ck_pr.h>
#include <ck_stdbool.h>
#include <ck_stdint.h>
#include <ck_stddef.h>
#include <sys/time.h>

/*
 * If we have ck_pr_faa_64 (and, presumably, ck_pr_load_64), we
 * support 63 bit counters.
 */
#ifdef CK_F_PR_FAA_64
#define CK_F_EC64
#endif /* CK_F_PR_FAA_64 */

/*
 * GCC inline assembly lets us exploit non-atomic read-modify-write
 * instructions on x86/x86_64 for a fast single-producer mode.
 *
 * If CK_F_EC_SP is not defined, CK_EC always uses the slower
 * multiple producer code.
 */
#if defined(__GNUC__) && (defined(__i386__) || defined(__x86_64__))
#define CK_F_EC_SP
#endif /* GNUC && (__i386__ || __x86_64__) */

struct ck_ec_ops;

struct ck_ec_wait_state {
        struct timespec start;  /* Time when we entered ck_ec_wait. */
        struct timespec now;    /* Time now. */
        const struct ck_ec_ops *ops;
        void *data;  /* Opaque pointer for the predicate's internal state. */
};

/*
 * ck_ec_ops defines system-specific functions to get the current time,
 * atomically wait on an address if it still has some expected value,
 * and to wake all threads waiting on an address.
 *
 * Each platform is expected to define few (ideally one) const ops
 * structs, and to reuse a pointer to the same struct in all of its
 * ck_ec_mode structs.
 */
struct ck_ec_ops {
        /* Populates out with the current time. Returns non-zero on failure. */
        int (*gettime)(const struct ck_ec_ops *, struct timespec *out);
        /*
         * Waits on address if its value is still `expected`. If
         * deadline is non-NULL, stops waiting once that deadline is
         * reached. May return early for any reason.
         */
        void (*wait32)(const struct ck_ec_wait_state *, const uint32_t *,
                       uint32_t expected, const struct timespec *deadline);

        /*
         * Same as wait32, but for a 64 bit counter. Only used if
         * CK_F_EC64 is defined.
         *
         * If the underlying blocking primitive only supports 32 bit
         * control words, it should be safe to block on the least
         * significant half of the 64 bit address.
         */
        void (*wait64)(const struct ck_ec_wait_state *, const uint64_t *,
                       uint64_t expected, const struct timespec *deadline);

        /* Wakes up all threads waiting on address. */
        void (*wake32)(const struct ck_ec_ops *, const uint32_t *address);

        /*
         * Same as wake32, but for a 64 bit counter. Only used if
         * CK_F_EC64 is defined.
         *
         * When wait64 truncates the control word at address to only
         * consider its least significant half, wake64 should perform
         * any necessary fixup (e.g., on big endian platforms).
         */
        void (*wake64)(const struct ck_ec_ops *, const uint64_t *address);

        /*
         * Number of iterations for the initial busy wait. 0 defaults
         * to 100 (not ABI stable).
         */
        uint32_t busy_loop_iter;

        /*
         * Delay in nanoseconds for the first iteration of the
         * exponential backoff. 0 defaults to 2 ms (not ABI stable).
         */
        uint32_t initial_wait_ns;

        /*
         * Scale factor for the exponential backoff. 0 defaults to 8x
         * (not ABI stable).
         */
        uint32_t wait_scale_factor;

        /*
         * Right shift count for the exponential backoff. The update
         * after each iteration is
         *     wait_ns = (wait_ns * wait_scale_factor) >> wait_shift_count,
         * until one second has elapsed. After that, the deadline goes
         * to infinity.
         */
        uint32_t wait_shift_count;
};
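
/*
 * For illustration only: on Linux, a futex-backed ops table might look
 * like the sketch below. The my_ec_ names are placeholders, error
 * handling is elided, and the 64 bit hooks are omitted; it assumes
 * <linux/futex.h>, <sys/syscall.h>, <unistd.h>, <limits.h> and
 * <time.h>, and relies on FUTEX_WAIT_BITSET accepting an absolute
 * CLOCK_MONOTONIC deadline (a NULL deadline waits forever). The clock
 * read by gettime must match the clock the wait callback sleeps on.
 *
 *    static int my_ec_gettime(const struct ck_ec_ops *ops,
 *                             struct timespec *out)
 *    {
 *        (void)ops;
 *        return clock_gettime(CLOCK_MONOTONIC, out);
 *    }
 *
 *    static void my_ec_wait32(const struct ck_ec_wait_state *state,
 *                             const uint32_t *addr, uint32_t expected,
 *                             const struct timespec *deadline)
 *    {
 *        (void)state;
 *        syscall(SYS_futex, addr, FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG,
 *                expected, deadline, NULL, FUTEX_BITSET_MATCH_ANY);
 *        return;
 *    }
 *
 *    static void my_ec_wake32(const struct ck_ec_ops *ops,
 *                             const uint32_t *addr)
 *    {
 *        (void)ops;
 *        syscall(SYS_futex, addr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG,
 *                INT_MAX, NULL, NULL, 0);
 *        return;
 *    }
 *
 *    static const struct ck_ec_ops my_ec_ops = {
 *        .gettime = my_ec_gettime,
 *        .wait32 = my_ec_wait32,
 *        .wake32 = my_ec_wake32
 *        // busy_loop_iter et al. left at 0 to use the defaults above.
 *    };
 */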

/*
 * ck_ec_mode wraps the ops table, and informs the fast path whether
 * it should attempt to specialize for single producer mode.
 *
 * mode structs are expected to be exposed by value, e.g.,
 *
 *    extern const struct ck_ec_ops system_ec_ops;
 *
 *    static const struct ck_ec_mode ec_sp = {
 *        .ops = &system_ec_ops,
 *        .single_producer = true
 *    };
 *
 *    static const struct ck_ec_mode ec_mp = {
 *        .ops = &system_ec_ops,
 *        .single_producer = false
 *    };
 *
 * ck_ec_mode structs are only passed to inline functions defined in
 * this header, and never escape to their slow paths, so they should
 * not result in any object file size increase.
 */
struct ck_ec_mode {
        const struct ck_ec_ops *ops;
        /*
         * If single_producer is true, the event count has a unique
         * incrementer. The implementation will specialize ck_ec_inc
         * and ck_ec_add if possible (if CK_F_EC_SP is defined).
         */
        bool single_producer;
};

struct ck_ec32 {
        /* Flag is "sign" bit, value in bits 0:30. */
        uint32_t counter;
};

typedef struct ck_ec32 ck_ec32_t;

#ifdef CK_F_EC64
struct ck_ec64 {
        /*
         * Flag is bottom bit, value in bits 1:63. Eventcount only
         * works on x86-64 (i.e., little endian), so the futex int
         * lies in the first 4 (bottom) bytes.
         */
        uint64_t counter;
};

typedef struct ck_ec64 ck_ec64_t;
#endif /* CK_F_EC64 */

#define CK_EC_INITIALIZER { .counter = 0 }

/*
 * Initializes the event count to `value`. The value must not
 * exceed INT32_MAX.
 */
static void ck_ec32_init(struct ck_ec32 *ec, uint32_t value);

#ifndef CK_F_EC64
#define ck_ec_init ck_ec32_init
#else
/*
 * Initializes the event count to `value`. The value must not
 * exceed INT64_MAX.
 */
static void ck_ec64_init(struct ck_ec64 *ec, uint64_t value);

#if __STDC_VERSION__ >= 201112L
#define ck_ec_init(EC, VALUE)                                           \
        (_Generic(*(EC),                                                \
                  struct ck_ec32 : ck_ec32_init,                        \
                  struct ck_ec64 : ck_ec64_init)((EC), (VALUE)))
#endif /* __STDC_VERSION__ */
#endif /* CK_F_EC64 */

/*
 * Returns the counter value in the event count. The value is at most
 * INT32_MAX.
 */
static uint32_t ck_ec32_value(const struct ck_ec32* ec);

#ifndef CK_F_EC64
#define ck_ec_value ck_ec32_value
#else
/*
 * Returns the counter value in the event count. The value is at most
 * INT64_MAX.
 */
static uint64_t ck_ec64_value(const struct ck_ec64* ec);

#if __STDC_VERSION__ >= 201112L
#define ck_ec_value(EC)                                                 \
        (_Generic(*(EC),                                                \
                  struct ck_ec32 : ck_ec32_value,                       \
                  struct ck_ec64 : ck_ec64_value)((EC)))
#endif /* __STDC_VERSION__ */
#endif /* CK_F_EC64 */

/*
 * Returns whether there may be slow pathed waiters that need an
 * explicit OS wakeup for this event count.
 */
static bool ck_ec32_has_waiters(const struct ck_ec32 *ec);

#ifndef CK_F_EC64
#define ck_ec_has_waiters ck_ec32_has_waiters
#else
static bool ck_ec64_has_waiters(const struct ck_ec64 *ec);

#if __STDC_VERSION__ >= 201112L
#define ck_ec_has_waiters(EC)                                           \
        (_Generic(*(EC),                                                \
                  struct ck_ec32 : ck_ec32_has_waiters,                 \
                  struct ck_ec64 : ck_ec64_has_waiters)((EC)))
#endif /* __STDC_VERSION__ */
#endif /* CK_F_EC64 */

/*
 * Increments the counter value in the event count by one, and wakes
 * up any waiter.
 */
static void ck_ec32_inc(struct ck_ec32 *ec, const struct ck_ec_mode *mode);

#ifndef CK_F_EC64
#define ck_ec_inc ck_ec32_inc
#else
static void ck_ec64_inc(struct ck_ec64 *ec, const struct ck_ec_mode *mode);

#if __STDC_VERSION__ >= 201112L
#define ck_ec_inc(EC, MODE)                                             \
        (_Generic(*(EC),                                                \
                  struct ck_ec32 : ck_ec32_inc,                         \
                  struct ck_ec64 : ck_ec64_inc)((EC), (MODE)))
#endif /* __STDC_VERSION__ */
#endif /* CK_F_EC64 */

/*
 * Increments the counter value in the event count by delta, wakes
 * up any waiter, and returns the previous counter value.
 */
static uint32_t ck_ec32_add(struct ck_ec32 *ec,
                            const struct ck_ec_mode *mode,
                            uint32_t delta);

#ifndef CK_F_EC64
#define ck_ec_add ck_ec32_add
#else
static uint64_t ck_ec64_add(struct ck_ec64 *ec,
                            const struct ck_ec_mode *mode,
                            uint64_t delta);

#if __STDC_VERSION__ >= 201112L
#define ck_ec_add(EC, MODE, DELTA)                                      \
        (_Generic(*(EC),                                                \
                  struct ck_ec32 : ck_ec32_add,                         \
                  struct ck_ec64 : ck_ec64_add)((EC), (MODE), (DELTA)))
#endif /* __STDC_VERSION__ */
#endif /* CK_F_EC64 */

/*
 * Populates `new_deadline` with a deadline `timeout` in the future.
 * Returns 0 on success, and -1 if ops->gettime failed, in which
 * case errno is left as is.
 */
static int ck_ec_deadline(struct timespec *new_deadline,
                          const struct ck_ec_mode *mode,
                          const struct timespec *timeout);

/*
 * Waits until the counter value in the event count differs from
 * old_value, or, if deadline is non-NULL, until CLOCK_MONOTONIC is
 * past the deadline.
 *
 * Returns 0 on success, and -1 on timeout.
 */
static int ck_ec32_wait(struct ck_ec32 *ec,
                        const struct ck_ec_mode *mode,
                        uint32_t old_value,
                        const struct timespec *deadline);

#ifndef CK_F_EC64
#define ck_ec_wait ck_ec32_wait
#else
static int ck_ec64_wait(struct ck_ec64 *ec,
                        const struct ck_ec_mode *mode,
                        uint64_t old_value,
                        const struct timespec *deadline);

#if __STDC_VERSION__ >= 201112L
#define ck_ec_wait(EC, MODE, OLD_VALUE, DEADLINE)                       \
        (_Generic(*(EC),                                                \
                  struct ck_ec32 : ck_ec32_wait,                        \
                  struct ck_ec64 : ck_ec64_wait)((EC), (MODE),          \
                                                 (OLD_VALUE), (DEADLINE)))

#endif /* __STDC_VERSION__ */
#endif /* CK_F_EC64 */
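
/*
 * For example, a bounded wait for a counter change might look like the
 * sketch below, assuming `ec` is a struct ck_ec32 and `mode` a struct
 * ck_ec_mode set up as described above (the 10 ms timeout is
 * arbitrary):
 *
 *    const struct timespec timeout = { .tv_sec = 0,
 *                                      .tv_nsec = 10 * 1000 * 1000 };
 *    struct timespec deadline;
 *    uint32_t snapshot = ck_ec_value(&ec);
 *
 *    if (ck_ec_deadline(&deadline, &mode, &timeout) == 0 &&
 *        ck_ec_wait(&ec, &mode, snapshot, &deadline) == 0) {
 *        // The counter moved away from snapshot within ~10 ms.
 *    }
 */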

/*
 * Waits until the counter value in the event count differs from
 * old_value, pred returns non-zero, or, if deadline is non-NULL,
 * until CLOCK_MONOTONIC is past the deadline.
 *
 * Returns 0 on success, -1 on timeout, and the return value of pred
 * if it returns non-zero.
 *
 * A NULL pred represents a function that always returns 0.
 */
static int ck_ec32_wait_pred(struct ck_ec32 *ec,
                             const struct ck_ec_mode *mode,
                             uint32_t old_value,
                             int (*pred)(const struct ck_ec_wait_state *,
                                         struct timespec *deadline),
                             void *data,
                             const struct timespec *deadline);

#ifndef CK_F_EC64
#define ck_ec_wait_pred ck_ec32_wait_pred
#else
static int ck_ec64_wait_pred(struct ck_ec64 *ec,
                             const struct ck_ec_mode *mode,
                             uint64_t old_value,
                             int (*pred)(const struct ck_ec_wait_state *,
                                         struct timespec *deadline),
                             void *data,
                             const struct timespec *deadline);

#if __STDC_VERSION__ >= 201112L
#define ck_ec_wait_pred(EC, MODE, OLD_VALUE, PRED, DATA, DEADLINE)      \
        (_Generic(*(EC),                                                \
                  struct ck_ec32 : ck_ec32_wait_pred,                   \
                  struct ck_ec64 : ck_ec64_wait_pred)                   \
         ((EC), (MODE), (OLD_VALUE), (PRED), (DATA), (DEADLINE)))
#endif /* __STDC_VERSION__ */
#endif /* CK_F_EC64 */

/*
 * Inline implementation details. 32 bit first, then 64 bit
 * conditionally.
 */
CK_CC_FORCE_INLINE void ck_ec32_init(struct ck_ec32 *ec, uint32_t value)
{
        ec->counter = value & ~(1UL << 31);
        return;
}

CK_CC_FORCE_INLINE uint32_t ck_ec32_value(const struct ck_ec32 *ec)
{
        uint32_t ret = ck_pr_load_32(&ec->counter) & ~(1UL << 31);

        ck_pr_fence_acquire();
        return ret;
}

CK_CC_FORCE_INLINE bool ck_ec32_has_waiters(const struct ck_ec32 *ec)
{
        return ck_pr_load_32(&ec->counter) & (1UL << 31);
}

/* Slow path for ck_ec{32,64}_{inc,add} */
void ck_ec32_wake(struct ck_ec32 *ec, const struct ck_ec_ops *ops);

CK_CC_FORCE_INLINE void ck_ec32_inc(struct ck_ec32 *ec,
                                    const struct ck_ec_mode *mode)
{
#if !defined(CK_F_EC_SP)
        /* Nothing to specialize if we don't have EC_SP. */
        ck_ec32_add(ec, mode, 1);
        return;
#else
        char flagged;

#if __GNUC__ >= 6
        /*
         * We don't want to wake if the sign bit is 0. We do want to
         * wake if the sign bit just flipped from 1 to 0. We don't
         * care what happens when our increment caused the sign bit to
         * flip from 0 to 1 (that's once per 2^31 increments).
         *
         * This leaves us with four cases:
         *
         *  old sign bit | new sign bit | SF | OF | ZF
         *  -------------------------------------------
         *             0 |            0 |  0 |  0 |  ?
         *             0 |            1 |  1 |  1 |  ?
         *             1 |            1 |  1 |  0 |  ?
         *             1 |            0 |  0 |  0 |  1
         *
         * In the first case, we don't want to hit ck_ec32_wake. In
         * the last two cases, we do want to call ck_ec32_wake. In the
         * second case, we don't care either way.
         *
         * The "le" condition checks if SF != OF, or ZF == 1, which
         * meets our requirements.
         */
#define CK_EC32_INC_ASM(PREFIX)                                         \
        __asm__ volatile(PREFIX " incl %0"                              \
                         : "+m"(ec->counter), "=@ccle"(flagged)         \
                         :: "cc", "memory")
#else
#define CK_EC32_INC_ASM(PREFIX)                                         \
        __asm__ volatile(PREFIX " incl %0; setle %1"                    \
                         : "+m"(ec->counter), "=r"(flagged)             \
                         :: "cc", "memory")
#endif /* __GNUC__ */

        if (mode->single_producer == true) {
                ck_pr_fence_store();
                CK_EC32_INC_ASM("");
        } else {
                ck_pr_fence_store_atomic();
                CK_EC32_INC_ASM("lock");
        }
#undef CK_EC32_INC_ASM

        if (CK_CC_UNLIKELY(flagged)) {
                ck_ec32_wake(ec, mode->ops);
        }

        return;
#endif /* CK_F_EC_SP */
}

CK_CC_FORCE_INLINE uint32_t ck_ec32_add_epilogue(struct ck_ec32 *ec,
                                                 const struct ck_ec_mode *mode,
                                                 uint32_t old)
{
        const uint32_t flag_mask = 1U << 31;
        uint32_t ret;

        ret = old & ~flag_mask;
        /* These two only differ if the flag bit is set. */
        if (CK_CC_UNLIKELY(old != ret)) {
                ck_ec32_wake(ec, mode->ops);
        }

        return ret;
}

static CK_CC_INLINE uint32_t ck_ec32_add_mp(struct ck_ec32 *ec,
                                            const struct ck_ec_mode *mode,
                                            uint32_t delta)
{
        uint32_t old;

        ck_pr_fence_store_atomic();
        old = ck_pr_faa_32(&ec->counter, delta);
        return ck_ec32_add_epilogue(ec, mode, old);
}

#ifdef CK_F_EC_SP
static CK_CC_INLINE uint32_t ck_ec32_add_sp(struct ck_ec32 *ec,
                                            const struct ck_ec_mode *mode,
                                            uint32_t delta)
{
        uint32_t old;

        /*
         * Correctness of this racy write depends on actually
         * having an update to write. Exit here if the update
         * is a no-op.
         */
        if (CK_CC_UNLIKELY(delta == 0)) {
                return ck_ec32_value(ec);
        }

        ck_pr_fence_store();
        old = delta;
        __asm__ volatile("xaddl %1, %0"
                         : "+m"(ec->counter), "+r"(old)
                         :: "cc", "memory");
        return ck_ec32_add_epilogue(ec, mode, old);
}
#endif /* CK_F_EC_SP */

CK_CC_FORCE_INLINE uint32_t ck_ec32_add(struct ck_ec32 *ec,
                                        const struct ck_ec_mode *mode,
                                        uint32_t delta)
{
#ifdef CK_F_EC_SP
        if (mode->single_producer == true) {
                return ck_ec32_add_sp(ec, mode, delta);
        }
#endif

        return ck_ec32_add_mp(ec, mode, delta);
}

int ck_ec_deadline_impl(struct timespec *new_deadline,
                        const struct ck_ec_ops *ops,
                        const struct timespec *timeout);

CK_CC_FORCE_INLINE int ck_ec_deadline(struct timespec *new_deadline,
                                      const struct ck_ec_mode *mode,
                                      const struct timespec *timeout)
{
        return ck_ec_deadline_impl(new_deadline, mode->ops, timeout);
}

int ck_ec32_wait_slow(struct ck_ec32 *ec,
                      const struct ck_ec_ops *ops,
                      uint32_t old_value,
                      const struct timespec *deadline);

CK_CC_FORCE_INLINE int ck_ec32_wait(struct ck_ec32 *ec,
                                    const struct ck_ec_mode *mode,
                                    uint32_t old_value,
                                    const struct timespec *deadline)
{
        if (ck_ec32_value(ec) != old_value) {
                return 0;
        }

        return ck_ec32_wait_slow(ec, mode->ops, old_value, deadline);
}

int ck_ec32_wait_pred_slow(struct ck_ec32 *ec,
                           const struct ck_ec_ops *ops,
                           uint32_t old_value,
                           int (*pred)(const struct ck_ec_wait_state *state,
                                       struct timespec *deadline),
                           void *data,
                           const struct timespec *deadline);

CK_CC_FORCE_INLINE int
ck_ec32_wait_pred(struct ck_ec32 *ec,
                  const struct ck_ec_mode *mode,
                  uint32_t old_value,
                  int (*pred)(const struct ck_ec_wait_state *state,
                              struct timespec *deadline),
                  void *data,
                  const struct timespec *deadline)
{
        if (ck_ec32_value(ec) != old_value) {
                return 0;
        }

        return ck_ec32_wait_pred_slow(ec, mode->ops, old_value,
                                      pred, data, deadline);
}

#ifdef CK_F_EC64
CK_CC_FORCE_INLINE void ck_ec64_init(struct ck_ec64 *ec, uint64_t value)
{
        ec->counter = value << 1;
        return;
}

CK_CC_FORCE_INLINE uint64_t ck_ec64_value(const struct ck_ec64 *ec)
{
        uint64_t ret = ck_pr_load_64(&ec->counter) >> 1;

        ck_pr_fence_acquire();
        return ret;
}

CK_CC_FORCE_INLINE bool ck_ec64_has_waiters(const struct ck_ec64 *ec)
{
        return ck_pr_load_64(&ec->counter) & 1;
}

void ck_ec64_wake(struct ck_ec64 *ec, const struct ck_ec_ops *ops);

CK_CC_FORCE_INLINE void ck_ec64_inc(struct ck_ec64 *ec,
                                    const struct ck_ec_mode *mode)
{
        /* We always xadd, so there's no special optimization here. */
        (void)ck_ec64_add(ec, mode, 1);
        return;
}

CK_CC_FORCE_INLINE uint64_t ck_ec_add64_epilogue(struct ck_ec64 *ec,
                                                 const struct ck_ec_mode *mode,
                                                 uint64_t old)
{
        uint64_t ret = old >> 1;

        if (CK_CC_UNLIKELY(old & 1)) {
                ck_ec64_wake(ec, mode->ops);
        }

        return ret;
}

static CK_CC_INLINE uint64_t ck_ec64_add_mp(struct ck_ec64 *ec,
                                            const struct ck_ec_mode *mode,
                                            uint64_t delta)
{
        uint64_t inc = 2 * delta;  /* The low bit is the flag bit. */

        ck_pr_fence_store_atomic();
        return ck_ec_add64_epilogue(ec, mode, ck_pr_faa_64(&ec->counter, inc));
}

#ifdef CK_F_EC_SP
/* Single-producer specialisation. */
static CK_CC_INLINE uint64_t ck_ec64_add_sp(struct ck_ec64 *ec,
                                            const struct ck_ec_mode *mode,
                                            uint64_t delta)
{
        uint64_t old;

        /*
         * Correctness of this racy write depends on actually
         * having an update to write. Exit here if the update
         * is a no-op.
         */
        if (CK_CC_UNLIKELY(delta == 0)) {
                return ck_ec64_value(ec);
        }

        ck_pr_fence_store();
        old = 2 * delta;  /* The low bit is the flag bit. */
        __asm__ volatile("xaddq %1, %0"
                         : "+m"(ec->counter), "+r"(old)
                         :: "cc", "memory");
        return ck_ec_add64_epilogue(ec, mode, old);
}
#endif /* CK_F_EC_SP */

/*
 * Dispatch on mode->single_producer in this FORCE_INLINE function:
 * the end result is always small, but not all compilers have enough
 * foresight to inline and get the reduction.
 */
CK_CC_FORCE_INLINE uint64_t ck_ec64_add(struct ck_ec64 *ec,
                                        const struct ck_ec_mode *mode,
                                        uint64_t delta)
{
#ifdef CK_F_EC_SP
        if (mode->single_producer == true) {
                return ck_ec64_add_sp(ec, mode, delta);
        }
#endif

        return ck_ec64_add_mp(ec, mode, delta);
}

int ck_ec64_wait_slow(struct ck_ec64 *ec,
                      const struct ck_ec_ops *ops,
                      uint64_t old_value,
                      const struct timespec *deadline);

CK_CC_FORCE_INLINE int ck_ec64_wait(struct ck_ec64 *ec,
                                    const struct ck_ec_mode *mode,
                                    uint64_t old_value,
                                    const struct timespec *deadline)
{
        if (ck_ec64_value(ec) != old_value) {
                return 0;
        }

        return ck_ec64_wait_slow(ec, mode->ops, old_value, deadline);
}

int ck_ec64_wait_pred_slow(struct ck_ec64 *ec,
                           const struct ck_ec_ops *ops,
                           uint64_t old_value,
                           int (*pred)(const struct ck_ec_wait_state *state,
                                       struct timespec *deadline),
                           void *data,
                           const struct timespec *deadline);

CK_CC_FORCE_INLINE int
ck_ec64_wait_pred(struct ck_ec64 *ec,
                  const struct ck_ec_mode *mode,
                  uint64_t old_value,
                  int (*pred)(const struct ck_ec_wait_state *state,
                              struct timespec *deadline),
                  void *data,
                  const struct timespec *deadline)
{
        if (ck_ec64_value(ec) != old_value) {
                return 0;
        }

        return ck_ec64_wait_pred_slow(ec, mode->ops, old_value,
                                      pred, data, deadline);
}
#endif /* CK_F_EC64 */
#endif /* !CK_EC_H */