Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/physical_plan/to_graph.rs
8458 views
1
use std::sync::{Arc, OnceLock};
2
3
use num_traits::AsPrimitive;
4
use parking_lot::Mutex;
5
use polars_core::prelude::PlRandomState;
6
use polars_core::schema::{Schema, SchemaRef};
7
use polars_core::{POOL, config};
8
use polars_error::{PolarsResult, polars_bail, polars_ensure, polars_err};
9
use polars_expr::groups::new_hash_grouper;
10
use polars_expr::planner::{ExpressionConversionState, create_physical_expr};
11
use polars_expr::reduce::into_reduction;
12
use polars_expr::state::ExecutionState;
13
use polars_mem_engine::create_physical_plan;
14
use polars_mem_engine::scan_predicate::create_scan_predicate;
15
use polars_plan::dsl::{
16
FileSinkOptions, JoinOptionsIR, PartitionStrategyIR, PartitionedSinkOptionsIR, ScanSources,
17
};
18
use polars_plan::plans::expr_ir::ExprIR;
19
use polars_plan::plans::{AExpr, ArenaExprIter, IR, IRAggExpr};
20
use polars_plan::prelude::FunctionFlags;
21
use polars_utils::arena::{Arena, Node};
22
use polars_utils::format_pl_smallstr;
23
use polars_utils::itertools::Itertools;
24
use polars_utils::pl_path::PlRefPath;
25
use polars_utils::pl_str::PlSmallStr;
26
use polars_utils::relaxed_cell::RelaxedCell;
27
use recursive::recursive;
28
use slotmap::{SecondaryMap, SlotMap};
29
30
use super::{PhysNode, PhysNodeKey, PhysNodeKind};
31
use crate::execute::StreamingExecutionState;
32
use crate::expression::StreamExpr;
33
use crate::graph::{Graph, GraphNodeKey};
34
use crate::morsel::{MorselSeq, get_ideal_morsel_size};
35
use crate::nodes;
36
use crate::nodes::io_sources::multi_scan::config::MultiScanConfig;
37
use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
38
use crate::nodes::io_sources::multi_scan::reader_interface::capabilities::ReaderCapabilities;
39
use crate::nodes::joins::merge_join::MergeJoinNode;
40
use crate::physical_plan::lower_expr::compute_output_schema;
41
use crate::utils::late_materialized_df::LateMaterializedDataFrame;
42
43
fn has_potential_recurring_entrance(node: Node, arena: &Arena<AExpr>) -> bool {
44
arena.iter(node).any(|(_n, ae)| match ae {
45
AExpr::Function { options, .. } | AExpr::AnonymousFunction { options, .. } => {
46
options.flags.contains(FunctionFlags::OPTIONAL_RE_ENTRANT)
47
},
48
_ => false,
49
})
50
}
51
52
fn create_stream_expr(
53
expr_ir: &ExprIR,
54
ctx: &mut GraphConversionContext<'_>,
55
schema: &Arc<Schema>,
56
) -> PolarsResult<StreamExpr> {
57
let reentrant = has_potential_recurring_entrance(expr_ir.node(), ctx.expr_arena);
58
let phys = create_physical_expr(
59
expr_ir,
60
ctx.expr_arena,
61
schema,
62
&mut ctx.expr_conversion_state,
63
)?;
64
Ok(StreamExpr::new(phys, reentrant))
65
}
66
67
/// Shared state threaded through the physical-plan → graph conversion
/// (`to_graph_rec`).
struct GraphConversionContext<'a> {
    // Physical plan nodes being converted; read-only lookup by key.
    phys_sm: &'a SlotMap<PhysNodeKey, PhysNode>,
    // Arena holding all expression IR; mutated while lowering expressions.
    expr_arena: &'a mut Arena<AExpr>,
    // The execution graph being built up incrementally.
    graph: Graph,
    // Memo map from physical node key to its graph node, so shared physical
    // nodes produce a DAG rather than a duplicated tree.
    phys_to_graph: SecondaryMap<PhysNodeKey, GraphNodeKey>,
    // State reused across create_physical_expr calls.
    expr_conversion_state: ExpressionConversionState,
    // Number of parallel pipelines (taken from the rayon pool size).
    num_pipelines: usize,
}
75
76
pub fn physical_plan_to_graph(
77
root: PhysNodeKey,
78
phys_sm: &SlotMap<PhysNodeKey, PhysNode>,
79
expr_arena: &mut Arena<AExpr>,
80
) -> PolarsResult<(Graph, SecondaryMap<PhysNodeKey, GraphNodeKey>)> {
81
// Get the number of threads from the rayon thread-pool as that respects our config.
82
let num_pipelines = POOL.current_num_threads();
83
let mut ctx = GraphConversionContext {
84
phys_sm,
85
expr_arena,
86
graph: Graph::with_capacity(phys_sm.len()),
87
phys_to_graph: SecondaryMap::with_capacity(phys_sm.len()),
88
expr_conversion_state: ExpressionConversionState::new(false),
89
num_pipelines,
90
};
91
92
to_graph_rec(root, &mut ctx)?;
93
94
Ok((ctx.graph, ctx.phys_to_graph))
95
}
96
97
#[recursive]
98
fn to_graph_rec<'a>(
99
phys_node_key: PhysNodeKey,
100
ctx: &mut GraphConversionContext<'a>,
101
) -> PolarsResult<GraphNodeKey> {
102
// This will ensure we create a proper acyclic directed graph instead of a tree.
103
if let Some(graph_key) = ctx.phys_to_graph.get(phys_node_key) {
104
return Ok(*graph_key);
105
}
106
107
use PhysNodeKind::*;
108
let node = &ctx.phys_sm[phys_node_key];
109
let graph_key = match &node.kind {
110
InMemorySource {
111
df,
112
disable_morsel_split,
113
} => ctx.graph.add_node(
114
if *disable_morsel_split {
115
nodes::in_memory_source::InMemorySourceNode::new_no_morsel_split(
116
df.clone(),
117
MorselSeq::default(),
118
)
119
} else {
120
nodes::in_memory_source::InMemorySourceNode::new(df.clone(), MorselSeq::default())
121
},
122
[],
123
),
124
SinkMultiple { sinks } => {
125
// @NOTE: This is always the root node and gets ignored by the physical_plan anyway so
126
// we give one of the inputs back.
127
let node = to_graph_rec(sinks[0], ctx)?;
128
for sink in &sinks[1..] {
129
to_graph_rec(*sink, ctx)?;
130
}
131
return Ok(node);
132
},
133
134
StreamingSlice {
135
input,
136
offset,
137
length,
138
} => {
139
let input_key = to_graph_rec(input.node, ctx)?;
140
ctx.graph.add_node(
141
nodes::streaming_slice::StreamingSliceNode::new(*offset, *length),
142
[(input_key, input.port)],
143
)
144
},
145
146
NegativeSlice {
147
input,
148
offset,
149
length,
150
} => {
151
let input_key = to_graph_rec(input.node, ctx)?;
152
ctx.graph.add_node(
153
nodes::negative_slice::NegativeSliceNode::new(*offset, *length),
154
[(input_key, input.port)],
155
)
156
},
157
158
DynamicSlice {
159
input,
160
offset,
161
length,
162
} => {
163
let input_key = to_graph_rec(input.node, ctx)?;
164
let offset_key = to_graph_rec(offset.node, ctx)?;
165
let length_key = to_graph_rec(length.node, ctx)?;
166
let offset_schema = ctx.phys_sm[offset.node].output_schema.clone();
167
let length_schema = ctx.phys_sm[length.node].output_schema.clone();
168
ctx.graph.add_node(
169
nodes::dynamic_slice::DynamicSliceNode::new(offset_schema, length_schema),
170
[
171
(input_key, input.port),
172
(offset_key, offset.port),
173
(length_key, length.port),
174
],
175
)
176
},
177
178
Shift {
179
input,
180
offset,
181
fill,
182
} => {
183
let input_schema = ctx.phys_sm[input.node].output_schema.clone();
184
let offset_schema = ctx.phys_sm[offset.node].output_schema.clone();
185
let input_key = to_graph_rec(input.node, ctx)?;
186
let offset_key = to_graph_rec(offset.node, ctx)?;
187
if let Some(fill) = fill {
188
let fill_key = to_graph_rec(fill.node, ctx)?;
189
ctx.graph.add_node(
190
nodes::shift::ShiftNode::new(input_schema, offset_schema, true),
191
[
192
(input_key, input.port),
193
(offset_key, offset.port),
194
(fill_key, fill.port),
195
],
196
)
197
} else {
198
ctx.graph.add_node(
199
nodes::shift::ShiftNode::new(input_schema, offset_schema, false),
200
[(input_key, input.port), (offset_key, offset.port)],
201
)
202
}
203
},
204
205
Filter { predicate, input } => {
206
let input_schema = &ctx.phys_sm[input.node].output_schema;
207
let phys_predicate_expr = create_stream_expr(predicate, ctx, input_schema)?;
208
let input_key = to_graph_rec(input.node, ctx)?;
209
ctx.graph.add_node(
210
nodes::filter::FilterNode::new(phys_predicate_expr),
211
[(input_key, input.port)],
212
)
213
},
214
215
Select {
216
selectors,
217
input,
218
extend_original,
219
} => {
220
let input_schema = &ctx.phys_sm[input.node].output_schema;
221
let phys_selectors = selectors
222
.iter()
223
.map(|selector| create_stream_expr(selector, ctx, input_schema))
224
.collect::<PolarsResult<_>>()?;
225
let input_key = to_graph_rec(input.node, ctx)?;
226
ctx.graph.add_node(
227
nodes::select::SelectNode::new(
228
phys_selectors,
229
node.output_schema.clone(),
230
*extend_original,
231
),
232
[(input_key, input.port)],
233
)
234
},
235
236
WithRowIndex {
237
input,
238
name,
239
offset,
240
} => {
241
let input_key = to_graph_rec(input.node, ctx)?;
242
ctx.graph.add_node(
243
nodes::with_row_index::WithRowIndexNode::new(name.clone(), *offset),
244
[(input_key, input.port)],
245
)
246
},
247
248
InputIndependentSelect { selectors } => {
249
let empty_schema = Default::default();
250
let phys_selectors = selectors
251
.iter()
252
.map(|selector| create_stream_expr(selector, ctx, &empty_schema))
253
.collect::<PolarsResult<_>>()?;
254
ctx.graph.add_node(
255
nodes::input_independent_select::InputIndependentSelectNode::new(phys_selectors),
256
[],
257
)
258
},
259
260
Reduce { input, exprs } => {
261
let input_key = to_graph_rec(input.node, ctx)?;
262
let input_schema = &ctx.phys_sm[input.node].output_schema;
263
264
let mut reductions = Vec::with_capacity(exprs.len());
265
let mut inputs = Vec::with_capacity(reductions.len());
266
267
for e in exprs {
268
let (red, input_nodes) =
269
into_reduction(e.node(), ctx.expr_arena, input_schema, false)?;
270
reductions.push(red);
271
272
let input_phys_exprs = input_nodes
273
.iter()
274
.map(|node| {
275
create_stream_expr(
276
&ExprIR::from_node(*node, ctx.expr_arena),
277
ctx,
278
input_schema,
279
)
280
})
281
.try_collect_vec()?;
282
283
inputs.push(input_phys_exprs)
284
}
285
286
ctx.graph.add_node(
287
nodes::reduce::ReduceNode::new(inputs, reductions, node.output_schema.clone()),
288
[(input_key, input.port)],
289
)
290
},
291
SimpleProjection { input, columns } => {
292
let input_schema = ctx.phys_sm[input.node].output_schema.clone();
293
let input_key = to_graph_rec(input.node, ctx)?;
294
ctx.graph.add_node(
295
nodes::simple_projection::SimpleProjectionNode::new(columns.clone(), input_schema),
296
[(input_key, input.port)],
297
)
298
},
299
300
InMemorySink { input } => {
301
let input_schema = ctx.phys_sm[input.node].output_schema.clone();
302
let input_key = to_graph_rec(input.node, ctx)?;
303
ctx.graph.add_node(
304
nodes::in_memory_sink::InMemorySinkNode::new(input_schema),
305
[(input_key, input.port)],
306
)
307
},
308
309
CallbackSink {
310
input,
311
function,
312
maintain_order,
313
chunk_size,
314
} => {
315
let input_key = to_graph_rec(input.node, ctx)?;
316
ctx.graph.add_node(
317
nodes::callback_sink::CallbackSinkNode::new(
318
function.clone(),
319
*maintain_order,
320
*chunk_size,
321
),
322
[(input_key, input.port)],
323
)
324
},
325
326
FileSink {
327
input,
328
options:
329
FileSinkOptions {
330
target,
331
file_format,
332
unified_sink_args,
333
},
334
} => {
335
use crate::nodes::io_sinks::IOSinkNode;
336
use crate::nodes::io_sinks::config::{IOSinkNodeConfig, IOSinkTarget};
337
338
let input_schema = ctx.phys_sm[input.node].output_schema.clone();
339
let input_key = to_graph_rec(input.node, ctx)?;
340
341
let target = IOSinkTarget::File(target.clone());
342
343
let config = IOSinkNodeConfig {
344
file_format: file_format.clone(),
345
target,
346
unified_sink_args: unified_sink_args.clone(),
347
input_schema,
348
};
349
350
ctx.graph
351
.add_node(IOSinkNode::new(config), [(input_key, input.port)])
352
},
353
354
PartitionedSink {
355
input,
356
options:
357
PartitionedSinkOptionsIR {
358
base_path,
359
file_path_provider,
360
partition_strategy,
361
file_format,
362
unified_sink_args,
363
max_rows_per_file,
364
approximate_bytes_per_file,
365
},
366
} => {
367
use crate::nodes::io_sinks::IOSinkNode;
368
use crate::nodes::io_sinks::components::exclude_keys_projection::ExcludeKeysProjection;
369
use crate::nodes::io_sinks::components::hstack_columns::HStackColumns;
370
use crate::nodes::io_sinks::components::partitioner::{KeyedPartitioner, Partitioner};
371
use crate::nodes::io_sinks::components::size::{
372
NonZeroRowCountAndSize, RowCountAndSize,
373
};
374
use crate::nodes::io_sinks::config::{
375
IOSinkNodeConfig, IOSinkTarget, PartitionedTarget,
376
};
377
378
let input_schema = ctx.phys_sm[input.node].output_schema.clone();
379
let input_key = to_graph_rec(input.node, ctx)?;
380
381
let file_schema: SchemaRef;
382
let mut hstack_keys: Option<HStackColumns> = None;
383
let mut include_keys_in_file = false;
384
385
let partitioner: Partitioner = match partition_strategy {
386
PartitionStrategyIR::Keyed {
387
keys,
388
include_keys,
389
keys_pre_grouped: _,
390
} => {
391
include_keys_in_file = *include_keys;
392
393
let mut key_schema = Schema::with_capacity(keys.len());
394
395
let key_exprs = keys
396
.iter()
397
.map(|e| {
398
let field = e.field(input_schema.as_ref(), ctx.expr_arena)?;
399
key_schema.extend([field]);
400
401
create_stream_expr(e, ctx, &input_schema)
402
})
403
.collect::<PolarsResult<_>>()?;
404
405
let exclude_keys_projection: Arc<[usize]> = input_schema
406
.iter_names()
407
.enumerate()
408
.filter_map(|(i, name)| (!key_schema.contains(name)).then_some(i))
409
.collect::<Arc<[_]>>();
410
411
let exclude_keys_projection =
412
if exclude_keys_projection.len() == input_schema.len() {
413
ExcludeKeysProjection::Width(exclude_keys_projection.len())
414
} else {
415
ExcludeKeysProjection::Indices(exclude_keys_projection)
416
};
417
418
let schema_excluding_keys: Schema = exclude_keys_projection
419
.iter_indices()
420
.map(|i| {
421
let (name, dtype) = input_schema.get_at_index(i).unwrap();
422
(name.clone(), dtype.clone())
423
})
424
.collect();
425
426
let mut schema_including_keys = Arc::unwrap_or_clone(input_schema.clone());
427
428
for (name, dtype) in key_schema.iter() {
429
schema_including_keys.with_column(name.clone(), dtype.clone());
430
}
431
432
let schema_including_keys = Arc::new(schema_including_keys);
433
434
hstack_keys = Some(HStackColumns::new(
435
&schema_including_keys,
436
&schema_excluding_keys,
437
&key_schema,
438
));
439
440
file_schema = if *include_keys {
441
Arc::clone(&schema_including_keys)
442
} else {
443
Arc::new(schema_excluding_keys)
444
};
445
446
let keyed = KeyedPartitioner {
447
key_exprs,
448
exclude_keys_projection: Some(exclude_keys_projection),
449
};
450
451
Partitioner::Keyed(keyed)
452
},
453
PartitionStrategyIR::FileSize => {
454
file_schema = input_schema.clone();
455
Partitioner::FileSize
456
},
457
};
458
459
let mut file_size_limit = RowCountAndSize::MAX;
460
461
if *max_rows_per_file > 0 {
462
file_size_limit.num_rows = *max_rows_per_file
463
}
464
465
if *approximate_bytes_per_file > 0 {
466
file_size_limit.num_bytes = *approximate_bytes_per_file
467
}
468
469
let file_size_limit = (file_size_limit != RowCountAndSize::MAX)
470
.then_some(NonZeroRowCountAndSize::new(file_size_limit).unwrap());
471
472
let target = IOSinkTarget::Partitioned(Box::new(PartitionedTarget {
473
base_path: base_path.clone(),
474
file_path_provider: file_path_provider.clone(),
475
partitioner,
476
hstack_keys,
477
include_keys_in_file,
478
file_schema,
479
file_size_limit,
480
}));
481
482
let config = IOSinkNodeConfig {
483
file_format: file_format.clone(),
484
target,
485
unified_sink_args: unified_sink_args.clone(),
486
input_schema,
487
};
488
489
ctx.graph
490
.add_node(IOSinkNode::new(config), [(input_key, input.port)])
491
},
492
493
InMemoryMap {
494
input,
495
map,
496
format_str: _,
497
} => {
498
let input_schema = ctx.phys_sm[input.node].output_schema.clone();
499
let input_key = to_graph_rec(input.node, ctx)?;
500
ctx.graph.add_node(
501
nodes::in_memory_map::InMemoryMapNode::new(input_schema, map.clone()),
502
[(input_key, input.port)],
503
)
504
},
505
506
Map {
507
input,
508
map,
509
format_str: _,
510
} => {
511
let input_key = to_graph_rec(input.node, ctx)?;
512
ctx.graph.add_node(
513
nodes::map::MapNode::new(map.clone()),
514
[(input_key, input.port)],
515
)
516
},
517
518
SortedGroupBy {
519
input,
520
key,
521
aggs,
522
slice,
523
} => {
524
let input_schema = ctx.phys_sm[input.node].output_schema.clone();
525
let input_key = to_graph_rec(input.node, ctx)?;
526
let aggs = aggs
527
.iter()
528
.map(|e| {
529
Ok((
530
e.output_name().clone(),
531
create_stream_expr(e, ctx, &input_schema)?,
532
))
533
})
534
.collect::<PolarsResult<Arc<[_]>>>()?;
535
536
ctx.graph.add_node(
537
nodes::sorted_group_by::SortedGroupBy::new(key.clone(), aggs, *slice, input_schema),
538
[(input_key, input.port)],
539
)
540
},
541
542
Sort {
543
input,
544
by_column,
545
slice,
546
sort_options,
547
} => {
548
let input_schema = ctx.phys_sm[input.node].output_schema.clone();
549
let lmdf = Arc::new(LateMaterializedDataFrame::default());
550
let mut lp_arena = Arena::default();
551
let df_node = lp_arena.add(lmdf.clone().as_ir_node(input_schema.clone()));
552
let sort_node = lp_arena.add(IR::Sort {
553
input: df_node,
554
by_column: by_column.clone(),
555
slice: slice.map(|t| (t.0, t.1, None)),
556
sort_options: sort_options.clone(),
557
});
558
let executor = Mutex::new(create_physical_plan(
559
sort_node,
560
&mut lp_arena,
561
ctx.expr_arena,
562
Some(crate::dispatch::build_streaming_query_executor),
563
)?);
564
565
let input_key = to_graph_rec(input.node, ctx)?;
566
ctx.graph.add_node(
567
nodes::in_memory_map::InMemoryMapNode::new(
568
input_schema,
569
Arc::new(move |df| {
570
lmdf.set_materialized_dataframe(df);
571
let mut state = ExecutionState::new();
572
executor.lock().execute(&mut state)
573
}),
574
),
575
[(input_key, input.port)],
576
)
577
},
578
579
TopK {
580
input,
581
k,
582
by_column,
583
reverse,
584
nulls_last,
585
dyn_pred,
586
} => {
587
let input_key = to_graph_rec(input.node, ctx)?;
588
let k_key = to_graph_rec(k.node, ctx)?;
589
590
let k_schema = ctx.phys_sm[k.node].output_schema.clone();
591
let input_schema = &ctx.phys_sm[input.node].output_schema;
592
let key_schema = compute_output_schema(input_schema, by_column, ctx.expr_arena)?;
593
594
let key_selectors = by_column
595
.iter()
596
.map(|e| create_stream_expr(e, ctx, input_schema))
597
.try_collect_vec()?;
598
599
ctx.graph.add_node(
600
nodes::top_k::TopKNode::new(
601
k_schema,
602
reverse.clone(),
603
nulls_last.clone(),
604
key_schema,
605
key_selectors,
606
dyn_pred.clone(),
607
),
608
[(input_key, input.port), (k_key, k.port)],
609
)
610
},
611
612
Repeat { value, repeats } => {
613
let value_key = to_graph_rec(value.node, ctx)?;
614
let repeats_key = to_graph_rec(repeats.node, ctx)?;
615
let value_schema = ctx.phys_sm[value.node].output_schema.clone();
616
let repeats_schema = ctx.phys_sm[repeats.node].output_schema.clone();
617
ctx.graph.add_node(
618
nodes::repeat::RepeatNode::new(value_schema, repeats_schema),
619
[(value_key, value.port), (repeats_key, repeats.port)],
620
)
621
},
622
623
#[cfg(feature = "cum_agg")]
624
CumAgg { input, kind } => {
625
let input_key = to_graph_rec(input.node, ctx)?;
626
ctx.graph.add_node(
627
nodes::cum_agg::CumAggNode::new(*kind),
628
[(input_key, input.port)],
629
)
630
},
631
632
GatherEvery { input, n, offset } => {
633
let (n, offset) = (*n, *offset);
634
let input_key = to_graph_rec(input.node, ctx)?;
635
ctx.graph.add_node(
636
nodes::gather_every::GatherEveryNode::new(n, offset)?,
637
[(input_key, input.port)],
638
)
639
},
640
641
Rle(input) => {
642
let input_key = to_graph_rec(input.node, ctx)?;
643
let input_schema = &ctx.phys_sm[input.node].output_schema;
644
assert_eq!(input_schema.len(), 1);
645
let (name, dtype) = input_schema.get_at_index(0).unwrap();
646
ctx.graph.add_node(
647
nodes::rle::RleNode::new(name.clone(), dtype.clone()),
648
[(input_key, input.port)],
649
)
650
},
651
652
RleId(input) => {
653
let input_key = to_graph_rec(input.node, ctx)?;
654
let input_schema = &ctx.phys_sm[input.node].output_schema;
655
assert_eq!(input_schema.len(), 1);
656
let (_, dtype) = input_schema.get_at_index(0).unwrap();
657
ctx.graph.add_node(
658
nodes::rle_id::RleIdNode::new(dtype.clone()),
659
[(input_key, input.port)],
660
)
661
},
662
663
PeakMinMax { input, is_peak_max } => {
664
let input_key = to_graph_rec(input.node, ctx)?;
665
ctx.graph.add_node(
666
nodes::peak_minmax::PeakMinMaxNode::new(*is_peak_max),
667
[(input_key, input.port)],
668
)
669
},
670
671
OrderedUnion { inputs } => {
672
let input_keys = inputs
673
.iter()
674
.map(|i| PolarsResult::Ok((to_graph_rec(i.node, ctx)?, i.port)))
675
.try_collect_vec()?;
676
ctx.graph.add_node(
677
nodes::ordered_union::OrderedUnionNode::new(node.output_schema.clone()),
678
input_keys,
679
)
680
},
681
682
UnorderedUnion { inputs } => {
683
let input_keys = inputs
684
.iter()
685
.map(|i| PolarsResult::Ok((to_graph_rec(i.node, ctx)?, i.port)))
686
.try_collect_vec()?;
687
ctx.graph.add_node(
688
nodes::unordered_union::UnorderedUnionNode::new(node.output_schema.clone()),
689
input_keys,
690
)
691
},
692
693
Zip {
694
inputs,
695
zip_behavior,
696
} => {
697
let input_schemas = inputs
698
.iter()
699
.map(|i| ctx.phys_sm[i.node].output_schema.clone())
700
.collect_vec();
701
let input_keys = inputs
702
.iter()
703
.map(|i| PolarsResult::Ok((to_graph_rec(i.node, ctx)?, i.port)))
704
.try_collect_vec()?;
705
ctx.graph.add_node(
706
nodes::zip::ZipNode::new(*zip_behavior, input_schemas),
707
input_keys,
708
)
709
},
710
711
Multiplexer { input } => {
712
let input_key = to_graph_rec(input.node, ctx)?;
713
ctx.graph.add_node(
714
nodes::multiplexer::MultiplexerNode::new(),
715
[(input_key, input.port)],
716
)
717
},
718
719
MultiScan {
720
scan_sources,
721
file_reader_builder,
722
cloud_options,
723
file_projection_builder,
724
output_schema,
725
row_index,
726
pre_slice,
727
predicate,
728
predicate_file_skip_applied,
729
hive_parts,
730
missing_columns_policy,
731
cast_columns_policy,
732
include_file_paths,
733
forbid_extra_columns,
734
deletion_files,
735
table_statistics,
736
file_schema,
737
disable_morsel_split,
738
} => {
739
let hive_parts = hive_parts.clone();
740
741
let predicate = predicate
742
.as_ref()
743
.map(|pred| {
744
create_scan_predicate(
745
pred,
746
ctx.expr_arena,
747
output_schema,
748
hive_parts.as_ref().map(|hp| hp.df().schema().as_ref()),
749
&mut ctx.expr_conversion_state,
750
true, // create_skip_batch_predicate
751
file_reader_builder
752
.reader_capabilities()
753
.contains(ReaderCapabilities::PARTIAL_FILTER), // create_column_predicates
754
)
755
})
756
.transpose()?
757
.map(|p| p.to_io(None, file_schema.clone()));
758
let predicate_file_skip_applied = *predicate_file_skip_applied;
759
760
let sources = scan_sources.clone();
761
let file_reader_builder = file_reader_builder.clone();
762
let cloud_options = cloud_options.clone();
763
764
let final_output_schema = output_schema.clone();
765
let file_projection_builder = file_projection_builder.clone();
766
767
let row_index = row_index.clone();
768
let pre_slice = pre_slice.clone();
769
let hive_parts = hive_parts.map(Arc::new);
770
let include_file_paths = include_file_paths.clone();
771
let missing_columns_policy = *missing_columns_policy;
772
let forbid_extra_columns = forbid_extra_columns.clone();
773
let cast_columns_policy = cast_columns_policy.clone();
774
let deletion_files = deletion_files.clone();
775
let table_statistics = table_statistics.clone();
776
let disable_morsel_split = *disable_morsel_split;
777
778
let verbose = config::verbose();
779
780
ctx.graph.add_node(
781
nodes::io_sources::multi_scan::MultiScan::new(Arc::new(MultiScanConfig {
782
sources,
783
file_reader_builder,
784
cloud_options,
785
final_output_schema,
786
file_projection_builder,
787
row_index,
788
pre_slice,
789
predicate,
790
predicate_file_skip_applied,
791
hive_parts,
792
include_file_paths,
793
missing_columns_policy,
794
forbid_extra_columns,
795
cast_columns_policy,
796
deletion_files,
797
table_statistics,
798
// Initialized later
799
num_pipelines: RelaxedCell::new_usize(0),
800
n_readers_pre_init: RelaxedCell::new_usize(0),
801
max_concurrent_scans: RelaxedCell::new_usize(0),
802
disable_morsel_split,
803
io_metrics: OnceLock::default(),
804
verbose,
805
})),
806
[],
807
)
808
},
809
810
GroupBy {
811
inputs,
812
key_per_input,
813
aggs_per_input,
814
} => {
815
let mut key_ports = Vec::new();
816
let mut key_schema_per_input = Vec::new();
817
let mut key_selectors_per_input = Vec::new();
818
let mut reductions_per_input = Vec::new();
819
let mut grouped_reductions = Vec::new();
820
let mut grouped_reduction_cols = Vec::new();
821
let mut has_order_sensitive_agg = false;
822
for ((input, key), aggs) in inputs.iter().zip(key_per_input).zip(aggs_per_input) {
823
let input_key = to_graph_rec(input.node, ctx)?;
824
key_ports.push((input_key, input.port));
825
826
let input_schema = &ctx.phys_sm[input.node].output_schema;
827
let key_schema = compute_output_schema(input_schema, key, ctx.expr_arena)?;
828
key_schema_per_input.push(key_schema);
829
830
let key_selectors = key
831
.iter()
832
.map(|e| create_stream_expr(e, ctx, input_schema))
833
.try_collect_vec()?;
834
key_selectors_per_input.push(key_selectors);
835
836
let mut reductions_for_this_input = Vec::new();
837
for agg in aggs {
838
has_order_sensitive_agg |= matches!(
839
ctx.expr_arena.get(agg.node()),
840
AExpr::Agg(
841
IRAggExpr::First(_)
842
| IRAggExpr::FirstNonNull(_)
843
| IRAggExpr::Last(_)
844
| IRAggExpr::LastNonNull(_)
845
)
846
);
847
let (reduction, input_nodes) =
848
into_reduction(agg.node(), ctx.expr_arena, input_schema, true)?;
849
let cols = input_nodes
850
.iter()
851
.map(|node| {
852
let AExpr::Column(col) = ctx.expr_arena.get(*node) else {
853
unreachable!()
854
};
855
col.clone()
856
})
857
.collect();
858
reductions_for_this_input.push(grouped_reductions.len());
859
grouped_reductions.push(reduction);
860
grouped_reduction_cols.push(cols);
861
}
862
863
reductions_per_input.push(reductions_for_this_input);
864
}
865
866
let key_schema = key_schema_per_input.swap_remove(0);
867
assert!(key_schema_per_input.iter().all(|s| **s == *key_schema));
868
869
let grouper = new_hash_grouper(key_schema.clone());
870
ctx.graph.add_node(
871
nodes::group_by::GroupByNode::new(
872
key_schema,
873
key_selectors_per_input,
874
reductions_per_input,
875
grouper,
876
grouped_reduction_cols,
877
grouped_reductions,
878
node.output_schema.clone(),
879
PlRandomState::default(),
880
ctx.num_pipelines,
881
has_order_sensitive_agg,
882
),
883
key_ports,
884
)
885
},
886
887
#[cfg(feature = "dynamic_group_by")]
888
DynamicGroupBy {
889
input,
890
options,
891
aggs,
892
slice,
893
} => {
894
let input_schema = &ctx.phys_sm[input.node].output_schema;
895
let input_key = to_graph_rec(input.node, ctx)?;
896
let aggs = aggs
897
.iter()
898
.map(|e| {
899
Ok((
900
e.output_name().clone(),
901
create_stream_expr(e, ctx, input_schema)?,
902
))
903
})
904
.collect::<PolarsResult<Arc<[_]>>>()?;
905
ctx.graph.add_node(
906
nodes::dynamic_group_by::DynamicGroupBy::new(
907
input_schema.clone(),
908
options.clone(),
909
aggs,
910
*slice,
911
)?,
912
[(input_key, input.port)],
913
)
914
},
915
#[cfg(feature = "dynamic_group_by")]
916
RollingGroupBy {
917
input,
918
index_column,
919
period,
920
offset,
921
closed,
922
slice,
923
aggs,
924
} => {
925
let input_schema = &ctx.phys_sm[input.node].output_schema;
926
let input_key = to_graph_rec(input.node, ctx)?;
927
let aggs = aggs
928
.iter()
929
.map(|e| {
930
Ok((
931
e.output_name().clone(),
932
create_stream_expr(e, ctx, input_schema)?,
933
))
934
})
935
.collect::<PolarsResult<Arc<[_]>>>()?;
936
ctx.graph.add_node(
937
nodes::rolling_group_by::RollingGroupBy::new(
938
input_schema.clone(),
939
index_column.clone(),
940
*period,
941
*offset,
942
*closed,
943
*slice,
944
aggs,
945
)?,
946
[(input_key, input.port)],
947
)
948
},
949
950
InMemoryJoin {
951
input_left,
952
input_right,
953
left_on,
954
right_on,
955
args,
956
options,
957
} => {
958
let left_input_key = to_graph_rec(input_left.node, ctx)?;
959
let right_input_key = to_graph_rec(input_right.node, ctx)?;
960
let left_input_schema = ctx.phys_sm[input_left.node].output_schema.clone();
961
let right_input_schema = ctx.phys_sm[input_right.node].output_schema.clone();
962
963
let mut lp_arena = Arena::default();
964
let left_lmdf = Arc::new(LateMaterializedDataFrame::default());
965
let right_lmdf = Arc::new(LateMaterializedDataFrame::default());
966
967
let left_node = lp_arena.add(left_lmdf.clone().as_ir_node(left_input_schema.clone()));
968
let right_node =
969
lp_arena.add(right_lmdf.clone().as_ir_node(right_input_schema.clone()));
970
let join_node = lp_arena.add(IR::Join {
971
input_left: left_node,
972
input_right: right_node,
973
schema: node.output_schema.clone(),
974
left_on: left_on.clone(),
975
right_on: right_on.clone(),
976
options: Arc::new(JoinOptionsIR {
977
allow_parallel: true,
978
force_parallel: false,
979
args: args.clone(),
980
options: options.clone(),
981
}),
982
});
983
984
let executor = Mutex::new(create_physical_plan(
985
join_node,
986
&mut lp_arena,
987
ctx.expr_arena,
988
Some(crate::dispatch::build_streaming_query_executor),
989
)?);
990
991
ctx.graph.add_node(
992
nodes::joins::in_memory::InMemoryJoinNode::new(
993
left_input_schema,
994
right_input_schema,
995
Arc::new(move |left, right| {
996
left_lmdf.set_materialized_dataframe(left);
997
right_lmdf.set_materialized_dataframe(right);
998
let mut state = ExecutionState::new();
999
executor.lock().execute(&mut state)
1000
}),
1001
),
1002
[
1003
(left_input_key, input_left.port),
1004
(right_input_key, input_right.port),
1005
],
1006
)
1007
},
1008
1009
EquiJoin {
1010
input_left,
1011
input_right,
1012
left_on,
1013
right_on,
1014
args,
1015
}
1016
| SemiAntiJoin {
1017
input_left,
1018
input_right,
1019
left_on,
1020
right_on,
1021
args,
1022
output_bool: _,
1023
} => {
1024
let args = args.clone();
1025
let left_input_key = to_graph_rec(input_left.node, ctx)?;
1026
let right_input_key = to_graph_rec(input_right.node, ctx)?;
1027
let left_input_schema = ctx.phys_sm[input_left.node].output_schema.clone();
1028
let right_input_schema = ctx.phys_sm[input_right.node].output_schema.clone();
1029
1030
let left_key_schema =
1031
compute_output_schema(&left_input_schema, left_on, ctx.expr_arena)?;
1032
let right_key_schema =
1033
compute_output_schema(&right_input_schema, right_on, ctx.expr_arena)?;
1034
1035
// We want to make sure here that the key types match otherwise we get out garbage out
1036
// since the hashes will be calculated differently.
1037
polars_ensure!(
1038
left_on.len() == right_on.len() &&
1039
left_on.iter().zip(right_on.iter()).all(|(l, r)| {
1040
let l_dtype = left_key_schema.get(l.output_name()).unwrap();
1041
let r_dtype = right_key_schema.get(r.output_name()).unwrap();
1042
l_dtype == r_dtype
1043
}),
1044
SchemaMismatch: "join received different key types on left and right side"
1045
);
1046
1047
// We use key columns entirely by position, and allow duplicate names in key selectors,
1048
// so just assign arbitrary unique names for the selectors.
1049
let unique_left_on = left_on
1050
.iter()
1051
.enumerate()
1052
.map(|(i, expr)| expr.with_alias(format_pl_smallstr!("__POLARS_KEYCOL_{i}")))
1053
.collect_vec();
1054
let unique_right_on = right_on
1055
.iter()
1056
.enumerate()
1057
.map(|(i, expr)| expr.with_alias(format_pl_smallstr!("__POLARS_KEYCOL_{i}")))
1058
.collect_vec();
1059
1060
let left_key_selectors = unique_left_on
1061
.iter()
1062
.map(|e| create_stream_expr(e, ctx, &left_input_schema))
1063
.try_collect_vec()?;
1064
let right_key_selectors = unique_right_on
1065
.iter()
1066
.map(|e| create_stream_expr(e, ctx, &right_input_schema))
1067
.try_collect_vec()?;
1068
1069
let unique_key_schema =
1070
compute_output_schema(&right_input_schema, &unique_left_on, ctx.expr_arena)?;
1071
1072
match node.kind {
1073
#[cfg(feature = "semi_anti_join")]
1074
SemiAntiJoin { output_bool, .. } => ctx.graph.add_node(
1075
nodes::joins::semi_anti_join::SemiAntiJoinNode::new(
1076
unique_key_schema,
1077
left_key_selectors,
1078
right_key_selectors,
1079
args,
1080
output_bool,
1081
ctx.num_pipelines,
1082
)?,
1083
[
1084
(left_input_key, input_left.port),
1085
(right_input_key, input_right.port),
1086
],
1087
),
1088
_ => ctx.graph.add_node(
1089
nodes::joins::equi_join::EquiJoinNode::new(
1090
left_input_schema,
1091
right_input_schema,
1092
left_key_schema,
1093
right_key_schema,
1094
unique_key_schema,
1095
left_key_selectors,
1096
right_key_selectors,
1097
args,
1098
ctx.num_pipelines,
1099
)?,
1100
[
1101
(left_input_key, input_left.port),
1102
(right_input_key, input_right.port),
1103
],
1104
),
1105
}
1106
},
1107
1108
// Lower a physical merge-join into a graph node. Both inputs are lowered
// recursively first so their graph keys exist before wiring the ports.
MergeJoin {
    input_left,
    input_right,
    left_on,
    right_on,
    tmp_left_key_col,
    tmp_right_key_col,
    descending,
    nulls_last,
    keys_row_encoded,
    args,
} => {
    let args = args.clone();
    let left_input_key = to_graph_rec(input_left.node, ctx)?;
    let right_input_key = to_graph_rec(input_right.node, ctx)?;
    let left_input_schema = ctx.phys_sm[input_left.node].output_schema.clone();
    let right_input_schema = ctx.phys_sm[input_right.node].output_schema.clone();
    let output_schema = node.output_schema.clone();

    ctx.graph.add_node(
        MergeJoinNode::new(
            left_input_schema,
            right_input_schema,
            output_schema,
            left_on.clone(),
            right_on.clone(),
            tmp_left_key_col.clone(),
            tmp_right_key_col.clone(),
            *descending,
            *nulls_last,
            *keys_row_encoded,
            args,
        )?,
        [
            (left_input_key, input_left.port),
            (right_input_key, input_right.port),
        ],
    )
},
1147
1148
// Lower a physical cross-join into a graph node. Unlike the keyed joins,
// no key selectors are needed; the node only needs both input schemas.
CrossJoin {
    input_left,
    input_right,
    args,
} => {
    let args = args.clone();
    let left_input_key = to_graph_rec(input_left.node, ctx)?;
    let right_input_key = to_graph_rec(input_right.node, ctx)?;
    let left_input_schema = ctx.phys_sm[input_left.node].output_schema.clone();
    let right_input_schema = ctx.phys_sm[input_right.node].output_schema.clone();

    ctx.graph.add_node(
        nodes::joins::cross_join::CrossJoinNode::new(
            left_input_schema,
            right_input_schema,
            &args,
        ),
        [
            (left_input_key, input_left.port),
            (right_input_key, input_right.port),
        ],
    )
},
1171
1172
// Lower a physical as-of join. The inputs are lowered unconditionally;
// the actual node construction is gated on the `asof_join` feature, and
// reaching this arm with the feature disabled is considered a planner bug.
AsOfJoin {
    input_left,
    input_right,
    left_on,
    right_on,
    tmp_left_key_col,
    tmp_right_key_col,
    args,
} => {
    let args = args.clone();
    let left_input_key = to_graph_rec(input_left.node, ctx)?;
    let right_input_key = to_graph_rec(input_right.node, ctx)?;
    let left_input_schema = ctx.phys_sm[input_left.node].output_schema.clone();
    let right_input_schema = ctx.phys_sm[input_right.node].output_schema.clone();
    #[cfg(feature = "asof_join")]
    {
        ctx.graph.add_node(
            nodes::joins::asof_join::AsOfJoinNode::new(
                left_input_schema,
                right_input_schema,
                left_on.clone(),
                right_on.clone(),
                tmp_left_key_col.clone(),
                tmp_right_key_col.clone(),
                args,
            ),
            [
                (left_input_key, input_left.port),
                (right_input_key, input_right.port),
            ],
        )
    }
    #[cfg(not(feature = "asof_join"))]
    {
        unreachable!("asof_join feature is disabled")
    }
},
1209
1210
#[cfg(feature = "merge_sorted")]
1211
MergeSorted {
1212
input_left,
1213
input_right,
1214
} => {
1215
let left_input_key = to_graph_rec(input_left.node, ctx)?;
1216
let right_input_key = to_graph_rec(input_right.node, ctx)?;
1217
ctx.graph.add_node(
1218
nodes::merge_sorted::MergeSortedNode::new(),
1219
[
1220
(left_input_key, input_left.port),
1221
(right_input_key, input_right.port),
1222
],
1223
)
1224
},
1225
1226
#[cfg(feature = "python")]
1227
PythonScan { options } => {
1228
use polars_buffer::Buffer;
1229
use polars_plan::dsl::python_dsl::PythonScanSource as S;
1230
use polars_plan::plans::PythonPredicate;
1231
use polars_utils::relaxed_cell::RelaxedCell;
1232
use pyo3::exceptions::PyStopIteration;
1233
use pyo3::prelude::*;
1234
use pyo3::types::{PyBytes, PyNone};
1235
use pyo3::{IntoPyObjectExt, PyTypeInfo, intern};
1236
1237
let mut options = options.clone();
1238
let with_columns = options.with_columns.take();
1239
let n_rows = options.n_rows.take();
1240
1241
let python_scan_function = options.scan_fn.take().unwrap().0;
1242
1243
let with_columns = with_columns.map(|cols| cols.iter().cloned().collect::<Vec<_>>());
1244
1245
let (pl_predicate, predicate_serialized) = polars_mem_engine::python_scan_predicate(
1246
&mut options,
1247
ctx.expr_arena,
1248
&mut ctx.expr_conversion_state,
1249
)?;
1250
1251
let output_schema = options.output_schema.unwrap_or(options.schema);
1252
let validate_schema = options.validate_schema;
1253
1254
let simple_projection = with_columns.as_ref().and_then(|with_columns| {
1255
(with_columns
1256
.iter()
1257
.zip(output_schema.iter_names())
1258
.any(|(a, b)| a != b))
1259
.then(|| output_schema.clone())
1260
});
1261
1262
let (name, get_batch_fn) = match options.python_source {
1263
S::Pyarrow => todo!(),
1264
S::Cuda => todo!(),
1265
S::IOPlugin => {
1266
let batch_size = Some(get_ideal_morsel_size());
1267
let output_schema = output_schema.clone();
1268
1269
let with_columns = with_columns.map(|x| {
1270
x.into_iter()
1271
.map(|x| x.to_string())
1272
.collect::<Vec<String>>()
1273
});
1274
1275
// Setup the IO plugin generator.
1276
let (generator, can_parse_predicate) = {
1277
Python::attach(|py| {
1278
let pl = PyModule::import(py, intern!(py, "polars")).unwrap();
1279
let utils = pl.getattr(intern!(py, "_utils")).unwrap();
1280
let callable =
1281
utils.getattr(intern!(py, "_execute_from_rust")).unwrap();
1282
1283
let mut could_serialize_predicate = true;
1284
let predicate = match &options.predicate {
1285
PythonPredicate::PyArrow(s) => s.into_bound_py_any(py).unwrap(),
1286
PythonPredicate::None => None::<()>.into_bound_py_any(py).unwrap(),
1287
PythonPredicate::Polars(_) => {
1288
assert!(pl_predicate.is_some(), "should be set");
1289
match &predicate_serialized {
1290
None => {
1291
could_serialize_predicate = false;
1292
PyNone::get(py).to_owned().into_any()
1293
},
1294
Some(buf) => PyBytes::new(py, buf).into_any(),
1295
}
1296
},
1297
};
1298
1299
let args = (
1300
python_scan_function,
1301
with_columns,
1302
predicate,
1303
n_rows,
1304
batch_size,
1305
);
1306
1307
let generator_init = callable.call1(args)?;
1308
let generator = generator_init.get_item(0).map_err(
1309
|_| polars_err!(ComputeError: "expected tuple got {generator_init}"),
1310
)?;
1311
let can_parse_predicate = generator_init.get_item(1).map_err(
1312
|_| polars_err!(ComputeError: "expected tuple got {generator}"),
1313
)?;
1314
let can_parse_predicate = can_parse_predicate.extract::<bool>().map_err(
1315
|_| polars_err!(ComputeError: "expected bool got {can_parse_predicate}"),
1316
)? && could_serialize_predicate;
1317
1318
let generator = generator.into_py_any(py).map_err(
1319
|_| polars_err!(ComputeError: "unable to grab reference to IO plugin generator"),
1320
)?;
1321
1322
PolarsResult::Ok((generator, can_parse_predicate))
1323
})
1324
}?;
1325
1326
let get_batch_fn = Box::new(move |state: &StreamingExecutionState| {
1327
let df = Python::attach(|py| {
1328
match generator.bind(py).call_method0(intern!(py, "__next__")) {
1329
Ok(out) => polars_plan::plans::python_df_to_rust(py, out).map(Some),
1330
Err(err)
1331
if err.matches(py, PyStopIteration::type_object(py))? =>
1332
{
1333
Ok(None)
1334
},
1335
Err(err) => polars_bail!(
1336
ComputeError: "caught exception during execution of a Python source, exception: {err}"
1337
),
1338
}
1339
})?;
1340
1341
let Some(mut df) = df else { return Ok(None) };
1342
1343
if let Some(simple_projection) = &simple_projection {
1344
df = unsafe {
1345
df.select_unchecked(simple_projection.iter_names())?
1346
.with_schema(simple_projection.clone())
1347
};
1348
}
1349
1350
if validate_schema {
1351
polars_ensure!(
1352
df.schema() == &output_schema,
1353
SchemaMismatch: "user provided schema: {:?} doesn't match the DataFrame schema: {:?}",
1354
output_schema, df.schema()
1355
);
1356
}
1357
1358
// TODO: Move this to a FilterNode so that it happens in parallel. We may need
1359
// to move all of the enclosing code to `lower_ir` for this.
1360
if let (Some(pred), false) = (&pl_predicate, can_parse_predicate) {
1361
let mask = pred.evaluate(&df, &state.in_memory_exec_state)?;
1362
df = df.filter(mask.bool()?)?;
1363
}
1364
1365
Ok(Some(df))
1366
}) as Box<_>;
1367
1368
(PlSmallStr::from_static("io_plugin"), get_batch_fn)
1369
},
1370
};
1371
1372
use polars_plan::dsl::{CastColumnsPolicy, MissingColumnsPolicy};
1373
1374
use crate::nodes::io_sources::batch::builder::BatchFnReaderBuilder;
1375
use crate::nodes::io_sources::batch::{BatchFnReader, GetBatchState};
1376
use crate::nodes::io_sources::multi_scan::components::projection::builder::ProjectionBuilder;
1377
1378
let reader = BatchFnReader {
1379
name: name.clone(),
1380
// If validate_schema is false, the schema of the morsels may not match the
1381
// configured schema. In this case we set this to `None` and the reader will
1382
// retrieve the schema from the first morsel.
1383
output_schema: validate_schema.then(|| output_schema.clone()),
1384
get_batch_state: Some(GetBatchState::from(get_batch_fn)),
1385
execution_state: None,
1386
verbose: config::verbose(),
1387
};
1388
1389
let file_reader_builder = Arc::new(BatchFnReaderBuilder {
1390
name,
1391
reader: std::sync::Mutex::new(Some(reader)),
1392
execution_state: Default::default(),
1393
}) as Arc<dyn FileReaderBuilder>;
1394
1395
// Give multiscan a single scan source. (It doesn't actually read from this).
1396
let sources = ScanSources::Paths(Buffer::from_iter([PlRefPath::new("python-scan-0")]));
1397
let cloud_options = None;
1398
let final_output_schema = output_schema.clone();
1399
let file_projection_builder = ProjectionBuilder::new(output_schema, None, None);
1400
let row_index = None;
1401
let pre_slice = None;
1402
let predicate = None;
1403
let predicate_file_skip_applied = None;
1404
let hive_parts = None;
1405
let include_file_paths = None;
1406
let missing_columns_policy = MissingColumnsPolicy::Raise;
1407
let forbid_extra_columns = None;
1408
let cast_columns_policy = CastColumnsPolicy::ERROR_ON_MISMATCH;
1409
let deletion_files = None;
1410
let table_statistics = None;
1411
let disable_morsel_split = false;
1412
let verbose = config::verbose();
1413
1414
ctx.graph.add_node(
1415
nodes::io_sources::multi_scan::MultiScan::new(Arc::new(MultiScanConfig {
1416
sources,
1417
file_reader_builder,
1418
cloud_options,
1419
final_output_schema,
1420
file_projection_builder,
1421
row_index,
1422
pre_slice,
1423
predicate,
1424
predicate_file_skip_applied,
1425
hive_parts,
1426
include_file_paths,
1427
missing_columns_policy,
1428
forbid_extra_columns,
1429
cast_columns_policy,
1430
deletion_files,
1431
table_statistics,
1432
// Initialized later
1433
num_pipelines: RelaxedCell::new_usize(0),
1434
n_readers_pre_init: RelaxedCell::new_usize(0),
1435
max_concurrent_scans: RelaxedCell::new_usize(0),
1436
disable_morsel_split,
1437
io_metrics: OnceLock::default(),
1438
verbose,
1439
})),
1440
[],
1441
)
1442
},
1443
1444
#[cfg(feature = "ewma")]
1445
ewm_variant @ EwmMean { input, options }
1446
| ewm_variant @ EwmVar { input, options }
1447
| ewm_variant @ EwmStd { input, options } => {
1448
use nodes::ewm::EwmNode;
1449
use polars_compute::ewm::mean::EwmMeanState;
1450
use polars_compute::ewm::{EwmCovState, EwmStateUpdate, EwmStdState, EwmVarState};
1451
use polars_core::with_match_physical_float_type;
1452
1453
let input_key = to_graph_rec(input.node, ctx)?;
1454
let input_schema = &ctx.phys_sm[input.node].output_schema;
1455
let (_, dtype) = input_schema.get_at_index(0).unwrap();
1456
1457
let state: Box<dyn EwmStateUpdate + Send> = match ewm_variant {
1458
EwmMean { .. } => {
1459
with_match_physical_float_type!(dtype, |$T| {
1460
let state: EwmMeanState<$T> = EwmMeanState::new(
1461
AsPrimitive::<$T>::as_(options.alpha),
1462
options.adjust,
1463
options.min_periods,
1464
options.ignore_nulls,
1465
);
1466
1467
Box::new(state)
1468
})
1469
},
1470
_ => with_match_physical_float_type!(dtype, |$T| {
1471
let state: EwmCovState<$T> = EwmCovState::new(
1472
AsPrimitive::<$T>::as_(options.alpha),
1473
options.adjust,
1474
options.bias,
1475
options.min_periods,
1476
options.ignore_nulls,
1477
);
1478
1479
match ewm_variant {
1480
EwmVar { .. } => Box::new(EwmVarState::new(state)),
1481
EwmStd { .. } => Box::new(EwmStdState::new(state)),
1482
_ => unreachable!(),
1483
}
1484
}),
1485
};
1486
1487
let name = match ewm_variant {
1488
EwmMean { .. } => "ewm-mean",
1489
EwmVar { .. } => "ewm-var",
1490
EwmStd { .. } => "ewm-std",
1491
_ => unreachable!(),
1492
};
1493
1494
let node = EwmNode::new(name, state);
1495
1496
ctx.graph.add_node(node, [(input_key, input.port)])
1497
},
1498
};
1499
1500
ctx.phys_to_graph.insert(phys_node_key, graph_key);
1501
Ok(graph_key)
1502
}
1503
1504