Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/map.rs
6939 views
1
use std::sync::Arc;
2
3
use polars_plan::plans::DataFrameUdf;
4
5
use super::compute_node_prelude::*;
6
7
/// A simple mapping node. Assumes the given udf is elementwise.
8
pub struct MapNode {
9
map: Arc<dyn DataFrameUdf>,
10
}
11
12
impl MapNode {
13
pub fn new(map: Arc<dyn DataFrameUdf>) -> Self {
14
Self { map }
15
}
16
}
17
18
impl ComputeNode for MapNode {
19
fn name(&self) -> &str {
20
"map"
21
}
22
23
fn update_state(
24
&mut self,
25
recv: &mut [PortState],
26
send: &mut [PortState],
27
_state: &StreamingExecutionState,
28
) -> PolarsResult<()> {
29
assert!(recv.len() == 1 && send.len() == 1);
30
recv.swap_with_slice(send);
31
Ok(())
32
}
33
34
fn spawn<'env, 's>(
35
&'env mut self,
36
scope: &'s TaskScope<'s, 'env>,
37
recv_ports: &mut [Option<RecvPort<'_>>],
38
send_ports: &mut [Option<SendPort<'_>>],
39
_state: &'s StreamingExecutionState,
40
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
41
) {
42
assert!(recv_ports.len() == 1 && send_ports.len() == 1);
43
let receivers = recv_ports[0].take().unwrap().parallel();
44
let senders = send_ports[0].take().unwrap().parallel();
45
46
for (mut recv, mut send) in receivers.into_iter().zip(senders) {
47
let slf = &*self;
48
join_handles.push(scope.spawn_task(TaskPriority::High, async move {
49
while let Ok(morsel) = recv.recv().await {
50
let morsel = morsel.try_map(|df| slf.map.call_udf(df))?;
51
if send.send(morsel).await.is_err() {
52
break;
53
}
54
}
55
56
Ok(())
57
}));
58
}
59
}
60
}
61
62