Path: blob/main/crates/polars-stream/src/nodes/io_sinks/phase.rs
6939 views
use std::sync::Arc;1use std::sync::atomic::{AtomicBool, Ordering};23use crate::async_primitives::wait_group::WaitToken;45/// The outcome of a phase in a task.6///7/// This indicates whether a task finished (and does not need to be started again) or has stopped8/// prematurely. When this is dropped without calling `stop`, it is assumed that the task is9/// finished (most likely because it errored).10pub struct PhaseOutcome {11// This is used to see when phase is finished.12#[expect(unused)]13consume_token: WaitToken,1415outcome_token: PhaseOutcomeToken,16}1718impl PhaseOutcome {19pub fn new_shared_wait(consume_token: WaitToken) -> (PhaseOutcomeToken, Self) {20let outcome_token = PhaseOutcomeToken::new();21(22outcome_token.clone(),23Self {24consume_token,25outcome_token,26},27)28}2930/// Phase ended before the task finished and needs to be called again.31pub fn stopped(self) {32self.outcome_token.stop();33}34}3536/// Token that contains the outcome of a phase.37///38/// Namely, this indicates whether a phase finished completely or whether it was stopped before39/// that.40#[derive(Clone)]41pub struct PhaseOutcomeToken {42/// - `false` -> finished / panicked43/// - `true` -> stopped before finishing44stop: Arc<AtomicBool>,45}4647impl PhaseOutcomeToken {48pub fn new() -> Self {49Self {50stop: Arc::new(AtomicBool::new(false)),51}52}5354/// Indicate that the phase was stopped before finishing.55pub fn stop(&self) {56self.stop.store(true, Ordering::Relaxed);57}5859/// Returns whether the phase was stopped before finishing.60pub fn was_stopped(&self) -> bool {61self.stop.load(Ordering::Relaxed)62}6364/// Returns whether the phase was finished completely.65pub fn did_finish(&self) -> bool {66!self.was_stopped()67}68}697071