Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/cluster_with_columns.rs
6940 views
1
use std::sync::Arc;
2
3
use arrow::bitmap::MutableBitmap;
4
use polars_core::schema::Schema;
5
use polars_utils::aliases::{InitHashMaps, PlHashMap};
6
use polars_utils::arena::{Arena, Node};
7
use polars_utils::vec::inplace_zip_filtermap;
8
9
use super::aexpr::AExpr;
10
use super::ir::IR;
11
use super::{PlSmallStr, aexpr_to_leaf_names_iter};
12
13
/// Maps a column name to the index of its bit in the bitsets (gen/live sets) built by
/// `optimize`. Indices are handed out densely in insertion order by `column_map_set`.
type ColumnMap = PlHashMap<PlSmallStr, usize>;
14
15
fn column_map_finalize_bitset(bitset: &mut MutableBitmap, column_map: &ColumnMap) {
16
assert!(bitset.len() <= column_map.len());
17
18
let size = bitset.len();
19
bitset.extend_constant(column_map.len() - size, false);
20
}
21
22
fn column_map_set(bitset: &mut MutableBitmap, column_map: &mut ColumnMap, column: PlSmallStr) {
23
let size = column_map.len();
24
column_map
25
.entry(column)
26
.and_modify(|idx| bitset.set(*idx, true))
27
.or_insert_with(|| {
28
bitset.push(true);
29
size
30
});
31
}
32
33
pub fn optimize(root: Node, lp_arena: &mut Arena<IR>, expr_arena: &Arena<AExpr>) {
34
let mut ir_stack = Vec::with_capacity(16);
35
ir_stack.push(root);
36
37
// We define these here to reuse the allocations across the loops
38
let mut column_map = ColumnMap::with_capacity(8);
39
let mut input_genset = MutableBitmap::with_capacity(16);
40
let mut current_expr_livesets: Vec<MutableBitmap> = Vec::with_capacity(16);
41
let mut current_liveset = MutableBitmap::with_capacity(16);
42
let mut pushable = MutableBitmap::with_capacity(16);
43
let mut potential_pushable = Vec::with_capacity(4);
44
45
while let Some(current) = ir_stack.pop() {
46
let current_ir = lp_arena.get(current);
47
current_ir.copy_inputs(&mut ir_stack);
48
let IR::HStack { input, .. } = current_ir else {
49
continue;
50
};
51
let input = *input;
52
53
let [current_ir, input_ir] = lp_arena.get_many_mut([current, input]);
54
55
let IR::HStack {
56
input: current_input,
57
exprs: current_exprs,
58
schema: current_schema,
59
options: current_options,
60
} = current_ir
61
else {
62
unreachable!();
63
};
64
let IR::HStack {
65
input: input_input,
66
exprs: input_exprs,
67
schema: input_schema,
68
options: input_options,
69
} = input_ir
70
else {
71
continue;
72
};
73
74
let column_map = &mut column_map;
75
76
// Reuse the allocations of the previous loop
77
column_map.clear();
78
input_genset.clear();
79
current_expr_livesets.clear();
80
current_liveset.clear();
81
pushable.clear();
82
potential_pushable.clear();
83
84
pushable.reserve(current_exprs.len());
85
potential_pushable.reserve(current_exprs.len());
86
87
// @NOTE
88
// We can pushdown any column that utilizes no live columns that are generated in the
89
// input.
90
91
for input_expr in input_exprs.iter() {
92
column_map_set(
93
&mut input_genset,
94
column_map,
95
input_expr.output_name().clone(),
96
);
97
}
98
99
for expr in current_exprs.iter() {
100
let mut liveset = MutableBitmap::from_len_zeroed(column_map.len());
101
102
for live in aexpr_to_leaf_names_iter(expr.node(), expr_arena) {
103
column_map_set(&mut liveset, column_map, live.clone());
104
}
105
106
current_expr_livesets.push(liveset);
107
}
108
109
// Force that column_map is not further mutated from this point on
110
let column_map = column_map as &_;
111
112
column_map_finalize_bitset(&mut input_genset, column_map);
113
114
current_liveset.extend_constant(column_map.len(), false);
115
for expr_liveset in &mut current_expr_livesets {
116
use std::ops::BitOrAssign;
117
column_map_finalize_bitset(expr_liveset, column_map);
118
(&mut current_liveset).bitor_assign(expr_liveset as &_);
119
}
120
121
// Check for every expression in the current WITH_COLUMNS node whether it can be pushed
122
// down or pruned.
123
inplace_zip_filtermap(
124
current_exprs,
125
&mut current_expr_livesets,
126
|mut expr, liveset| {
127
let does_input_assign_column_that_expr_used =
128
input_genset.intersects_with(&liveset);
129
130
if does_input_assign_column_that_expr_used {
131
pushable.push(false);
132
return Some((expr, liveset));
133
}
134
135
let column_name = expr.output_name();
136
let is_pushable = if let Some(idx) = column_map.get(column_name) {
137
let does_input_alias_also_expr = input_genset.get(*idx);
138
let is_alias_live_in_current = current_liveset.get(*idx);
139
140
if does_input_alias_also_expr && !is_alias_live_in_current {
141
// @NOTE: Pruning of re-assigned columns
142
//
143
// We checked if this expression output is also assigned by the input and
144
// that this assignment is not used in the current WITH_COLUMNS.
145
// Consequently, we are free to prune the input's assignment to the output.
146
//
147
// We immediately prune here to simplify the later code.
148
//
149
// @NOTE: Expressions in a `WITH_COLUMNS` cannot alias to the same column.
150
// Otherwise, this would be faulty and would panic.
151
let input_expr = input_exprs
152
.iter_mut()
153
.find(|input_expr| column_name == input_expr.output_name())
154
.expect("No assigning expression for generated column");
155
156
// @NOTE
157
// Since we are reassigning a column and we are pushing to the input, we do
158
// not need to change the schema of the current or input nodes.
159
std::mem::swap(&mut expr, input_expr);
160
return None;
161
}
162
163
// We cannot have multiple assignments to the same column in one WITH_COLUMNS
164
// and we need to make sure that we are not changing the column value that
165
// neighbouring expressions are seeing.
166
167
// @NOTE: In this case it might be possible to push this down if all the
168
// expressions that use the output are also being pushed down.
169
if !does_input_alias_also_expr && is_alias_live_in_current {
170
potential_pushable.push(pushable.len());
171
pushable.push(false);
172
return Some((expr, liveset));
173
}
174
175
!does_input_alias_also_expr && !is_alias_live_in_current
176
} else {
177
true
178
};
179
180
pushable.push(is_pushable);
181
Some((expr, liveset))
182
},
183
);
184
185
debug_assert_eq!(pushable.len(), current_exprs.len());
186
187
// Here we do a last check for expressions to push down.
188
// This will pushdown the expressions that "has an output column that is mentioned by
189
// neighbour columns, but all those neighbours were being pushed down".
190
for candidate in potential_pushable.iter().copied() {
191
let column_name = current_exprs[candidate].output_name();
192
let column_idx = column_map.get(column_name).unwrap();
193
194
current_liveset.clear();
195
current_liveset.extend_constant(column_map.len(), false);
196
for (i, expr_liveset) in current_expr_livesets.iter().enumerate() {
197
if pushable.get(i) || i == candidate {
198
continue;
199
}
200
use std::ops::BitOrAssign;
201
(&mut current_liveset).bitor_assign(expr_liveset as &_);
202
}
203
204
if !current_liveset.get(*column_idx) {
205
pushable.set(candidate, true);
206
}
207
}
208
209
let pushable_set_bits = pushable.set_bits();
210
211
// If all columns are pushable, we can merge the input into the current. This should be
212
// a relatively common case.
213
if pushable_set_bits == pushable.len() {
214
// @NOTE: To keep the schema correct, we reverse the order here. As a
215
// `WITH_COLUMNS` higher up produces later columns. This also allows us not to
216
// have to deal with schemas.
217
input_exprs.extend(std::mem::take(current_exprs));
218
std::mem::swap(current_exprs, input_exprs);
219
220
// Here, we perform the trick where we switch the inputs. This makes it possible to
221
// change the essentially remove the `current` node without knowing the parent of
222
// `current`. Essentially, we move the input node to the current node.
223
*current_input = *input_input;
224
*current_options = current_options.merge_options(input_options);
225
226
// Let us just make this node invalid so we can detect when someone tries to
227
// mention it later.
228
lp_arena.take(input);
229
230
// Since we merged the current and input nodes and the input node might have
231
// optimizations with their input, we loop again on this node.
232
ir_stack.pop();
233
ir_stack.push(current);
234
continue;
235
}
236
237
// There is nothing to push down. Move on.
238
if pushable_set_bits == 0 {
239
continue;
240
}
241
242
let input_schema_inner = Arc::make_mut(input_schema);
243
244
// @NOTE: We don't have to insert a SimpleProjection or redo the `current_schema` if
245
// `pushable` contains only 0..N for some N. We use these two variables to keep track
246
// of this.
247
let mut has_seen_unpushable = false;
248
let mut needs_simple_projection = false;
249
250
input_schema_inner.reserve(pushable_set_bits);
251
input_exprs.reserve(pushable_set_bits);
252
*current_exprs = std::mem::take(current_exprs)
253
.into_iter()
254
.zip(pushable.iter())
255
.filter_map(|(expr, do_pushdown)| {
256
if do_pushdown {
257
needs_simple_projection = has_seen_unpushable;
258
259
let column = expr.output_name().as_ref();
260
// @NOTE: we cannot just use the index here, as there might be renames that sit
261
// earlier in the schema
262
let datatype = current_schema.get(column).unwrap();
263
input_schema_inner.with_column(column.into(), datatype.clone());
264
input_exprs.push(expr);
265
266
None
267
} else {
268
has_seen_unpushable = true;
269
Some(expr)
270
}
271
})
272
.collect();
273
274
let options = current_options.merge_options(input_options);
275
*current_options = options;
276
*input_options = options;
277
278
// @NOTE: Here we add a simple projection to make sure that the output still
279
// has the right schema.
280
if needs_simple_projection {
281
// @NOTE: This may seem stupid, but this way we prioritize the input columns and then
282
// the existing columns which is exactly what we want.
283
let mut new_current_schema = Schema::with_capacity(current_schema.len());
284
new_current_schema.merge_from_ref(input_schema.as_ref());
285
new_current_schema.merge_from_ref(current_schema.as_ref());
286
287
debug_assert_eq!(new_current_schema.len(), current_schema.len());
288
289
let proj_schema = std::mem::replace(current_schema, Arc::new(new_current_schema));
290
291
let moved_current = lp_arena.add(IR::Invalid);
292
let projection = IR::SimpleProjection {
293
input: moved_current,
294
columns: proj_schema,
295
};
296
let current = lp_arena.replace(current, projection);
297
lp_arena.replace(moved_current, current);
298
}
299
}
300
}
301
302