Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks/phase.rs
6939 views
1
use std::sync::Arc;
2
use std::sync::atomic::{AtomicBool, Ordering};
3
4
use crate::async_primitives::wait_group::WaitToken;
5
6
/// The outcome of a phase in a task.
7
///
8
/// This indicates whether a task finished (and does not need to be started again) or has stopped
9
/// prematurely. When this is dropped without calling `stop`, it is assumed that the task is
10
/// finished (most likely because it errored).
11
pub struct PhaseOutcome {
12
// This is used to see when phase is finished.
13
#[expect(unused)]
14
consume_token: WaitToken,
15
16
outcome_token: PhaseOutcomeToken,
17
}
18
19
impl PhaseOutcome {
20
pub fn new_shared_wait(consume_token: WaitToken) -> (PhaseOutcomeToken, Self) {
21
let outcome_token = PhaseOutcomeToken::new();
22
(
23
outcome_token.clone(),
24
Self {
25
consume_token,
26
outcome_token,
27
},
28
)
29
}
30
31
/// Phase ended before the task finished and needs to be called again.
32
pub fn stopped(self) {
33
self.outcome_token.stop();
34
}
35
}
36
37
/// Token that contains the outcome of a phase.
38
///
39
/// Namely, this indicates whether a phase finished completely or whether it was stopped before
40
/// that.
41
#[derive(Clone)]
42
pub struct PhaseOutcomeToken {
43
/// - `false` -> finished / panicked
44
/// - `true` -> stopped before finishing
45
stop: Arc<AtomicBool>,
46
}
47
48
impl PhaseOutcomeToken {
49
pub fn new() -> Self {
50
Self {
51
stop: Arc::new(AtomicBool::new(false)),
52
}
53
}
54
55
/// Indicate that the phase was stopped before finishing.
56
pub fn stop(&self) {
57
self.stop.store(true, Ordering::Relaxed);
58
}
59
60
/// Returns whether the phase was stopped before finishing.
61
pub fn was_stopped(&self) -> bool {
62
self.stop.load(Ordering::Relaxed)
63
}
64
65
/// Returns whether the phase was finished completely.
66
pub fn did_finish(&self) -> bool {
67
!self.was_stopped()
68
}
69
}
70
71