Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/simple_projection.rs
6939 views
1
use std::sync::Arc;
2
3
use polars_core::schema::Schema;
4
use polars_utils::pl_str::PlSmallStr;
5
6
use super::compute_node_prelude::*;
7
8
pub struct SimpleProjectionNode {
9
columns: Vec<PlSmallStr>,
10
input_schema: Arc<Schema>,
11
}
12
13
impl SimpleProjectionNode {
14
pub fn new(columns: Vec<PlSmallStr>, input_schema: Arc<Schema>) -> Self {
15
Self {
16
columns,
17
input_schema,
18
}
19
}
20
}
21
22
impl ComputeNode for SimpleProjectionNode {
23
fn name(&self) -> &str {
24
"simple-projection"
25
}
26
27
fn update_state(
28
&mut self,
29
recv: &mut [PortState],
30
send: &mut [PortState],
31
_state: &StreamingExecutionState,
32
) -> PolarsResult<()> {
33
assert!(recv.len() == 1 && send.len() == 1);
34
recv.swap_with_slice(send);
35
Ok(())
36
}
37
38
fn spawn<'env, 's>(
39
&'env mut self,
40
scope: &'s TaskScope<'s, 'env>,
41
recv_ports: &mut [Option<RecvPort<'_>>],
42
send_ports: &mut [Option<SendPort<'_>>],
43
_state: &'s StreamingExecutionState,
44
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
45
) {
46
assert!(recv_ports.len() == 1 && send_ports.len() == 1);
47
let receivers = recv_ports[0].take().unwrap().parallel();
48
let senders = send_ports[0].take().unwrap().parallel();
49
50
for (mut recv, mut send) in receivers.into_iter().zip(senders) {
51
let slf = &*self;
52
join_handles.push(scope.spawn_task(TaskPriority::High, async move {
53
while let Ok(morsel) = recv.recv().await {
54
let morsel = morsel.try_map(|df| {
55
// TODO: can this be unchecked?
56
let check_duplicates = true;
57
df._select_with_schema_impl(
58
slf.columns.as_slice(),
59
&slf.input_schema,
60
check_duplicates,
61
)
62
})?;
63
64
if send.send(morsel).await.is_err() {
65
break;
66
}
67
}
68
69
Ok(())
70
}));
71
}
72
}
73
}
74
75