GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/planner/lp.rs
use polars_core::POOL;
use polars_core::prelude::*;
use polars_expr::state::ExecutionState;
use polars_plan::plans::expr_ir::ExprIR;
use polars_plan::prelude::sink::CallbackSinkType;
use polars_utils::unique_id::UniqueId;
use recursive::recursive;

#[cfg(feature = "python")]
use self::python_dsl::PythonScanSource;
use super::*;
use crate::executors::{self, CachePrefiller, Executor, GroupByStreamingExec, SinkExecutor};
use crate::scan_predicate::functions::create_scan_predicate;

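/// Builder that lowers an IR node into an executor backed by the streaming
/// engine (feature `new-streaming`); the in-memory planner delegates file
/// sinks, most scans and partitioned group-bys through it.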
pub type StreamingExecutorBuilder =
    fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<Box<dyn Executor>>;

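/// Whether a group_by with these keys and aggregations can be executed as a
/// partitioned group_by, i.e. pre-aggregated per partition and then combined.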
fn partitionable_gb(
    keys: &[ExprIR],
    aggs: &[ExprIR],
    input_schema: &Schema,
    expr_arena: &Arena<AExpr>,
    apply: &Option<PlanCallback<DataFrame, DataFrame>>,
) -> bool {
    // Checks:
    // 1. complex expressions in the group_by itself are not partitionable;
    //    in this case anything more than col("foo")
    // 2. a custom apply function cannot be partitioned
    // 3. we don't bother with more than 2 keys, as the cardinality of the
    //    key combinations likely explodes
    if !keys.is_empty() && keys.len() < 3 && apply.is_none() {
        // Complex expressions in the group_by itself are not partitionable;
        // in this case anything more than col("foo").
        for key in keys {
            if (expr_arena).iter(key.node()).count() > 1
                || has_aexpr(key.node(), expr_arena, |ae| match ae {
                    AExpr::Literal(lv) => !lv.is_scalar(),
                    _ => false,
                })
            {
                return false;
            }
        }

        can_pre_agg_exprs(aggs, expr_arena, input_schema)
    } else {
        false
    }
}

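/// State threaded through the IR-to-executor conversion to track caches:
/// `has_cache_parent` is set once an ancestor `Cache` node has been visited
/// (its subtree must clone IR nodes instead of taking them from the arena),
/// and `has_cache_child` records whether the current branch contains a cache.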
#[derive(Clone)]
struct ConversionState {
    has_cache_child: bool,
    has_cache_parent: bool,
}

impl ConversionState {
    fn new() -> PolarsResult<Self> {
        Ok(ConversionState {
            has_cache_child: false,
            has_cache_parent: false,
        })
    }

    /// Run `func` with a state whose `has_cache_child` flag is reset for the
    /// new branch, then propagate the flag observed in that branch back up.
    fn with_new_branch<K, F: FnOnce(&mut Self) -> K>(&mut self, func: F) -> K {
        let mut new_state = self.clone();
        new_state.has_cache_child = false;
        let out = func(&mut new_state);
        self.has_cache_child = new_state.has_cache_child;
        out
    }
}

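/// Convert the IR plan rooted at `root` into a physical plan. If the plan
/// contains `Cache` nodes, the result is wrapped in a `CachePrefiller` so
/// the caches are filled before the main plan executes.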
pub fn create_physical_plan(
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    build_streaming_executor: Option<StreamingExecutorBuilder>,
) -> PolarsResult<Box<dyn Executor>> {
    let mut state = ConversionState::new()?;
    let mut cache_nodes = Default::default();
    let plan = create_physical_plan_impl(
        root,
        lp_arena,
        expr_arena,
        &mut state,
        &mut cache_nodes,
        build_streaming_executor,
    )?;

    if cache_nodes.is_empty() {
        Ok(plan)
    } else {
        Ok(Box::new(CachePrefiller {
            caches: cache_nodes,
            phys_plan: plan,
        }))
    }
}

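/// Physical plans created from multiple sink roots, together with an
/// optional executor that prefills the `Cache` nodes those plans share.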
pub struct MultiplePhysicalPlans {
    pub cache_prefiller: Option<Box<dyn Executor>>,
    pub physical_plans: Vec<Box<dyn Executor>>,
}
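/// Create one physical plan per root. All roots are converted in a single
/// branch of the conversion state, so `Cache` nodes shared between plans are
/// registered once and prefilled via the returned `cache_prefiller`.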
pub fn create_multiple_physical_plans(
    roots: &[Node],
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    build_streaming_executor: Option<StreamingExecutorBuilder>,
) -> PolarsResult<MultiplePhysicalPlans> {
    let mut state = ConversionState::new()?;
    let mut cache_nodes = Default::default();
    let plans = state.with_new_branch(|new_state| {
        roots
            .iter()
            .map(|&node| {
                create_physical_plan_impl(
                    node,
                    lp_arena,
                    expr_arena,
                    new_state,
                    &mut cache_nodes,
                    build_streaming_executor,
                )
            })
            .collect::<PolarsResult<Vec<_>>>()
    })?;

    let cache_prefiller = (!cache_nodes.is_empty()).then(|| {
        // Executor that produces no data: the prefiller only needs to fill
        // the caches, the actual plans are executed separately.
        struct Empty;
        impl Executor for Empty {
            fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
                Ok(DataFrame::empty())
            }
        }
        Box::new(CachePrefiller {
            caches: cache_nodes,
            phys_plan: Box::new(Empty),
        }) as _
    });

    Ok(MultiplePhysicalPlans {
        cache_prefiller,
        physical_plans: plans,
    })
}

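/// Prepare the predicate of a Python scan. For pyarrow sources, the minterms
/// that can be converted to a pyarrow filter string are pushed into
/// `options.predicate`, and whatever remains is returned as a residual
/// physical predicate. For other sources, the predicate is returned both as
/// a physical expression and in serialized DSL form for the reader.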
#[cfg(feature = "python")]
149
#[allow(clippy::type_complexity)]
150
pub fn python_scan_predicate(
151
options: &mut PythonOptions,
152
expr_arena: &mut Arena<AExpr>,
153
state: &mut ExpressionConversionState,
154
) -> PolarsResult<(
155
Option<Arc<dyn polars_expr::prelude::PhysicalExpr>>,
156
Option<Vec<u8>>,
157
)> {
158
let mut predicate_serialized = None;
159
let predicate = if let PythonPredicate::Polars(e) = &options.predicate {
160
// Clone the expression so we can release the borrow on `options`
161
// before mutating `options.predicate` below.
162
let e = e.clone();
163
164
// Convert to a pyarrow eval string.
165
if matches!(options.python_source, PythonScanSource::Pyarrow) {
166
use polars_core::config::verbose_print_sensitive;
167
use polars_plan::plans::MintermIter;
168
169
// Split into AND-minterms and convert each independently.
170
let mut residual_predicate_nodes: Vec<Node> = vec![];
171
let parts: Vec<String> = MintermIter::new(e.node(), expr_arena)
172
.filter_map(|node| {
173
let result = polars_plan::plans::python::pyarrow::predicate_to_pa(
174
node,
175
expr_arena,
176
Default::default(),
177
);
178
if result.is_none() {
179
residual_predicate_nodes.push(node);
180
}
181
result
182
})
183
.collect();
184
185
let predicate_pa = match parts.len() {
186
0 => None,
187
1 => Some(parts.into_iter().next().unwrap()),
188
_ => Some(format!("({})", parts.join(" & "))),
189
};
190
191
let residual_predicate_expr_ir = if let Some(eval_str) = predicate_pa {
192
options.predicate = PythonPredicate::PyArrow(eval_str);
193
194
residual_predicate_nodes
195
.into_iter()
196
.fold(None, |acc, node| {
197
Some(acc.map_or(node, |acc_node| {
198
expr_arena.add(AExpr::BinaryExpr {
199
left: acc_node,
200
op: Operator::And,
201
right: node,
202
})
203
}))
204
})
205
.map(|node| ExprIR::from_node(node, expr_arena))
206
} else {
207
Some(e.clone())
208
};
209
210
verbose_print_sensitive(|| {
211
let predicate_pa_verbose_msg = match &options.predicate {
212
PythonPredicate::PyArrow(p) => p,
213
_ => "<conversion failed>",
214
};
215
216
format!(
217
"python_scan_predicate: \
218
predicate node: {}, \
219
converted pyarrow predicate: {}, \
220
residual predicate: {:?}",
221
ExprIRDisplay::display_node(e.node(), expr_arena),
222
predicate_pa_verbose_msg,
223
residual_predicate_expr_ir
224
.as_ref()
225
.map(|e| ExprIRDisplay::display_node(e.node(), expr_arena)),
226
)
227
});
228
229
residual_predicate_expr_ir
230
.map(|expr_ir| create_physical_expr(&expr_ir, expr_arena, &options.schema, state))
231
.transpose()?
232
}
233
// Convert to physical expression for the case the reader cannot consume the predicate.
234
else {
235
let dsl_expr = e.to_expr(expr_arena);
236
predicate_serialized = polars_plan::plans::python::predicate::serialize(&dsl_expr)?;
237
238
Some(create_physical_expr(
239
&e,
240
expr_arena,
241
&options.schema,
242
state,
243
)?)
244
}
245
} else {
246
None
247
};
248
249
Ok((predicate, predicate_serialized))
250
}
251
252
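/// Recursively convert the IR node at `root` (and its inputs) into an
/// executor. Every `Cache` encountered is registered in `cache_nodes` in
/// discovery order so callers can prefill the caches before execution.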
#[recursive]
fn create_physical_plan_impl(
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    state: &mut ConversionState,
    // Cache nodes in order of discovery
    cache_nodes: &mut PlIndexMap<UniqueId, executors::CachePrefill>,
    build_streaming_executor: Option<StreamingExecutorBuilder>,
) -> PolarsResult<Box<dyn Executor>> {
    use IR::*;

    let get_streaming_executor_builder = || {
        build_streaming_executor.expect(
            "get_streaming_executor_builder() failed (hint: missing feature new-streaming?)",
        )
    };

    macro_rules! recurse {
        ($node:expr, $state:expr) => {
            create_physical_plan_impl(
                $node,
                lp_arena,
                expr_arena,
                $state,
                cache_nodes,
                build_streaming_executor,
            )
        };
    }

    // Prefer taking the IR node out of the arena; clone it only when it must
    // remain available, either because an ancestor cache may replay this
    // subtree or because the streaming engine re-reads the node from `root`.
    let logical_plan = if state.has_cache_parent
        || matches!(
            lp_arena.get(root),
            IR::Scan { .. } // Needed for the streaming impl
            | IR::Cache { .. } // Needed for plans branching from the same cache node
            | IR::GroupBy { .. } // Needed for the streaming impl
            | IR::Sink { // Needed for the streaming impl
                payload: SinkTypeIR::File(_) | SinkTypeIR::Partitioned { .. },
                ..
            }
        ) {
        lp_arena.get(root).clone()
    } else {
        lp_arena.take(root)
    };

    match logical_plan {
        #[cfg(feature = "python")]
        PythonScan { mut options } => {
            let mut expr_conv_state = ExpressionConversionState::new(true);
            let (predicate, predicate_serialized) =
                python_scan_predicate(&mut options, expr_arena, &mut expr_conv_state)?;
            Ok(Box::new(executors::PythonScanExec {
                options,
                predicate,
                predicate_serialized,
            }))
        },
        Sink { input, payload } => match payload {
            SinkTypeIR::Memory => Ok(Box::new(SinkExecutor {
                input: recurse!(input, state)?,
                name: PlSmallStr::from_static("mem"),
                f: Box::new(move |df, _state| Ok(Some(df))),
            })),
            SinkTypeIR::Callback(CallbackSinkType {
                function,
                maintain_order: _,
                chunk_size,
            }) => {
                let chunk_size = chunk_size.map_or(usize::MAX, Into::into);

                Ok(Box::new(SinkExecutor {
                    input: recurse!(input, state)?,
                    name: PlSmallStr::from_static("batches"),
                    f: Box::new(move |mut buffer, _state| {
                        // Feed the callback `chunk_size`-row chunks until the
                        // input is exhausted or the callback asks to stop.
                        while buffer.height() > 0 {
                            let df;
                            (df, buffer) = buffer.split_at(buffer.height().min(chunk_size) as i64);
                            let should_stop = function.call(df)?;
                            if should_stop {
                                break;
                            }
                        }
                        Ok(Some(DataFrame::empty()))
                    }),
                }))
            },
            SinkTypeIR::File(_) | SinkTypeIR::Partitioned { .. } => {
                // File and partitioned sinks are only implemented by the
                // streaming engine.
                get_streaming_executor_builder()(root, lp_arena, expr_arena)
            },
        },
        SinkMultiple { .. } => {
            polars_bail!(InvalidOperation: "lazy multisinks only supported on streaming engine")
        },
        Union { inputs, options } => {
            let inputs = state.with_new_branch(|new_state| {
                inputs
                    .into_iter()
                    .map(|node| recurse!(node, new_state))
                    .collect::<PolarsResult<Vec<_>>>()
            });
            let inputs = inputs?;
            Ok(Box::new(executors::UnionExec { inputs, options }))
        },
        HConcat {
            inputs, options, ..
        } => {
            let inputs = state.with_new_branch(|new_state| {
                inputs
                    .into_iter()
                    .map(|node| recurse!(node, new_state))
                    .collect::<PolarsResult<Vec<_>>>()
            });

            let inputs = inputs?;

            Ok(Box::new(executors::HConcatExec { inputs, options }))
        },
        Slice { input, offset, len } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::SliceExec { input, offset, len }))
        },
        Filter { input, predicate } => {
            let streamable = is_elementwise_rec(predicate.node(), expr_arena);
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;
            let mut state = ExpressionConversionState::new(true);
            let predicate =
                create_physical_expr(&predicate, expr_arena, &input_schema, &mut state)?;
            Ok(Box::new(executors::FilterExec::new(
                predicate,
                input,
                state.has_windows,
                streamable,
            )))
        },
        #[allow(unused_variables)]
        Scan {
            sources,
            file_info,
            hive_parts,
            output_schema,
            scan_type,
            predicate,
            predicate_file_skip_applied,
            unified_scan_args,
        } => {
            let mut expr_conversion_state = ExpressionConversionState::new(true);

            let mut create_skip_batch_predicate = unified_scan_args.table_statistics.is_some();
            #[cfg(feature = "parquet")]
            {
                if let FileScanIR::Parquet { options, .. } = scan_type.as_ref() {
                    create_skip_batch_predicate |= options.use_statistics;
                }
            }

            let predicate = predicate
                .map(|predicate| {
                    create_scan_predicate(
                        &predicate,
                        expr_arena,
                        output_schema.as_ref().unwrap_or(&file_info.schema),
                        None, // hive_schema
                        &mut expr_conversion_state,
                        create_skip_batch_predicate,
                        false,
                    )
                })
                .transpose()?;

            match *scan_type {
                FileScanIR::Anonymous { function, .. } => {
                    Ok(Box::new(executors::AnonymousScanExec {
                        function,
                        predicate,
                        unified_scan_args,
                        file_info,
                        output_schema,
                        predicate_has_windows: expr_conversion_state.has_windows,
                    }))
                },
                // All other scan types are handled by the streaming engine.
                #[cfg_attr(
                    not(any(
                        feature = "parquet",
                        feature = "ipc",
                        feature = "csv",
                        feature = "json",
                        feature = "scan_lines"
                    )),
                    expect(unreachable_patterns)
                )]
                _ => get_streaming_executor_builder()(root, lp_arena, expr_arena),
            }
        },

        Select {
            expr,
            input,
            schema: _schema,
            options,
            ..
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;
            let mut state = ExpressionConversionState::new(POOL.current_num_threads() > expr.len());
            let phys_expr =
                create_physical_expressions_from_irs(&expr, expr_arena, &input_schema, &mut state)?;

            let allow_vertical_parallelism = options.should_broadcast
                && expr
                    .iter()
                    .all(|e| is_elementwise_rec(e.node(), expr_arena))
                // If all columns are literal we would get 1 row per thread.
                && !phys_expr.iter().all(|p| p.is_literal());

            Ok(Box::new(executors::ProjectionExec {
                input,
                expr: phys_expr,
                has_windows: state.has_windows,
                input_schema,
                #[cfg(test)]
                schema: _schema,
                options,
                allow_vertical_parallelism,
            }))
        },
        DataFrameScan {
            df, output_schema, ..
        } => Ok(Box::new(executors::DataFrameExec {
            df,
            projection: output_schema.map(|s| s.iter_names_cloned().collect()),
        })),
        Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => {
            debug_assert!(!by_column.is_empty());
            let input_schema = lp_arena.get(input).schema(lp_arena);
            let by_column = create_physical_expressions_from_irs(
                &by_column,
                expr_arena,
                input_schema.as_ref(),
                &mut ExpressionConversionState::new(true),
            )?;
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::SortExec {
                input,
                by_column,
                slice,
                sort_options,
            }))
        },
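        // The first visit to a cache id lowers its input and registers a
        // `CachePrefill`; later visits to the same id only attach another
        // reader executor to the registered prefill.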
        Cache { input, id } => {
            state.has_cache_parent = true;
            state.has_cache_child = true;

            if let Some(cache) = cache_nodes.get_mut(&id) {
                Ok(Box::new(cache.make_exec()))
            } else {
                let input = recurse!(input, state)?;

                let mut prefill = executors::CachePrefill::new_cache(input, id);
                let exec = prefill.make_exec();

                cache_nodes.insert(id, prefill);

                Ok(Box::new(exec))
            }
        },
        Distinct { input, options } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::UniqueExec { input, options }))
        },
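        // A group_by runs as a dynamic or rolling group_by when requested,
        // as a partitioned group_by on the streaming engine when the keys
        // and aggregations allow per-partition pre-aggregation, and as the
        // generic in-memory implementation otherwise.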
        GroupBy {
            input,
            keys,
            aggs,
            apply,
            schema: output_schema,
            maintain_order,
            options,
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());
            let phys_keys = create_physical_expressions_from_irs(
                &keys,
                expr_arena,
                &input_schema,
                &mut ExpressionConversionState::new(true),
            )?;
            let phys_aggs = create_physical_expressions_from_irs(
                &aggs,
                expr_arena,
                &input_schema,
                &mut ExpressionConversionState::new(true),
            )?;

            let _slice = options.slice;
            #[cfg(feature = "dynamic_group_by")]
            if let Some(options) = options.dynamic {
                let input = recurse!(input, state)?;
                return Ok(Box::new(executors::GroupByDynamicExec {
                    input,
                    keys: phys_keys,
                    aggs: phys_aggs,
                    options,
                    input_schema,
                    output_schema,
                    slice: _slice,
                    apply,
                }));
            }

            #[cfg(feature = "dynamic_group_by")]
            if let Some(options) = options.rolling {
                let input = recurse!(input, state)?;
                return Ok(Box::new(executors::GroupByRollingExec {
                    input,
                    keys: phys_keys,
                    aggs: phys_aggs,
                    options,
                    input_schema,
                    output_schema,
                    slice: _slice,
                    apply,
                }));
            }

            // Check at the latest possible moment whether we can execute the
            // group_by in a partitioned manner.
            let partitionable = partitionable_gb(&keys, &aggs, &input_schema, expr_arena, &apply);
            if partitionable {
                let from_partitioned_ds = lp_arena.iter(input).any(|(_, lp)| {
                    if let Union { options, .. } = lp {
                        options.from_partitioned_ds
                    } else {
                        false
                    }
                });
                let builder = get_streaming_executor_builder();

                let input = recurse!(input, state)?;

                // If an ancestor cache may reuse this node, hand the
                // streaming executor its own copy of the group_by node.
                let gb_root = if state.has_cache_parent {
                    lp_arena.add(lp_arena.get(root).clone())
                } else {
                    root
                };

                let executor = Box::new(GroupByStreamingExec::new(
                    input,
                    builder,
                    gb_root,
                    lp_arena,
                    expr_arena,
                    phys_keys,
                    phys_aggs,
                    maintain_order,
                    output_schema,
                    _slice,
                    from_partitioned_ds,
                ));

                Ok(executor)
            } else {
                let input = recurse!(input, state)?;
                Ok(Box::new(executors::GroupByExec::new(
                    input,
                    phys_keys,
                    phys_aggs,
                    apply,
                    maintain_order,
                    input_schema,
                    output_schema,
                    options.slice,
                )))
            }
        },
        Join {
            input_left,
            input_right,
            left_on,
            right_on,
            options,
            schema,
            ..
        } => {
            let schema_left = lp_arena.get(input_left).schema(lp_arena).into_owned();
            let schema_right = lp_arena.get(input_right).schema(lp_arena).into_owned();

            let (input_left, input_right) = state.with_new_branch(|new_state| {
                (
                    recurse!(input_left, new_state),
                    recurse!(input_right, new_state),
                )
            });
            let input_left = input_left?;
            let input_right = input_right?;

            // TODO: remove the force option. It can deadlock.
            let parallel = if options.force_parallel {
                true
            } else {
                options.allow_parallel
            };

            let left_on = create_physical_expressions_from_irs(
                &left_on,
                expr_arena,
                &schema_left,
                &mut ExpressionConversionState::new(true),
            )?;
            let right_on = create_physical_expressions_from_irs(
                &right_on,
                expr_arena,
                &schema_right,
                &mut ExpressionConversionState::new(true),
            )?;
            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());

            // Convert the join options to the physical join options. This
            // requires the physical planner, so we do it at the last minute.
            let join_type_options = options
                .options
                .map(|o| {
                    o.compile(|e| {
                        let phys_expr = create_physical_expr(
                            e,
                            expr_arena,
                            &schema,
                            &mut ExpressionConversionState::new(false),
                        )?;

                        let execution_state = ExecutionState::default();

                        Ok(Arc::new(move |df: DataFrame| {
                            let mask = phys_expr.evaluate(&df, &execution_state)?;
                            let mask = mask.as_materialized_series();
                            let mask = mask.bool()?;
                            df.filter_seq(mask)
                        }))
                    })
                })
                .transpose()?;

            Ok(Box::new(executors::JoinExec::new(
                input_left,
                input_right,
                left_on,
                right_on,
                parallel,
                options.args,
                join_type_options,
            )))
        },
        HStack {
            input,
            exprs,
            schema: output_schema,
            options,
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;

            let allow_vertical_parallelism = options.should_broadcast
                && exprs
                    .iter()
                    .all(|e| is_elementwise_rec(e.node(), expr_arena));

            let mut state =
                ExpressionConversionState::new(POOL.current_num_threads() > exprs.len());

            let phys_exprs = create_physical_expressions_from_irs(
                &exprs,
                expr_arena,
                &input_schema,
                &mut state,
            )?;
            Ok(Box::new(executors::StackExec {
                input,
                has_windows: state.has_windows,
                exprs: phys_exprs,
                input_schema,
                output_schema,
                options,
                allow_vertical_parallelism,
            }))
        },
        MapFunction {
            input, function, ..
        } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::UdfExec { input, function }))
        },
        ExtContext {
            input, contexts, ..
        } => {
            let input = recurse!(input, state)?;
            let contexts = contexts
                .into_iter()
                .map(|node| recurse!(node, state))
                .collect::<PolarsResult<_>>()?;
            Ok(Box::new(executors::ExternalContext { input, contexts }))
        },
        SimpleProjection { input, columns } => {
            let input = recurse!(input, state)?;
            let exec = executors::ProjectionSimple { input, columns };
            Ok(Box::new(exec))
        },
        #[cfg(feature = "merge_sorted")]
        MergeSorted {
            input_left,
            input_right,
            key,
        } => {
            let (input_left, input_right) = state.with_new_branch(|new_state| {
                (
                    recurse!(input_left, new_state),
                    recurse!(input_right, new_state),
                )
            });
            let input_left = input_left?;
            let input_right = input_right?;

            let exec = executors::MergeSorted {
                input_left,
                input_right,
                key,
            };
            Ok(Box::new(exec))
        },
        Invalid => unreachable!(),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_multiple_physical_plans_reused_cache() {
        // Check that reusing the same cache node doesn't panic.
        // CSE creates duplicate cache nodes with the same ID, but a single
        // cache node could also be reused by multiple plans.

        let mut ir = Arena::new();

        let schema = Schema::from_iter([(PlSmallStr::from_static("x"), DataType::Float32)]);
        let scan = ir.add(IR::DataFrameScan {
            df: Arc::new(DataFrame::empty_with_schema(&schema)),
            schema: Arc::new(schema),
            output_schema: None,
        });

        let cache = ir.add(IR::Cache {
            input: scan,
            id: UniqueId::new(),
        });

        let left_sink = ir.add(IR::Sink {
            input: cache,
            payload: SinkTypeIR::Memory,
        });
        let right_sink = ir.add(IR::Sink {
            input: cache,
            payload: SinkTypeIR::Memory,
        });

        let _multiplan = create_multiple_physical_plans(
            &[left_sink, right_sink],
            &mut ir,
            &mut Arena::new(),
            None,
        )
        .unwrap();
    }
}