Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/ir/mod.rs
6940 views
1
mod dot;
2
mod format;
3
pub mod inputs;
4
mod schema;
5
pub(crate) mod tree_format;
6
7
use std::borrow::Cow;
8
use std::fmt;
9
10
pub use dot::{EscapeLabel, IRDotDisplay, PathsDisplay, ScanSourcesDisplay};
11
pub use format::{ExprIRDisplay, IRDisplay, write_group_by, write_ir_non_recursive};
12
use polars_core::prelude::*;
13
use polars_utils::idx_vec::UnitVec;
14
use polars_utils::unique_id::UniqueId;
15
#[cfg(feature = "ir_serde")]
16
use serde::{Deserialize, Serialize};
17
use strum_macros::IntoStaticStr;
18
19
use self::hive::HivePartitionsDf;
20
use crate::prelude::*;
21
22
#[cfg_attr(feature = "ir_serde", derive(serde::Serialize, serde::Deserialize))]
23
pub struct IRPlan {
24
pub lp_top: Node,
25
pub lp_arena: Arena<IR>,
26
pub expr_arena: Arena<AExpr>,
27
}
28
29
#[derive(Clone, Copy)]
30
pub struct IRPlanRef<'a> {
31
pub lp_top: Node,
32
pub lp_arena: &'a Arena<IR>,
33
pub expr_arena: &'a Arena<AExpr>,
34
}
35
36
/// [`IR`] is a representation of [`DslPlan`] with [`Node`]s which are allocated in an [`Arena`]
37
/// In this IR the logical plan has access to the full dataset.
38
#[derive(Clone, Debug, Default, IntoStaticStr)]
39
#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]
40
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
41
pub enum IR {
42
#[cfg(feature = "python")]
43
PythonScan {
44
options: PythonOptions,
45
},
46
Slice {
47
input: Node,
48
offset: i64,
49
len: IdxSize,
50
},
51
Filter {
52
input: Node,
53
predicate: ExprIR,
54
},
55
Scan {
56
sources: ScanSources,
57
file_info: FileInfo,
58
hive_parts: Option<HivePartitionsDf>,
59
predicate: Option<ExprIR>,
60
/// schema of the projected file
61
output_schema: Option<SchemaRef>,
62
scan_type: Box<FileScanIR>,
63
/// generic options that can be used for all file types.
64
unified_scan_args: Box<UnifiedScanArgs>,
65
},
66
DataFrameScan {
67
df: Arc<DataFrame>,
68
schema: SchemaRef,
69
// Schema of the projected file
70
// If `None`, no projection is applied
71
output_schema: Option<SchemaRef>,
72
},
73
// Only selects columns (semantically only has row access).
74
// This is a more restricted operation than `Select`.
75
SimpleProjection {
76
input: Node,
77
columns: SchemaRef,
78
},
79
// Polars' `select` operation. This may access full materialized data.
80
Select {
81
input: Node,
82
expr: Vec<ExprIR>,
83
schema: SchemaRef,
84
options: ProjectionOptions,
85
},
86
Sort {
87
input: Node,
88
by_column: Vec<ExprIR>,
89
slice: Option<(i64, usize)>,
90
sort_options: SortMultipleOptions,
91
},
92
Cache {
93
input: Node,
94
/// This holds the `Arc<DslPlan>` to guarantee uniqueness.
95
id: UniqueId,
96
},
97
GroupBy {
98
input: Node,
99
keys: Vec<ExprIR>,
100
aggs: Vec<ExprIR>,
101
schema: SchemaRef,
102
maintain_order: bool,
103
options: Arc<GroupbyOptions>,
104
apply: Option<PlanCallback<DataFrame, DataFrame>>,
105
},
106
Join {
107
input_left: Node,
108
input_right: Node,
109
schema: SchemaRef,
110
left_on: Vec<ExprIR>,
111
right_on: Vec<ExprIR>,
112
options: Arc<JoinOptionsIR>,
113
},
114
HStack {
115
input: Node,
116
exprs: Vec<ExprIR>,
117
schema: SchemaRef,
118
options: ProjectionOptions,
119
},
120
Distinct {
121
input: Node,
122
options: DistinctOptionsIR,
123
},
124
MapFunction {
125
input: Node,
126
function: FunctionIR,
127
},
128
Union {
129
inputs: Vec<Node>,
130
options: UnionOptions,
131
},
132
/// Horizontal concatenation
133
/// - Invariant: the names will be unique
134
HConcat {
135
inputs: Vec<Node>,
136
schema: SchemaRef,
137
options: HConcatOptions,
138
},
139
ExtContext {
140
input: Node,
141
contexts: Vec<Node>,
142
schema: SchemaRef,
143
},
144
Sink {
145
input: Node,
146
payload: SinkTypeIR,
147
},
148
/// Node that allows for multiple plans to be executed in parallel with common subplan
149
/// elimination and everything.
150
SinkMultiple {
151
inputs: Vec<Node>,
152
},
153
#[cfg(feature = "merge_sorted")]
154
MergeSorted {
155
input_left: Node,
156
input_right: Node,
157
key: PlSmallStr,
158
},
159
#[default]
160
Invalid,
161
}
162
163
impl IRPlan {
164
pub fn new(top: Node, ir_arena: Arena<IR>, expr_arena: Arena<AExpr>) -> Self {
165
Self {
166
lp_top: top,
167
lp_arena: ir_arena,
168
expr_arena,
169
}
170
}
171
172
pub fn root(&self) -> &IR {
173
self.lp_arena.get(self.lp_top)
174
}
175
176
pub fn as_ref(&self) -> IRPlanRef<'_> {
177
IRPlanRef {
178
lp_top: self.lp_top,
179
lp_arena: &self.lp_arena,
180
expr_arena: &self.expr_arena,
181
}
182
}
183
184
pub fn describe(&self) -> String {
185
self.as_ref().describe()
186
}
187
188
pub fn describe_tree_format(&self) -> String {
189
self.as_ref().describe_tree_format()
190
}
191
192
pub fn display(&self) -> format::IRDisplay<'_> {
193
self.as_ref().display()
194
}
195
196
pub fn display_dot(&self) -> dot::IRDotDisplay<'_> {
197
self.as_ref().display_dot()
198
}
199
}
200
201
impl<'a> IRPlanRef<'a> {
202
pub fn root(self) -> &'a IR {
203
self.lp_arena.get(self.lp_top)
204
}
205
206
pub fn with_root(self, root: Node) -> Self {
207
Self {
208
lp_top: root,
209
lp_arena: self.lp_arena,
210
expr_arena: self.expr_arena,
211
}
212
}
213
214
pub fn display(self) -> format::IRDisplay<'a> {
215
format::IRDisplay::new(self)
216
}
217
218
pub fn display_dot(self) -> dot::IRDotDisplay<'a> {
219
dot::IRDotDisplay::new(self)
220
}
221
222
pub fn describe(self) -> String {
223
self.display().to_string()
224
}
225
226
pub fn describe_tree_format(self) -> String {
227
let mut visitor = tree_format::TreeFmtVisitor::default();
228
tree_format::TreeFmtNode::root_logical_plan(self).traverse(&mut visitor);
229
format!("{visitor:#?}")
230
}
231
}
232
233
impl fmt::Debug for IRPlan {
234
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
235
<format::IRDisplay as fmt::Display>::fmt(&self.display(), f)
236
}
237
}
238
239
impl fmt::Debug for IRPlanRef<'_> {
240
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241
<format::IRDisplay as fmt::Display>::fmt(&self.display(), f)
242
}
243
}
244
245
#[cfg(test)]
mod test {
    use super::*;

    /// Guards the in-memory size of [`IR`] against exceeding 152 bytes.
    // skipped for now
    #[ignore]
    #[test]
    fn test_alp_size() {
        let size = size_of::<IR>();
        // Report the measured size so a failure shows how far over the limit the enum is.
        assert!(size <= 152, "size_of::<IR>() is {size} bytes, expected at most 152");
    }
}
256
257