Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/input_independent_select.rs
6939 views
1
use std::sync::Arc;
2
3
use polars_core::POOL;
4
use polars_core::prelude::IntoColumn;
5
use rayon::iter::IntoParallelRefIterator;
6
use rayon::prelude::*;
7
8
use super::compute_node_prelude::*;
9
use crate::expression::StreamExpr;
10
use crate::nodes::in_memory_source::InMemorySourceNode;
11
12
pub enum InputIndependentSelectNode {
13
ToSelect { selectors: Vec<StreamExpr> },
14
Source(InMemorySourceNode),
15
Done,
16
}
17
18
impl InputIndependentSelectNode {
19
pub fn new(selectors: Vec<StreamExpr>) -> Self {
20
Self::ToSelect { selectors }
21
}
22
}
23
24
impl ComputeNode for InputIndependentSelectNode {
25
fn name(&self) -> &str {
26
"input-independent-select"
27
}
28
29
fn update_state(
30
&mut self,
31
recv: &mut [PortState],
32
send: &mut [PortState],
33
state: &StreamingExecutionState,
34
) -> PolarsResult<()> {
35
assert!(recv.is_empty() && send.len() == 1);
36
if send[0] == PortState::Done {
37
*self = Self::Done;
38
return Ok(());
39
}
40
41
POOL.install(|| {
42
if let Self::ToSelect { selectors } = self {
43
let empty_df = DataFrame::empty();
44
let state = ExecutionState::new();
45
let selected: Result<Vec<_>, _> = selectors
46
.par_iter()
47
.map(|selector| {
48
let s = selector.evaluate_blocking(&empty_df, &state)?;
49
PolarsResult::Ok(s.into_column())
50
})
51
.collect();
52
let ret = DataFrame::new_with_broadcast(selected?)?;
53
let src_node = InMemorySourceNode::new(Arc::new(ret), MorselSeq::default());
54
*self = InputIndependentSelectNode::Source(src_node);
55
}
56
PolarsResult::Ok(())
57
})?;
58
59
match self {
60
Self::ToSelect { .. } => unreachable!(),
61
Self::Source(src) => src.update_state(recv, send, state),
62
Self::Done => {
63
send[0] = PortState::Done;
64
Ok(())
65
},
66
}
67
}
68
69
fn spawn<'env, 's>(
70
&'env mut self,
71
scope: &'s TaskScope<'s, 'env>,
72
recv_ports: &mut [Option<RecvPort<'_>>],
73
send_ports: &mut [Option<SendPort<'_>>],
74
state: &'s StreamingExecutionState,
75
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
76
) {
77
assert!(recv_ports.is_empty() && send_ports.len() == 1);
78
let Self::Source(src) = self else {
79
unreachable!()
80
};
81
src.spawn(scope, recv_ports, send_ports, state, join_handles);
82
}
83
}
84
85