GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/sink.rs
use std::borrow::Cow;
use std::sync::Arc;

use polars_core::frame::DataFrame;
use polars_core::schema::Schema;
use polars_error::PolarsResult;
use polars_expr::state::ExecutionState;
use polars_plan::dsl::{FileType, SinkTypeIR};
use polars_plan::plans::prune::prune;
use polars_plan::plans::{AExpr, IR, IRPlan};
use polars_utils::arena::{Arena, Node};

use crate::{Executor, StreamingExecutorBuilder};

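/// Executor for a partitioned file sink: the input plan is run with the
/// in-memory engine, and the collected result is then written out by the
/// streaming engine.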
pub struct PartitionedSinkExecutor {
    name: String,
    input_exec: Box<dyn Executor>,
    input_scan_node: Node,
    plan: IRPlan,
    builder: StreamingExecutorBuilder,
}

impl PartitionedSinkExecutor {
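    /// Builds the executor: the sink's input is replaced with a placeholder
    /// `DataFrameScan`, and the sink subplan is pruned into its own arenas so
    /// it can be executed separately once the input result is available.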
    pub fn new(
        input: Box<dyn Executor>,
        builder: StreamingExecutorBuilder,
        root: Node,
        lp_arena: &mut Arena<IR>,
        expr_arena: &Arena<AExpr>,
    ) -> Self {
        // Create a DataFrame scan for injecting the input result
        let scan = lp_arena.add(IR::DataFrameScan {
            df: Arc::new(DataFrame::empty()),
            schema: Arc::new(Schema::default()),
            output_schema: None,
        });

        let name = {
            let IR::Sink {
                input: sink_input,
                payload: SinkTypeIR::Partition(part),
            } = lp_arena.get_mut(root)
            else {
                unreachable!();
            };

            // Set the scan as the sink input
            *sink_input = scan;

            // Generate a name based on the sink file type
            format!("sink_{}[partitioned]", sink_name(&part.file_type))
        };

        // Prune the subplan into separate arenas
        let mut new_ir_arena = Arena::new();
        let mut new_expr_arena = Arena::new();
        let [new_root, new_scan] = prune(
            &[root, scan],
            lp_arena,
            expr_arena,
            &mut new_ir_arena,
            &mut new_expr_arena,
        )
        .try_into()
        .unwrap();

        let plan = IRPlan {
            lp_top: new_root,
            lp_arena: new_ir_arena,
            expr_arena: new_expr_arena,
        };

        Self {
            name,
            input_exec: input,
            input_scan_node: new_scan,
            plan,
            builder,
        }
    }
}

impl Executor for PartitionedSinkExecutor {
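    // Runs the in-memory input executor, splices its DataFrame into the
    // placeholder scan node, then executes the pruned sink plan with the
    // streaming engine.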
    fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
        state.should_stop()?;
        #[cfg(debug_assertions)]
        {
            if state.verbose() {
                eprintln!("run {}", self.name)
            }
        }
        let input_df = self.input_exec.execute(state)?;

        let profile_name = if state.has_node_timer() {
            Cow::Owned(format!(".{}()", &self.name))
        } else {
            Cow::Borrowed("")
        };

        // Insert the input DataFrame into our DataFrame scan node
        if let IR::DataFrameScan { df, schema, .. } =
            self.plan.lp_arena.get_mut(self.input_scan_node)
        {
            *schema = input_df.schema().clone();
            *df = Arc::new(input_df);
        } else {
            unreachable!();
        }

        // Build a streaming executor for the pruned sink plan
        let mut streaming_exec = (self.builder)(
            self.plan.lp_top,
            &mut self.plan.lp_arena,
            &mut self.plan.expr_arena,
        )?;

        // Run the streaming executor, recording timing under `profile_name`
        // when the node timer is active
        state
            .clone()
            .record(|| streaming_exec.execute(state), profile_name)
    }
}

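/// Short name of the sink's output file format, used to build the executor's name.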
pub fn sink_name(file_type: &FileType) -> &'static str {
    match file_type {
        #[cfg(feature = "parquet")]
        FileType::Parquet(_) => "parquet",
        #[cfg(feature = "ipc")]
        FileType::Ipc(_) => "ipc",
        #[cfg(feature = "csv")]
        FileType::Csv(_) => "csv",
        #[cfg(feature = "json")]
        FileType::Json(_) => "json",
        #[allow(unreachable_patterns)]
        _ => panic!("enable filetype feature"),
    }
}