Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/dispatch.rs
7884 views
1
use std::sync::{Arc, Mutex};
2
3
use polars_core::error::PolarsResult;
4
use polars_core::frame::DataFrame;
5
use polars_expr::state::ExecutionState;
6
use polars_mem_engine::Executor;
7
use polars_plan::dsl::SinkTypeIR;
8
use polars_plan::plans::{AExpr, IR};
9
use polars_utils::arena::{Arena, Node};
10
11
pub fn build_streaming_query_executor(
12
node: Node,
13
ir_arena: &mut Arena<IR>,
14
expr_arena: &mut Arena<AExpr>,
15
) -> PolarsResult<Box<dyn Executor>> {
16
let rechunk = match ir_arena.get(node) {
17
IR::Scan {
18
unified_scan_args, ..
19
} => unified_scan_args.rechunk,
20
_ => false,
21
};
22
23
let node = match ir_arena.get(node) {
24
IR::SinkMultiple { .. } => panic!("SinkMultiple not supported"),
25
IR::Sink { .. } => node,
26
_ => ir_arena.add(IR::Sink {
27
input: node,
28
payload: SinkTypeIR::Memory,
29
}),
30
};
31
32
crate::StreamingQuery::build(node, ir_arena, expr_arena)
33
.map(Some)
34
.map(Mutex::new)
35
.map(Arc::new)
36
.map(|x| StreamingQueryExecutor {
37
executor: x,
38
rechunk,
39
})
40
.map(|x| Box::new(x) as Box<dyn Executor>)
41
}
42
43
// Note: Arc/Mutex is because Executor requires Sync, but SlotMap is not Sync.
44
struct StreamingQueryExecutor {
45
executor: Arc<Mutex<Option<crate::StreamingQuery>>>,
46
rechunk: bool,
47
}
48
49
impl Executor for StreamingQueryExecutor {
50
fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
51
let mut df = { self.executor.try_lock().unwrap().take() }
52
.expect("unhandled: execute() more than once")
53
.execute()
54
.map(|x| x.unwrap_single())?;
55
56
if self.rechunk {
57
df.as_single_chunk_par();
58
}
59
60
Ok(df)
61
}
62
}
63
64