Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/expand_datasets.rs
8446 views
1
use std::fmt::Debug;
2
use std::sync::Arc;
3
4
use polars_core::config;
5
use polars_core::error::{PolarsResult, polars_bail};
6
use polars_utils::arena::{Arena, Node};
7
use polars_utils::pl_str::PlSmallStr;
8
#[cfg(feature = "python")]
9
use polars_utils::python_function::PythonObject;
10
use polars_utils::slice_enum::Slice;
11
use polars_utils::{format_pl_smallstr, unitvec};
12
13
#[cfg(feature = "python")]
14
use crate::dsl::python_dsl::PythonScanSource;
15
use crate::dsl::{DslPlan, FileScanIR, UnifiedScanArgs};
16
use crate::plans::{AExpr, IR};
17
18
/// Expands "dataset" scans in the IR into concrete scans.
///
/// Performs a depth-first traversal from `root` and, for every `IR::Scan` whose scan
/// type is a `FileScanIR::PythonDataset`, asks the dataset object to resolve itself
/// under the current pushdown state (limit / projection / live filter columns /
/// pyarrow predicate string). The resolution is cached inside the scan's `cached_ir`
/// mutex, keyed by that pushdown state plus a dataset-provided version string, so an
/// unchanged dataset is not re-resolved on repeated optimizer passes.
///
/// The resolved DSL is then spliced back into this IR scan node (sources, scan type,
/// selected scan-arg flags, hive partitions), or recorded as a fallback python scan.
///
/// After processing, `apply_scan_predicate_to_scan_ir` is invoked on every scan node
/// (non-scan nodes are traversed for their inputs but otherwise skipped).
pub(super) fn expand_datasets(
    root: Node,
    ir_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    apply_scan_predicate_to_scan_ir: fn(
        Node,
        &mut Arena<IR>,
        &mut Arena<AExpr>,
    ) -> PolarsResult<()>,
) -> PolarsResult<()> {
    // DFS traversal stack; `copy_inputs` pushes each visited node's children.
    let mut stack = unitvec![root];

    while let Some(node) = stack.pop() {
        ir_arena.get(node).copy_inputs(&mut stack);

        // Only scan nodes are candidates for expansion.
        let IR::Scan {
            sources,
            scan_type,
            unified_scan_args,

            file_info,
            hive_parts,
            predicate,
            predicate_file_skip_applied: _,
            output_schema: _,
        } = ir_arena.get_mut(node)
        else {
            continue;
        };

        let mut projection = unified_scan_args.projection.clone();

        // Remove the row index column from the pushed-down projection — see the
        // `Note` below on why this is safe w.r.t. cache equality.
        if let Some(row_index) = &unified_scan_args.row_index
            && let Some(projection) = projection.as_mut()
        {
            *projection = projection
                .iter()
                .filter(|x| *x != &row_index.name)
                .cloned()
                .collect();
        }

        // A positive pre-slice can be satisfied by reading only the first
        // `end_position()` rows; other slice shapes push no limit.
        let limit = match unified_scan_args.pre_slice.clone() {
            Some(v @ Slice::Positive { .. }) => Some(v.end_position()),
            _ => None,
        };

        match scan_type.as_mut() {
            #[cfg(feature = "python")]
            FileScanIR::PythonDataset {
                dataset_object,
                cached_ir,
            } => {
                use crate::plans::pyarrow::predicate_to_pa;

                // Hold the cache lock for the duration of this node's expansion.
                let cached_ir = cached_ir.clone();
                let mut guard = cached_ir.lock().unwrap();

                if config::verbose() {
                    eprintln!(
                        "expand_datasets(): python[{}]: limit: {:?}, project: {}",
                        dataset_object.name(),
                        limit,
                        projection.as_ref().map_or(
                            PlSmallStr::from_static("all"),
                            |x| format_pl_smallstr!("{}", x.len())
                        )
                    )
                }

                // Note
                // row_index is removed from projection/live_columns set, and is therefore not
                // considered when comparing cached expansion equality. This is safe as the
                // `row_index_in_live_filter` variable does not depend on the cached values.

                let mut row_index_in_live_filter = false;

                // Sorted, deduplicated set of column names referenced by the predicate,
                // with the row index column filtered out (and flagged instead).
                let live_filter_columns: Option<Arc<[PlSmallStr]>> = predicate.as_ref().map(|x| {
                    use polars_core::prelude::PlHashSet;

                    use crate::utils::aexpr_to_leaf_names_iter;

                    let mut out: Arc<[PlSmallStr]> =
                        PlHashSet::from_iter(aexpr_to_leaf_names_iter(x.node(), expr_arena))
                            .into_iter()
                            .filter(|&live_col| {
                                if unified_scan_args
                                    .row_index
                                    .as_ref()
                                    .is_some_and(|ri| live_col == &ri.name)
                                {
                                    row_index_in_live_filter = true;
                                    false
                                } else {
                                    true
                                }
                            })
                            .cloned()
                            .collect();

                    // Freshly collected, so this is the only reference.
                    Arc::get_mut(&mut out).unwrap().sort_unstable();

                    out
                });

                // Only push the predicate to pyarrow when there is no row index or
                // slice — presumably because pyarrow-side filtering would change row
                // positions before those are applied. TODO confirm.
                let pyarrow_predicate: Option<String> = if !unified_scan_args
                    .has_row_index_or_slice()
                    && let Some(predicate) = &predicate
                {
                    use crate::plans::aexpr::MintermIter;

                    // Convert minterms independently, can allow conversion to partially succeed if there are unsupported expressions
                    let parts: Vec<String> = MintermIter::new(predicate.node(), expr_arena)
                        .filter_map(|node| predicate_to_pa(node, expr_arena, Default::default()))
                        .collect();
                    match parts.len() {
                        0 => None,
                        1 => Some(parts.into_iter().next().unwrap()),
                        _ => Some(format!("({})", parts.join(" & "))),
                    }
                } else {
                    None
                };

                // If the cached expansion was resolved under an identical pushdown
                // state, pass its version key so the dataset can report "unchanged".
                let existing_resolved_version_key = match guard.as_ref() {
                    Some(resolved) => {
                        let ExpandedDataset {
                            version,
                            limit: cached_limit,
                            projection: cached_projection,
                            live_filter_columns: cached_live_filter_columns,
                            pyarrow_predicate: cached_pyarrow_predicate,
                            expanded_dsl: _,
                            python_scan: _,
                        } = resolved;

                        (&limit == cached_limit
                            && &projection == cached_projection
                            && &live_filter_columns == cached_live_filter_columns
                            && &pyarrow_predicate == cached_pyarrow_predicate)
                            .then_some(version.as_str())
                    },

                    None => None,
                };

                // `None` from `to_dataset_scan` means the cached expansion is still
                // valid; otherwise store the fresh resolution.
                if let Some((expanded_dsl, version)) = dataset_object.to_dataset_scan(
                    existing_resolved_version_key,
                    limit,
                    projection.as_deref(),
                    live_filter_columns.as_deref(),
                    pyarrow_predicate.as_deref(),
                )? {
                    *guard = Some(ExpandedDataset {
                        version,
                        limit,
                        projection,
                        live_filter_columns,
                        pyarrow_predicate,
                        expanded_dsl,
                        python_scan: None,
                    })
                }

                // Guaranteed `Some`: either just stored above, or the cached value was
                // accepted as-is.
                let ExpandedDataset {
                    version: _,
                    limit: _,
                    projection: _,
                    live_filter_columns: _,
                    pyarrow_predicate: _,
                    expanded_dsl,
                    python_scan,
                } = guard.as_mut().unwrap();

                match expanded_dsl {
                    // Dataset resolved to a native file scan: splice it into this node.
                    DslPlan::Scan {
                        sources: resolved_sources,
                        unified_scan_args: resolved_unified_scan_args,
                        scan_type: resolved_scan_type,
                        cached_ir: _,
                    } => {
                        use crate::dsl::FileScanDsl;

                        // We only want a few configuration flags from here (e.g. column casting config).
                        // The rest we either expect to be None (e.g. projection / row_index), or ignore.
                        // The `@ None` patterns double as assertions: the let-else
                        // panics if the resolver set any of them.
                        let UnifiedScanArgs {
                            schema: _,
                            cloud_options,
                            hive_options,
                            rechunk,
                            cache,
                            glob: _,
                            hidden_file_prefix: _hidden_file_prefix @ None,
                            projection: _projection @ None,
                            column_mapping,
                            default_values,
                            row_index: _row_index @ None,
                            pre_slice: _pre_slice @ None,
                            cast_columns_policy,
                            missing_columns_policy,
                            extra_columns_policy,
                            include_file_paths: _include_file_paths @ None,
                            deletion_files,
                            table_statistics,
                            row_count,
                        } = resolved_unified_scan_args.as_ref()
                        else {
                            panic!(
                                "invalid scan args from python dataset resolve: {:?}",
                                &resolved_unified_scan_args
                            )
                        };

                        unified_scan_args.cloud_options = cloud_options.clone();
                        unified_scan_args.rechunk = *rechunk;
                        unified_scan_args.cache = *cache;
                        unified_scan_args.cast_columns_policy = cast_columns_policy.clone();
                        unified_scan_args.missing_columns_policy = *missing_columns_policy;
                        unified_scan_args.extra_columns_policy = *extra_columns_policy;
                        unified_scan_args.column_mapping = column_mapping.clone();
                        unified_scan_args.default_values = default_values.clone();
                        unified_scan_args.deletion_files = deletion_files.clone();
                        unified_scan_args.table_statistics = table_statistics.clone();
                        unified_scan_args.row_count = *row_count;

                        // The filter references the row index column: append
                        // `{name}_nc` / `{name}_min` / `{name}_max` statistics columns
                        // (zero null-count, null min/max) — presumably consumed by
                        // predicate-based file skipping. TODO confirm.
                        if row_index_in_live_filter {
                            use polars_core::prelude::{Column, DataType, IdxCa, IntoColumn};
                            use polars_core::series::IntoSeries;

                            let row_index_name =
                                &unified_scan_args.row_index.as_ref().unwrap().name;
                            let table_statistics =
                                unified_scan_args.table_statistics.as_mut().unwrap();

                            let statistics_df = Arc::make_mut(&mut table_statistics.0);
                            assert!(
                                !statistics_df
                                    .schema()
                                    .contains(&format_pl_smallstr!("{}_nc", row_index_name))
                            );

                            // NOTE(review): `columns_mut()` is unsafe and presumably
                            // requires DataFrame invariants (unique names, equal
                            // heights) to be upheld: the assert above covers name
                            // uniqueness and each appended column has `sources.len()`
                            // rows — confirm this matches the statistics height.
                            unsafe { statistics_df.columns_mut() }.extend([
                                IdxCa::from_vec(
                                    format_pl_smallstr!("{}_nc", row_index_name),
                                    vec![0],
                                )
                                .into_series()
                                .into_column()
                                .new_from_index(0, sources.len()),
                                Column::full_null(
                                    format_pl_smallstr!("{}_min", row_index_name),
                                    sources.len(),
                                    &DataType::IDX_DTYPE,
                                ),
                                Column::full_null(
                                    format_pl_smallstr!("{}_max", row_index_name),
                                    sources.len(),
                                    &DataType::IDX_DTYPE,
                                ),
                            ]);
                        }

                        *sources = resolved_sources.clone();

                        // Lower the resolved DSL scan type to its IR counterpart;
                        // per-format metadata caches start out empty.
                        **scan_type = match *resolved_scan_type.clone() {
                            #[cfg(feature = "csv")]
                            FileScanDsl::Csv { options } => FileScanIR::Csv { options },

                            #[cfg(feature = "ipc")]
                            FileScanDsl::Ipc { options } => FileScanIR::Ipc {
                                options,
                                metadata: None,
                            },

                            #[cfg(feature = "parquet")]
                            FileScanDsl::Parquet { options } => FileScanIR::Parquet {
                                options,
                                metadata: None,
                            },

                            #[cfg(feature = "json")]
                            FileScanDsl::NDJson { options } => FileScanIR::NDJson { options },

                            #[cfg(feature = "python")]
                            FileScanDsl::PythonDataset { dataset_object } => {
                                FileScanIR::PythonDataset {
                                    dataset_object,
                                    cached_ir: Default::default(),
                                }
                            },

                            #[cfg(feature = "scan_lines")]
                            FileScanDsl::Lines { name } => FileScanIR::Lines { name },

                            FileScanDsl::Anonymous {
                                options,
                                function,
                                file_info: _,
                            } => FileScanIR::Anonymous { options, function },
                        };

                        // Re-derive hive partition information from the resolved paths
                        // when hive partitioning is enabled.
                        if hive_options.enabled == Some(true)
                            && let Some(paths) = sources.as_paths()
                        {
                            use arrow::Either;

                            use crate::plans::hive::hive_partitions_from_paths;

                            // Storage for a converted arrow schema so a reference can
                            // be returned out of the match below.
                            let owned;

                            *hive_parts = hive_partitions_from_paths(
                                paths,
                                hive_options.hive_start_idx,
                                hive_options.schema.clone(),
                                match file_info.reader_schema.as_ref().unwrap() {
                                    Either::Left(v) => {
                                        use polars_core::schema::{Schema, SchemaExt as _};

                                        owned = Some(Schema::from_arrow_schema(v.as_ref()));
                                        owned.as_ref().unwrap()
                                    },
                                    Either::Right(v) => v.as_ref(),
                                },
                                hive_options.try_parse_dates,
                            )?;
                        }
                    },

                    // Dataset fell back to a plain python scan: record it for later use.
                    DslPlan::PythonScan { options } => {
                        *python_scan = Some(ExpandedPythonScan {
                            name: dataset_object.name(),
                            scan_fn: options.scan_fn.clone().unwrap(),
                            variant: options.python_source.clone(),
                        })
                    },

                    // Any other DSL shape from the resolver is a hard error.
                    dsl => {
                        polars_bail!(
                            ComputeError:
                            "unknown DSL when resolving python dataset scan: {}",
                            dsl.display()?
                        )
                    },
                };
            },

            _ => {},
        }

        // Runs for every scan node visited (non-scan nodes `continue` earlier).
        apply_scan_predicate_to_scan_ir(node, ir_arena, expr_arena)?;
    }

    Ok(())
}
372
373
/// Cached result of resolving ("expanding") a python dataset scan.
///
/// Stored behind the scan node's `cached_ir` mutex. The pushdown parameters
/// (`limit` / `projection` / `live_filter_columns` / `pyarrow_predicate`) together
/// with `version` act as the cache key; `expanded_dsl` is the resolved plan.
#[derive(Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct ExpandedDataset {
    /// Version string returned by the dataset object for this resolution; passed back
    /// on re-resolution so the dataset can report "unchanged".
    version: PlSmallStr,
    /// Row limit (derived from a positive pre-slice) the expansion was resolved under.
    limit: Option<usize>,
    /// Projected column names (row index column removed); part of the cache key.
    projection: Option<Arc<[PlSmallStr]>>,
    /// Sorted set of column names referenced by the predicate (row index removed).
    live_filter_columns: Option<Arc<[PlSmallStr]>>,
    /// Predicate rendered as a pyarrow expression string, when conversion succeeded.
    pyarrow_predicate: Option<String>,
    /// The DSL plan the dataset resolved to (a `Scan` or a `PythonScan`).
    expanded_dsl: DslPlan,

    /// Fallback python scan
    #[cfg(feature = "python")]
    python_scan: Option<ExpandedPythonScan>,
}
388
389
/// Fallback recorded when a python dataset resolves to a plain python scan rather
/// than a native file scan.
#[cfg(feature = "python")]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[derive(Clone)]
pub struct ExpandedPythonScan {
    /// Dataset name (as reported by the dataset object; used in debug output).
    pub name: PlSmallStr,
    /// Python callable that performs the scan.
    pub scan_fn: PythonObject,
    /// Python scan source variant — see `PythonScanSource`.
    pub variant: PythonScanSource,
}
398
399
impl ExpandedDataset {
400
#[cfg(feature = "python")]
401
pub fn python_scan(&self) -> Option<&ExpandedPythonScan> {
402
self.python_scan.as_ref()
403
}
404
}
405
406
impl Debug for ExpandedDataset {
407
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
408
let ExpandedDataset {
409
version,
410
limit,
411
projection,
412
live_filter_columns,
413
pyarrow_predicate,
414
expanded_dsl,
415
416
#[cfg(feature = "python")]
417
python_scan,
418
} = self;
419
420
return display::ExpandedDataset {
421
version,
422
limit,
423
projection,
424
live_filter_columns,
425
expanded_dsl: &match expanded_dsl.display() {
426
Ok(v) => v.to_string(),
427
Err(e) => e.to_string(),
428
},
429
pyarrow_predicate: if pyarrow_predicate.is_some() {
430
"Some(<redacted>)"
431
} else {
432
"None"
433
},
434
#[cfg(feature = "python")]
435
python_scan: python_scan.as_ref().map(
436
|ExpandedPythonScan {
437
name,
438
scan_fn: _,
439
variant,
440
}| {
441
format_pl_smallstr!("python-scan[{} @ {:?}]", name, variant)
442
},
443
),
444
}
445
.fmt(f);
446
447
mod display {
448
use std::fmt::Debug;
449
use std::sync::Arc;
450
451
use polars_utils::pl_str::PlSmallStr;
452
453
#[derive(Debug)]
454
#[expect(unused)]
455
pub struct ExpandedDataset<'a> {
456
pub version: &'a str,
457
pub limit: &'a Option<usize>,
458
pub projection: &'a Option<Arc<[PlSmallStr]>>,
459
pub live_filter_columns: &'a Option<Arc<[PlSmallStr]>>,
460
pub pyarrow_predicate: &'static str,
461
pub expanded_dsl: &'a str,
462
463
#[cfg(feature = "python")]
464
pub python_scan: Option<PlSmallStr>,
465
}
466
}
467
}
468
}
469
470