Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/select.rs
6939 views
1
use std::sync::Arc;
2
3
use polars_core::prelude::IntoColumn;
4
use polars_core::schema::Schema;
5
6
use super::compute_node_prelude::*;
7
use crate::expression::StreamExpr;
8
9
pub struct SelectNode {
10
selectors: Vec<StreamExpr>,
11
schema: Arc<Schema>,
12
extend_original: bool,
13
}
14
15
impl SelectNode {
16
pub fn new(selectors: Vec<StreamExpr>, schema: Arc<Schema>, extend_original: bool) -> Self {
17
Self {
18
selectors,
19
schema,
20
extend_original,
21
}
22
}
23
}
24
25
impl ComputeNode for SelectNode {
26
fn name(&self) -> &str {
27
if self.extend_original {
28
"with-columns"
29
} else {
30
"select"
31
}
32
}
33
34
fn update_state(
35
&mut self,
36
recv: &mut [PortState],
37
send: &mut [PortState],
38
_state: &StreamingExecutionState,
39
) -> PolarsResult<()> {
40
assert!(recv.len() == 1 && send.len() == 1);
41
recv.swap_with_slice(send);
42
Ok(())
43
}
44
45
fn spawn<'env, 's>(
46
&'env mut self,
47
scope: &'s TaskScope<'s, 'env>,
48
recv_ports: &mut [Option<RecvPort<'_>>],
49
send_ports: &mut [Option<SendPort<'_>>],
50
state: &'s StreamingExecutionState,
51
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
52
) {
53
assert!(recv_ports.len() == 1 && send_ports.len() == 1);
54
let receivers = recv_ports[0].take().unwrap().parallel();
55
let senders = send_ports[0].take().unwrap().parallel();
56
57
for (mut recv, mut send) in receivers.into_iter().zip(senders) {
58
let slf = &*self;
59
join_handles.push(scope.spawn_task(TaskPriority::High, async move {
60
while let Ok(morsel) = recv.recv().await {
61
let (df, seq, source_token, consume_token) = morsel.into_inner();
62
let mut selected = Vec::new();
63
for selector in slf.selectors.iter() {
64
let s = selector.evaluate(&df, &state.in_memory_exec_state).await?;
65
selected.push(s.into_column());
66
}
67
68
let ret = if slf.extend_original {
69
let mut out = df;
70
out._add_columns(selected, &slf.schema)?;
71
out
72
} else {
73
DataFrame::new_with_broadcast(selected)?
74
};
75
76
let mut morsel = Morsel::new(ret, seq, source_token);
77
if let Some(token) = consume_token {
78
morsel.set_consume_token(token);
79
}
80
81
if send.send(morsel).await.is_err() {
82
break;
83
}
84
}
85
86
Ok(())
87
}));
88
}
89
}
90
}
91
92