Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/set_order.rs
6940 views
1
//! Pass to obtain and optimize using exhaustive row-order information.
2
//!
3
//! This pass attaches order information to all the IR node input and output ports.
4
//!
5
//! The pass performs two passes over the IR graph. First, it assigns and pushes ordering down from
6
//! the sinks to the leaves. Second, it pulls those orderings back up from the leaves to the sinks.
7
//! The two passes weaken order guarantees and simplify IR nodes where possible.
8
//!
9
//! When the two passes are done, we are left with a map from all the nodes to `PortOrder` which
10
//! contains the input and output port ordering information.
11
12
use std::sync::Arc;
13
14
use polars_core::frame::UniqueKeepStrategy;
15
use polars_core::prelude::PlHashMap;
16
use polars_ops::frame::{JoinType, MaintainOrderJoin};
17
use polars_utils::arena::{Arena, Node};
18
use polars_utils::idx_vec::UnitVec;
19
use polars_utils::unique_id::UniqueId;
20
21
use super::IR;
22
use crate::dsl::{SinkTypeIR, UnionOptions};
23
use crate::plans::ir::inputs::Inputs;
24
use crate::plans::{AExpr, is_order_sensitive_amortized, is_scalar_ae};
25
26
#[derive(Debug, Clone, Copy)]
27
pub enum InputOrder {
28
/// The input may receive data in an undefined order.
29
Unordered,
30
/// The input may propagate ordering into one or more of its outputs.
31
Preserving,
32
/// The input observes ordering and may propagate ordering into one or more of its outputs.
33
Observing,
34
/// The input observes and terminates ordering.
35
Consuming,
36
}
37
38
/// The ordering of the input and output ports of an IR node.
39
///
40
/// This gives information about how row ordering may be received, observed and passed an IR node.
41
///
42
/// Some general rules:
43
/// - An input or output being `Unordered` signifies that on that port can receive rows in any
44
/// order i.e. a shuffle could be inserted and the result would still be correct.
45
/// - If any input is `Observing` or `Consuming`, it is important that the rows on this input are
46
/// received in the order specified by the input.
47
/// - If any output is ordered it is able to propgate on the ordering from any input that is
48
/// `Preserving` or `Observing`. Conversely, if no output is ordered or no input is `Preserving`
49
/// or `Observing`, no input order may be propagated to any of the outputs.
50
#[derive(Debug, Clone)]
51
pub struct PortOrder {
52
pub inputs: UnitVec<InputOrder>,
53
pub output_ordered: UnitVec<bool>,
54
}
55
56
impl PortOrder {
57
pub fn new(
58
inputs: impl IntoIterator<Item = InputOrder>,
59
output_ordered: impl IntoIterator<Item = bool>,
60
) -> Self {
61
Self {
62
inputs: inputs.into_iter().collect(),
63
output_ordered: output_ordered.into_iter().collect(),
64
}
65
}
66
67
fn set_unordered_output(&mut self) {
68
self.output_ordered.iter_mut().for_each(|o| *o = false);
69
}
70
}
71
72
/// Remove ordering from both sides if either side has an undefined order.
73
fn simplify_edge(tx: bool, rx: InputOrder) -> (bool, InputOrder) {
74
use InputOrder as I;
75
match (tx, rx) {
76
(false, _) | (_, I::Unordered) => (false, I::Unordered),
77
(o, i) => (o, i),
78
}
79
}
80
81
fn pushdown_orders(
82
roots: &[Node],
83
ir_arena: &mut Arena<IR>,
84
expr_arena: &Arena<AExpr>,
85
outputs: &mut PlHashMap<Node, UnitVec<Node>>,
86
cache_proxy: &PlHashMap<UniqueId, Vec<Node>>,
87
) -> PlHashMap<Node, PortOrder> {
88
let mut orders: PlHashMap<Node, PortOrder> = PlHashMap::default();
89
let mut node_hits: PlHashMap<Node, Vec<(usize, Node)>> = PlHashMap::default();
90
let mut aexpr_stack = Vec::new();
91
let mut stack = Vec::new();
92
let mut output_port_orderings = Vec::new();
93
94
stack.extend(roots.iter().map(|n| (*n, None)));
95
96
while let Some((node, outgoing)) = stack.pop() {
97
// @Hack. The IR creates caches for every path at the moment. That is super hacky. So is
98
// this, but we need to work around it.
99
let node = match ir_arena.get(node) {
100
IR::Cache { id, .. } => cache_proxy.get(id).unwrap()[0],
101
_ => node,
102
};
103
104
debug_assert!(!orders.contains_key(&node));
105
106
let node_outputs = &outputs[&node];
107
let hits = node_hits.entry(node).or_default();
108
if let Some(outgoing) = outgoing {
109
hits.push(outgoing);
110
if hits.len() < node_outputs.len() {
111
continue;
112
}
113
}
114
115
output_port_orderings.clear();
116
output_port_orderings.extend(
117
hits.iter().map(|(to_input_idx, to_node)| {
118
orders.get_mut(to_node).unwrap().inputs[*to_input_idx]
119
}),
120
);
121
122
let all_outputs_unordered = output_port_orderings
123
.iter()
124
.all(|i| matches!(i, I::Unordered));
125
126
// Pushdown simplification rules.
127
let ir = ir_arena.get_mut(node);
128
use {InputOrder as I, MaintainOrderJoin as MOJ, PortOrder as P};
129
let mut node_ordering = match ir {
130
IR::Cache { .. } if all_outputs_unordered => P::new([I::Unordered], [false]),
131
IR::Cache { .. } => P::new(
132
[I::Preserving],
133
output_port_orderings
134
.iter()
135
.map(|i| !matches!(i, I::Unordered))
136
.collect::<Box<[_]>>(),
137
),
138
IR::Sort { input, slice, .. } if slice.is_none() && all_outputs_unordered => {
139
// _ -> Unordered
140
//
141
// Remove sort.
142
let input = *input;
143
let (to_input_idx, to_node) = outgoing.unwrap();
144
*ir_arena
145
.get_mut(to_node)
146
.inputs_mut()
147
.nth(to_input_idx)
148
.unwrap() = input;
149
150
// @Performance: Linear search
151
*outputs
152
.get_mut(&input)
153
.unwrap()
154
.iter_mut()
155
.find(|o| **o == node)
156
.unwrap() = to_node;
157
158
stack.push((input, outgoing));
159
continue;
160
},
161
IR::Sort {
162
by_column,
163
sort_options,
164
..
165
} => {
166
let input = if sort_options.maintain_order {
167
I::Consuming
168
} else {
169
let mut has_order_sensitive = false;
170
for e in by_column {
171
let aexpr = expr_arena.get(e.node());
172
has_order_sensitive |=
173
is_order_sensitive_amortized(aexpr, expr_arena, &mut aexpr_stack);
174
}
175
176
if has_order_sensitive {
177
I::Consuming
178
} else {
179
I::Unordered
180
}
181
};
182
183
P::new([input], [true])
184
},
185
IR::GroupBy {
186
keys,
187
aggs,
188
maintain_order,
189
apply,
190
options,
191
..
192
} => {
193
*maintain_order &= !all_outputs_unordered;
194
195
let (input, output) = if apply.is_some()
196
|| options.is_dynamic()
197
|| options.is_rolling()
198
|| *maintain_order
199
{
200
(I::Consuming, true)
201
} else {
202
// _ -> Unordered
203
// to
204
// maintain_order = false
205
// and
206
// Unordered -> Unordered (if no order sensitive expressions)
207
208
*maintain_order = false;
209
let mut has_order_sensitive = false;
210
for e in keys.iter().chain(aggs.iter()) {
211
let aexpr = expr_arena.get(e.node());
212
has_order_sensitive |=
213
is_order_sensitive_amortized(aexpr, expr_arena, &mut aexpr_stack);
214
}
215
216
// The auto-implode is also other sensitive.
217
has_order_sensitive |=
218
aggs.iter().any(|agg| !is_scalar_ae(agg.node(), expr_arena));
219
220
(
221
if has_order_sensitive {
222
I::Consuming
223
} else {
224
I::Unordered
225
},
226
false,
227
)
228
};
229
230
P::new([input], [output])
231
},
232
#[cfg(feature = "merge_sorted")]
233
IR::MergeSorted {
234
input_left,
235
input_right,
236
..
237
} => {
238
if all_outputs_unordered {
239
// MergeSorted
240
// (_, _) -> Unordered
241
// to
242
// UnorderedUnion([left, right])
243
*ir = IR::Union {
244
inputs: vec![*input_left, *input_right],
245
options: UnionOptions {
246
maintain_order: false,
247
..Default::default()
248
},
249
};
250
P::new([I::Unordered, I::Unordered], [false])
251
} else {
252
P::new([I::Observing, I::Observing], [true])
253
}
254
},
255
#[cfg(feature = "asof_join")]
256
IR::Join { options, .. } if matches!(options.args.how, JoinType::AsOf(_)) => {
257
P::new([I::Observing, I::Observing], [!all_outputs_unordered])
258
},
259
IR::Join {
260
input_left: _,
261
input_right: _,
262
schema: _,
263
left_on,
264
right_on,
265
options,
266
} if all_outputs_unordered => {
267
// If the join maintains order, but the output has undefined order. Remove the
268
// ordering.
269
if !matches!(options.args.maintain_order, MOJ::None) {
270
let mut new_options = options.as_ref().clone();
271
new_options.args.maintain_order = MOJ::None;
272
*options = Arc::new(new_options);
273
}
274
275
let mut inputs = [I::Consuming, I::Consuming];
276
277
// If either side does not need to maintain order, don't maintain the old on that
278
// side.
279
for (i, on) in [left_on, right_on].iter().enumerate() {
280
let mut has_order_sensitive = false;
281
for e in on.iter() {
282
let aexpr = expr_arena.get(e.node());
283
has_order_sensitive |=
284
is_order_sensitive_amortized(aexpr, expr_arena, &mut aexpr_stack);
285
}
286
287
if !has_order_sensitive {
288
inputs[i] = I::Unordered;
289
}
290
}
291
292
P::new(inputs, [false])
293
},
294
IR::Join {
295
input_left: _,
296
input_right: _,
297
schema: _,
298
left_on,
299
right_on,
300
options,
301
} => {
302
let mut left_has_order_sensitive = false;
303
let mut right_has_order_sensitive = false;
304
305
for e in left_on {
306
let aexpr = expr_arena.get(e.node());
307
left_has_order_sensitive |=
308
is_order_sensitive_amortized(aexpr, expr_arena, &mut aexpr_stack);
309
}
310
for e in right_on {
311
let aexpr = expr_arena.get(e.node());
312
right_has_order_sensitive |=
313
is_order_sensitive_amortized(aexpr, expr_arena, &mut aexpr_stack);
314
}
315
316
use MaintainOrderJoin as M;
317
let left_input = match (
318
options.args.maintain_order,
319
left_has_order_sensitive,
320
options.args.slice.is_some(),
321
) {
322
(M::Left | M::LeftRight, true, _)
323
| (M::Left | M::LeftRight | M::RightLeft, _, true) => I::Observing,
324
(M::Left | M::LeftRight, false, _) => I::Preserving,
325
(M::RightLeft, _, _) | (_, true, _) => I::Consuming,
326
_ => I::Unordered,
327
};
328
let right_input = match (
329
options.args.maintain_order,
330
right_has_order_sensitive,
331
options.args.slice.is_some(),
332
) {
333
(M::Right | M::RightLeft, true, _)
334
| (M::Right | M::LeftRight | M::RightLeft, _, true) => I::Observing,
335
(M::Right | M::RightLeft, false, _) => I::Preserving,
336
(M::LeftRight, _, _) | (_, true, _) => I::Consuming,
337
_ => I::Unordered,
338
};
339
let output = !matches!(options.args.maintain_order, M::None);
340
341
P::new([left_input, right_input], [output])
342
},
343
IR::Distinct { input: _, options } => {
344
options.maintain_order &= !all_outputs_unordered;
345
346
let input = if options.maintain_order
347
|| matches!(
348
options.keep_strategy,
349
UniqueKeepStrategy::First | UniqueKeepStrategy::Last
350
) {
351
I::Observing
352
} else {
353
I::Unordered
354
};
355
P::new([input], [options.maintain_order])
356
},
357
IR::MapFunction { input: _, function } => {
358
let input = if function.is_streamable() {
359
if all_outputs_unordered {
360
I::Unordered
361
} else {
362
I::Preserving
363
}
364
} else {
365
I::Consuming
366
};
367
368
P::new([input], [!all_outputs_unordered])
369
},
370
IR::SimpleProjection { .. } => {
371
let input = if all_outputs_unordered {
372
I::Unordered
373
} else {
374
I::Preserving
375
};
376
P::new([input], [!all_outputs_unordered])
377
},
378
IR::Slice { .. } => P::new([I::Observing], [!all_outputs_unordered]),
379
IR::HStack { exprs, .. } => {
380
let mut has_order_sensitive = false;
381
for e in exprs {
382
let aexpr = expr_arena.get(e.node());
383
has_order_sensitive |=
384
is_order_sensitive_amortized(aexpr, expr_arena, &mut aexpr_stack);
385
}
386
387
let input = if has_order_sensitive {
388
I::Observing
389
} else if all_outputs_unordered {
390
I::Unordered
391
} else {
392
I::Preserving
393
};
394
395
P::new([input], [!all_outputs_unordered])
396
},
397
IR::Select { expr: exprs, .. } => {
398
let mut has_order_sensitive = false;
399
let mut all_scalar = true;
400
401
for e in exprs {
402
let aexpr = expr_arena.get(e.node());
403
has_order_sensitive |=
404
is_order_sensitive_amortized(aexpr, expr_arena, &mut aexpr_stack);
405
all_scalar &= is_scalar_ae(e.node(), expr_arena);
406
}
407
408
let input = if has_order_sensitive {
409
I::Observing
410
} else if all_outputs_unordered {
411
I::Unordered
412
} else {
413
I::Preserving
414
};
415
let output = !all_outputs_unordered && !all_scalar;
416
417
P::new([input], [output])
418
},
419
420
IR::Filter {
421
input: _,
422
predicate,
423
} => {
424
let is_order_sensitive = is_order_sensitive_amortized(
425
expr_arena.get(predicate.node()),
426
expr_arena,
427
&mut aexpr_stack,
428
);
429
430
let input = if is_order_sensitive {
431
I::Observing
432
} else if all_outputs_unordered {
433
I::Unordered
434
} else {
435
I::Preserving
436
};
437
438
P::new([input], [!all_outputs_unordered])
439
},
440
441
IR::Union { inputs, options } => {
442
if options.slice.is_none() && all_outputs_unordered {
443
options.maintain_order = false;
444
}
445
446
let input = match (options.slice.is_some(), options.maintain_order) {
447
(true, true) => I::Observing,
448
(true, false) => I::Consuming,
449
(false, true) => I::Preserving,
450
(false, false) => I::Unordered,
451
};
452
let output = options.maintain_order && !all_outputs_unordered;
453
454
P::new(std::iter::repeat_n(input, inputs.len()), [output])
455
},
456
457
IR::HConcat { inputs, .. } => P::new(
458
std::iter::repeat_n(I::Observing, inputs.len()),
459
[!all_outputs_unordered],
460
),
461
462
#[cfg(feature = "python")]
463
IR::PythonScan { .. } => P::new([], [!all_outputs_unordered]),
464
465
IR::Sink { payload, .. } => {
466
let input = if payload.maintain_order() {
467
I::Consuming
468
} else {
469
I::Unordered
470
};
471
P::new([input], [])
472
},
473
IR::Scan { .. } | IR::DataFrameScan { .. } => P::new([], [!all_outputs_unordered]),
474
475
IR::ExtContext { contexts, .. } => {
476
// This node is nonsense. Just do the most conservative thing you can.
477
P::new(
478
std::iter::repeat_n(I::Consuming, contexts.len() + 1),
479
[!all_outputs_unordered],
480
)
481
},
482
483
IR::SinkMultiple { .. } | IR::Invalid => unreachable!(),
484
};
485
486
// We make the code above simpler by pretending every node except caches always only has
487
// one output. We correct for that here.
488
if output_port_orderings.len() > 1 && node_ordering.output_ordered.len() == 1 {
489
node_ordering.output_ordered = UnitVec::from(vec![
490
node_ordering.output_ordered[0];
491
output_port_orderings.len()
492
])
493
}
494
495
let prev_value = orders.insert(node, node_ordering);
496
assert!(prev_value.is_none());
497
498
stack.extend(
499
ir.inputs()
500
.enumerate()
501
.map(|(to_input_idx, input)| (input, Some((to_input_idx, node)))),
502
);
503
}
504
505
orders
506
}
507
508
fn pullup_orders(
509
leaves: &[Node],
510
ir_arena: &mut Arena<IR>,
511
outputs: &mut PlHashMap<Node, UnitVec<Node>>,
512
orders: &mut PlHashMap<Node, PortOrder>,
513
cache_proxy: &PlHashMap<UniqueId, Vec<Node>>,
514
) {
515
let mut hits: PlHashMap<Node, Vec<(usize, Node)>> = PlHashMap::default();
516
let mut stack = Vec::new();
517
518
let mut txs = Vec::new();
519
520
for leaf in leaves {
521
stack.extend(
522
outputs[leaf]
523
.iter()
524
.enumerate()
525
.map(|(i, v)| (*v, (i, *leaf))),
526
);
527
}
528
529
while let Some((node, outgoing)) = stack.pop() {
530
// @Hack. The IR creates caches for every path at the moment. That is super hacky. So is
531
// this, but we need to work around it.
532
let node = match ir_arena.get(node) {
533
IR::Cache { id, .. } => cache_proxy.get(id).unwrap()[0],
534
_ => node,
535
};
536
537
let hits = hits.entry(node).or_default();
538
hits.push(outgoing);
539
if hits.len() < orders[&node].inputs.len() {
540
continue;
541
}
542
543
let node_outputs = &outputs[&node];
544
let ir = ir_arena.get_mut(node);
545
546
txs.clear();
547
txs.extend(
548
hits.iter()
549
.map(|(to_output_idx, to_node)| orders[to_node].output_ordered[*to_output_idx]),
550
);
551
552
let node_ordering = orders.get_mut(&node).unwrap();
553
assert_eq!(txs.len(), node_ordering.inputs.len());
554
for (tx, rx) in txs.iter().zip(node_ordering.inputs.iter_mut()) {
555
// @NOTE: We don't assign tx back here since it would be redundant.
556
(_, *rx) = simplify_edge(*tx, *rx);
557
}
558
559
// Pullup simplification rules.
560
use {InputOrder as I, MaintainOrderJoin as MOJ};
561
match ir {
562
IR::Sort { sort_options, .. } => {
563
// Unordered -> _ ==> maintain_order=false
564
sort_options.maintain_order &= !matches!(node_ordering.inputs[0], I::Unordered);
565
},
566
IR::GroupBy { maintain_order, .. } => {
567
if matches!(node_ordering.inputs[0], I::Unordered) {
568
// Unordered -> _
569
// to
570
// maintain_order = false
571
// and
572
// Unordered -> Unordered
573
574
*maintain_order = false;
575
node_ordering.set_unordered_output();
576
}
577
},
578
IR::Sink { input: _, payload } => {
579
if matches!(node_ordering.inputs[0], I::Unordered) {
580
// Set maintain order to false if input is unordered
581
match payload {
582
SinkTypeIR::Memory => {},
583
SinkTypeIR::File(s) => s.sink_options.maintain_order = false,
584
SinkTypeIR::Partition(s) => s.sink_options.maintain_order = false,
585
}
586
}
587
},
588
IR::Join { options, .. } => {
589
let left_unordered = matches!(node_ordering.inputs[0], I::Unordered);
590
let right_unordered = matches!(node_ordering.inputs[1], I::Unordered);
591
592
let maintain_order = options.args.maintain_order;
593
594
if (left_unordered
595
&& matches!(maintain_order, MOJ::Left | MOJ::RightLeft | MOJ::LeftRight))
596
|| (right_unordered
597
&& matches!(maintain_order, MOJ::Right | MOJ::RightLeft | MOJ::LeftRight))
598
{
599
// If we are maintaining order of a side, but that input has no guaranteed order,
600
// remove the maintain ordering from that side.
601
602
let mut new_options = options.as_ref().clone();
603
new_options.args.maintain_order = match maintain_order {
604
_ if left_unordered && right_unordered => MOJ::None,
605
MOJ::Left | MOJ::LeftRight if left_unordered => MOJ::None,
606
MOJ::RightLeft if left_unordered => MOJ::Right,
607
MOJ::Right | MOJ::RightLeft if right_unordered => MOJ::None,
608
MOJ::LeftRight if right_unordered => MOJ::Left,
609
_ => unreachable!(),
610
};
611
612
if matches!(new_options.args.maintain_order, MOJ::None) {
613
node_ordering.set_unordered_output();
614
}
615
*options = Arc::new(new_options);
616
}
617
},
618
IR::Distinct { input: _, options } => {
619
if matches!(node_ordering.inputs[0], I::Unordered) {
620
options.maintain_order = false;
621
if options.keep_strategy != UniqueKeepStrategy::None {
622
options.keep_strategy = UniqueKeepStrategy::Any;
623
}
624
node_ordering.set_unordered_output();
625
}
626
},
627
628
#[cfg(feature = "python")]
629
IR::PythonScan { .. } => {},
630
IR::Scan { .. } | IR::DataFrameScan { .. } => {},
631
#[cfg(feature = "merge_sorted")]
632
IR::MergeSorted { .. } => {
633
// An input being unordered is technically valid as it is possible for all values
634
// to be the same in which case the rows are sorted.
635
},
636
IR::Union { options, .. } => {
637
// Even if the inputs are unordered. The output still has an order given by the
638
// order of the inputs.
639
640
if node_ordering
641
.inputs
642
.iter()
643
.all(|i| matches!(i, I::Unordered))
644
{
645
if !options.maintain_order {
646
node_ordering.set_unordered_output();
647
}
648
}
649
},
650
IR::MapFunction { input: _, function } => {
651
if function.is_streamable() && matches!(node_ordering.inputs[0], I::Unordered) {
652
node_ordering.set_unordered_output();
653
}
654
},
655
656
IR::Cache { .. }
657
| IR::SimpleProjection { .. }
658
| IR::Slice { .. }
659
| IR::HStack { .. }
660
| IR::Filter { .. }
661
| IR::Select { .. }
662
| IR::HConcat { .. }
663
| IR::ExtContext { .. } => {
664
if node_ordering
665
.inputs
666
.iter()
667
.all(|i| matches!(i, I::Unordered))
668
{
669
node_ordering.set_unordered_output();
670
}
671
},
672
673
IR::SinkMultiple { .. } | IR::Invalid => unreachable!(),
674
}
675
676
stack.extend(
677
node_outputs
678
.iter()
679
.enumerate()
680
.map(|(i, v)| (*v, (i, node))),
681
);
682
}
683
}
684
685
/// Optimize the orderings used in the IR plan and get the relative orderings of all edges.
686
///
687
/// All roots should be `Sink` nodes and no `SinkMultiple` or `Invalid` are allowed to be part of
688
/// the graph.
689
pub fn simplify_and_fetch_orderings(
690
roots: &[Node],
691
ir_arena: &mut Arena<IR>,
692
expr_arena: &Arena<AExpr>,
693
) -> PlHashMap<Node, PortOrder> {
694
let mut leaves = Vec::new();
695
let mut outputs = PlHashMap::default();
696
let mut cache_proxy = PlHashMap::<UniqueId, Vec<Node>>::default();
697
698
// Get the per-node outputs and leaves
699
{
700
let mut stack = Vec::new();
701
702
for root in roots {
703
assert!(matches!(ir_arena.get(*root), IR::Sink { .. }));
704
outputs.insert(*root, UnitVec::new());
705
stack.extend(ir_arena.get(*root).inputs().map(|node| (*root, node)));
706
}
707
708
while let Some((parent, node)) = stack.pop() {
709
let ir = ir_arena.get(node);
710
let node = match ir {
711
IR::Cache { id, .. } => {
712
let nodes = cache_proxy.entry(*id).or_default();
713
nodes.push(node);
714
nodes[0]
715
},
716
_ => node,
717
};
718
719
let outputs = outputs.entry(node).or_default();
720
let has_been_visisited_before = !outputs.is_empty();
721
outputs.push(parent);
722
723
if has_been_visisited_before {
724
continue;
725
}
726
727
let inputs = ir.inputs();
728
if matches!(inputs, Inputs::Empty) {
729
leaves.push(node);
730
}
731
stack.extend(inputs.map(|input| (node, input)));
732
}
733
}
734
735
// Pushdown and optimize orders from the roots to the leaves.
736
let mut orders = pushdown_orders(roots, ir_arena, expr_arena, &mut outputs, &cache_proxy);
737
// Pullup orders from the leaves to the roots.
738
pullup_orders(&leaves, ir_arena, &mut outputs, &mut orders, &cache_proxy);
739
740
// @Hack. Since not all caches might share the same node and the input of caches might have
741
// been updated, we need to ensure that all caches again have the same input.
742
//
743
// This can be removed when all caches with the same id share the same IR node.
744
for nodes in cache_proxy.into_values() {
745
let updated_node = nodes[0];
746
let order = orders[&updated_node].clone();
747
let IR::Cache {
748
input: updated_input,
749
id: _,
750
} = ir_arena.get(updated_node)
751
else {
752
unreachable!();
753
};
754
let updated_input = *updated_input;
755
for n in &nodes[1..] {
756
let IR::Cache { input, id: _ } = ir_arena.get_mut(*n) else {
757
unreachable!();
758
};
759
760
orders.insert(*n, order.clone());
761
*input = updated_input;
762
}
763
}
764
765
orders
766
}
767
768