Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/ir/mod.rs
8446 views
1
mod dot;
2
mod format;
3
pub mod inputs;
4
mod schema;
5
pub(crate) mod tree_format;
6
7
use std::borrow::Cow;
8
use std::fmt;
9
10
pub use dot::{EscapeLabel, IRDotDisplay, PathsDisplay, ScanSourcesDisplay};
11
pub use format::{ExprIRDisplay, IRDisplay, write_group_by, write_ir_non_recursive};
12
use polars_core::prelude::*;
13
use polars_utils::idx_vec::UnitVec;
14
use polars_utils::unique_id::UniqueId;
15
#[cfg(feature = "ir_serde")]
16
use serde::{Deserialize, Serialize};
17
use strum_macros::IntoStaticStr;
18
19
use self::hive::HivePartitionsDf;
20
use crate::prelude::*;
21
22
#[cfg_attr(feature = "ir_serde", derive(serde::Serialize, serde::Deserialize))]
23
pub struct IRPlan {
24
pub lp_top: Node,
25
pub lp_arena: Arena<IR>,
26
pub expr_arena: Arena<AExpr>,
27
}
28
29
#[derive(Clone, Copy)]
30
pub struct IRPlanRef<'a> {
31
pub lp_top: Node,
32
pub lp_arena: &'a Arena<IR>,
33
pub expr_arena: &'a Arena<AExpr>,
34
}
35
36
/// [`IR`] is a representation of [`DslPlan`] with [`Node`]s which are allocated in an [`Arena`]
37
/// In this IR the logical plan has access to the full dataset.
38
#[derive(Clone, Debug, Default, IntoStaticStr)]
39
#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]
40
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
41
pub enum IR {
42
#[cfg(feature = "python")]
43
PythonScan {
44
options: PythonOptions,
45
},
46
Slice {
47
input: Node,
48
offset: i64,
49
len: IdxSize,
50
},
51
Filter {
52
input: Node,
53
predicate: ExprIR,
54
},
55
Scan {
56
sources: ScanSources,
57
file_info: FileInfo,
58
hive_parts: Option<HivePartitionsDf>,
59
predicate: Option<ExprIR>,
60
/// * None: No skipping
61
/// * Some(v): Files were skipped (filtered out)
62
predicate_file_skip_applied: Option<PredicateFileSkip>,
63
/// schema of the projected file
64
output_schema: Option<SchemaRef>,
65
scan_type: Box<FileScanIR>,
66
/// generic options that can be used for all file types.
67
unified_scan_args: Box<UnifiedScanArgs>,
68
},
69
DataFrameScan {
70
df: Arc<DataFrame>,
71
schema: SchemaRef,
72
// Schema of the projected file
73
// If `None`, no projection is applied
74
output_schema: Option<SchemaRef>,
75
},
76
// Only selects columns (semantically only has row access).
77
// This is a more restricted operation than `Select`.
78
SimpleProjection {
79
input: Node,
80
columns: SchemaRef,
81
},
82
// Polars' `select` operation. This may access full materialized data.
83
Select {
84
input: Node,
85
expr: Vec<ExprIR>,
86
schema: SchemaRef,
87
options: ProjectionOptions,
88
},
89
Sort {
90
input: Node,
91
by_column: Vec<ExprIR>,
92
slice: Option<(i64, usize, Option<DynamicPred>)>,
93
sort_options: SortMultipleOptions,
94
},
95
Cache {
96
input: Node,
97
/// This holds the `Arc<DslPlan>` to guarantee uniqueness.
98
id: UniqueId,
99
},
100
GroupBy {
101
input: Node,
102
keys: Vec<ExprIR>,
103
aggs: Vec<ExprIR>,
104
schema: SchemaRef,
105
maintain_order: bool,
106
options: Arc<GroupbyOptions>,
107
apply: Option<PlanCallback<DataFrame, DataFrame>>,
108
},
109
Join {
110
input_left: Node,
111
input_right: Node,
112
schema: SchemaRef,
113
left_on: Vec<ExprIR>,
114
right_on: Vec<ExprIR>,
115
options: Arc<JoinOptionsIR>,
116
},
117
HStack {
118
input: Node,
119
exprs: Vec<ExprIR>,
120
schema: SchemaRef,
121
options: ProjectionOptions,
122
},
123
Distinct {
124
input: Node,
125
options: DistinctOptionsIR,
126
},
127
MapFunction {
128
input: Node,
129
function: FunctionIR,
130
},
131
Union {
132
inputs: Vec<Node>,
133
options: UnionOptions,
134
},
135
/// Horizontal concatenation
136
/// - Invariant: the names will be unique
137
HConcat {
138
inputs: Vec<Node>,
139
schema: SchemaRef,
140
options: HConcatOptions,
141
},
142
ExtContext {
143
input: Node,
144
contexts: Vec<Node>,
145
schema: SchemaRef,
146
},
147
Sink {
148
input: Node,
149
payload: SinkTypeIR,
150
},
151
/// Node that allows for multiple plans to be executed in parallel with common subplan
152
/// elimination and everything.
153
SinkMultiple {
154
inputs: Vec<Node>,
155
},
156
#[cfg(feature = "merge_sorted")]
157
MergeSorted {
158
input_left: Node,
159
input_right: Node,
160
key: PlSmallStr,
161
},
162
#[default]
163
Invalid,
164
}
165
166
impl IRPlan {
167
pub fn new(top: Node, ir_arena: Arena<IR>, expr_arena: Arena<AExpr>) -> Self {
168
Self {
169
lp_top: top,
170
lp_arena: ir_arena,
171
expr_arena,
172
}
173
}
174
175
pub fn root(&self) -> &IR {
176
self.lp_arena.get(self.lp_top)
177
}
178
179
pub fn as_ref(&self) -> IRPlanRef<'_> {
180
IRPlanRef {
181
lp_top: self.lp_top,
182
lp_arena: &self.lp_arena,
183
expr_arena: &self.expr_arena,
184
}
185
}
186
187
pub fn describe(&self) -> String {
188
self.as_ref().describe()
189
}
190
191
pub fn describe_tree_format(&self) -> String {
192
self.as_ref().describe_tree_format()
193
}
194
195
pub fn display(&self) -> format::IRDisplay<'_> {
196
self.as_ref().display()
197
}
198
199
pub fn display_dot(&self) -> dot::IRDotDisplay<'_> {
200
self.as_ref().display_dot()
201
}
202
}
203
204
impl<'a> IRPlanRef<'a> {
205
pub fn root(self) -> &'a IR {
206
self.lp_arena.get(self.lp_top)
207
}
208
209
pub fn with_root(self, root: Node) -> Self {
210
Self {
211
lp_top: root,
212
lp_arena: self.lp_arena,
213
expr_arena: self.expr_arena,
214
}
215
}
216
217
pub fn display(self) -> format::IRDisplay<'a> {
218
format::IRDisplay::new(self)
219
}
220
221
pub fn display_dot(self) -> dot::IRDotDisplay<'a> {
222
dot::IRDotDisplay::new(self)
223
}
224
225
pub fn describe(self) -> String {
226
self.display().to_string()
227
}
228
229
pub fn describe_tree_format(self) -> String {
230
let mut visitor = tree_format::TreeFmtVisitor::default();
231
tree_format::TreeFmtNode::root_logical_plan(self).traverse(&mut visitor);
232
format!("{visitor:#?}")
233
}
234
}
235
236
impl fmt::Debug for IRPlan {
237
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
238
<format::IRDisplay as fmt::Display>::fmt(&self.display(), f)
239
}
240
}
241
242
impl fmt::Debug for IRPlanRef<'_> {
243
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
244
<format::IRDisplay as fmt::Display>::fmt(&self.display(), f)
245
}
246
}
247
248
#[cfg(test)]
mod test {
    use super::*;

    /// Checks that `IR` is at most 152 bytes.
    ///
    /// Marked `#[ignore]`: skipped for now, so it only runs when ignored tests are requested.
    #[test]
    #[ignore]
    fn test_alp_size() {
        assert!(size_of::<IR>() <= 152);
    }
}
259
260