Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/expand_datasets.rs
6940 views
1
use std::fmt::Debug;
2
use std::sync::Arc;
3
4
use polars_core::config;
5
use polars_core::error::{PolarsResult, polars_bail};
6
use polars_utils::arena::{Arena, Node};
7
use polars_utils::format_pl_smallstr;
8
use polars_utils::pl_str::PlSmallStr;
9
#[cfg(feature = "python")]
10
use polars_utils::python_function::PythonObject;
11
use polars_utils::slice_enum::Slice;
12
13
use super::OptimizationRule;
14
#[cfg(feature = "python")]
15
use crate::dsl::python_dsl::PythonScanSource;
16
use crate::dsl::{DslPlan, FileScanIR, UnifiedScanArgs};
17
use crate::plans::IR;
18
19
/// Optimizer rule that expands "dataset" scans into concrete file scans.
///
/// Note: Currently only used for iceberg. This is so that we can call iceberg to fetch the files
/// list with a potential row limit from slice pushdown.
///
/// In the future this can also apply to hive path expansion with predicates.
pub(super) struct ExpandDatasets;
24
25
impl OptimizationRule for ExpandDatasets {
    /// Resolves python dataset scans at `node` into concrete scans, re-using a cached
    /// resolve when the requested `limit` / `projection` are unchanged.
    fn optimize_plan(
        &mut self,
        lp_arena: &mut Arena<IR>,
        _expr_arena: &mut Arena<crate::prelude::AExpr>,
        node: Node,
    ) -> PolarsResult<Option<IR>> {
        // # Note
        // This function mutates the IR node in-place rather than returning the new IR - the
        // StackOptimizer will re-call this function otherwise.
        if let IR::Scan {
            sources,
            scan_type,
            unified_scan_args,

            file_info: _,
            hive_parts: _,
            predicate: _,
            output_schema: _,
        } = lp_arena.get_mut(node)
        {
            let projection = unified_scan_args.projection.clone();
            // Only a from-the-start (positive) slice translates into a row limit for the
            // dataset resolve; any other slice shape requires the full file list.
            let limit = match unified_scan_args.pre_slice.clone() {
                Some(v @ Slice::Positive { .. }) => Some(v.end_position()),
                _ => None,
            };

            match scan_type.as_mut() {
                #[cfg(feature = "python")]
                FileScanIR::PythonDataset {
                    dataset_object,
                    cached_ir,
                } => {
                    // Clone the cache handle so we hold our own reference, then lock it for
                    // the remainder of the resolve (presumably an Arc'd Mutex — it is both
                    // cloneable and lockable here).
                    let cached_ir = cached_ir.clone();
                    let mut guard = cached_ir.lock().unwrap();

                    if config::verbose() {
                        eprintln!(
                            "expand_datasets(): python[{}]: limit: {:?}, project: {}",
                            dataset_object.name(),
                            limit,
                            projection.as_ref().map_or(
                                PlSmallStr::from_static("all"),
                                |x| format_pl_smallstr!("{}", x.len())
                            )
                        )
                    }

                    // Pass the cached version key to the dataset object only when the cached
                    // resolve was made with the same limit/projection, allowing it to skip
                    // re-resolving if the dataset version is unchanged.
                    let existing_resolved_version_key = match guard.as_ref() {
                        Some(resolved) => {
                            let ExpandedDataset {
                                version,
                                limit: cached_limit,
                                projection: cached_projection,
                                expanded_dsl: _,
                                python_scan: _,
                            } = resolved;

                            (cached_limit == &limit && cached_projection == &projection)
                                .then_some(version.as_str())
                        },

                        None => None,
                    };

                    // `Some` means a fresh resolve was produced (cache absent or stale);
                    // `None` presumably means the cached version is still current — the exact
                    // contract lives on the python side (to_dataset_scan).
                    if let Some((expanded_dsl, version)) = dataset_object.to_dataset_scan(
                        existing_resolved_version_key,
                        limit,
                        projection.as_deref(),
                    )? {
                        *guard = Some(ExpandedDataset {
                            version,
                            limit,
                            projection,
                            expanded_dsl,
                            python_scan: None,
                        })
                    }

                    // By now the cache must be populated: either it was valid and re-used,
                    // or it was just replaced above.
                    let ExpandedDataset {
                        version: _,
                        limit: _,
                        projection: _,
                        expanded_dsl,
                        python_scan,
                    } = guard.as_mut().unwrap();

                    match expanded_dsl {
                        DslPlan::Scan {
                            sources: resolved_sources,
                            unified_scan_args: resolved_unified_scan_args,
                            scan_type: resolved_scan_type,
                            cached_ir: _,
                        } => {
                            use crate::dsl::FileScanDsl;

                            // We only want a few configuration flags from here (e.g. column casting config).
                            // The rest we either expect to be None (e.g. projection / row_index), or ignore.
                            // The `@ None` patterns make this a refutable destructure: anything
                            // unexpectedly set trips the panic below.
                            let UnifiedScanArgs {
                                schema: _,
                                cloud_options,
                                hive_options: _,
                                rechunk,
                                cache,
                                glob: _,
                                projection: _projection @ None,
                                column_mapping,
                                default_values,
                                row_index: _row_index @ None,
                                pre_slice: _pre_slice @ None,
                                cast_columns_policy,
                                missing_columns_policy,
                                extra_columns_policy,
                                include_file_paths: _include_file_paths @ None,
                                deletion_files,
                            } = resolved_unified_scan_args.as_ref()
                            else {
                                panic!(
                                    "invalid scan args from python dataset resolve: {:?}",
                                    &resolved_unified_scan_args
                                )
                            };

                            // Copy the accepted configuration over to the scan node's args.
                            unified_scan_args.cloud_options = cloud_options.clone();
                            unified_scan_args.rechunk = *rechunk;
                            unified_scan_args.cache = *cache;
                            unified_scan_args.cast_columns_policy = cast_columns_policy.clone();
                            unified_scan_args.missing_columns_policy = *missing_columns_policy;
                            unified_scan_args.extra_columns_policy = *extra_columns_policy;
                            unified_scan_args.column_mapping = column_mapping.clone();
                            unified_scan_args.default_values = default_values.clone();
                            unified_scan_args.deletion_files = deletion_files.clone();

                            *sources = resolved_sources.clone();

                            // Translate the resolved DSL scan variant into its IR counterpart.
                            *scan_type = Box::new(match *resolved_scan_type.clone() {
                                #[cfg(feature = "csv")]
                                FileScanDsl::Csv { options } => FileScanIR::Csv { options },

                                #[cfg(feature = "ipc")]
                                FileScanDsl::Ipc { options } => FileScanIR::Ipc {
                                    options,
                                    metadata: None,
                                },

                                #[cfg(feature = "parquet")]
                                FileScanDsl::Parquet { options } => FileScanIR::Parquet {
                                    options,
                                    metadata: None,
                                },

                                #[cfg(feature = "json")]
                                FileScanDsl::NDJson { options } => FileScanIR::NDJson { options },

                                #[cfg(feature = "python")]
                                FileScanDsl::PythonDataset { dataset_object } => {
                                    FileScanIR::PythonDataset {
                                        dataset_object,
                                        cached_ir: Default::default(),
                                    }
                                },

                                FileScanDsl::Anonymous {
                                    options,
                                    function,
                                    file_info: _,
                                } => FileScanIR::Anonymous { options, function },
                            });
                        },

                        // The dataset resolved to a plain python scan; record it on the cache
                        // entry as the fallback path instead of rewriting the IR node.
                        DslPlan::PythonScan { options } => {
                            *python_scan = Some(ExpandedPythonScan {
                                name: dataset_object.name(),
                                scan_fn: options.scan_fn.clone().unwrap(),
                                variant: options.python_source.clone(),
                            })
                        },

                        dsl => {
                            polars_bail!(
                                ComputeError:
                                "unknown DSL when resolving python dataset scan: {}",
                                dsl.display()?
                            )
                        },
                    };
                },

                _ => {},
            }
        }

        // Always `None`: mutation happened in place (see note at the top).
        Ok(None)
    }
}
220
221
/// Cached result of resolving a python dataset scan, stored on the scan node so
/// repeated optimizer passes do not re-resolve an unchanged dataset.
#[derive(Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct ExpandedDataset {
    // Version key returned by the dataset resolve; compared to decide cache freshness.
    version: PlSmallStr,
    // Row limit this resolve was made with (derived from slice pushdown), part of the cache key.
    limit: Option<usize>,
    // Column projection this resolve was made with, part of the cache key.
    projection: Option<Arc<[PlSmallStr]>>,
    // The DSL plan the dataset expanded to.
    expanded_dsl: DslPlan,

    /// Fallback python scan
    #[cfg(feature = "python")]
    python_scan: Option<ExpandedPythonScan>,
}
234
235
/// Fallback scan recorded when a python dataset resolves to a plain python scan
/// rather than a native file scan.
#[cfg(feature = "python")]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[derive(Clone)]
pub struct ExpandedPythonScan {
    // Name of the originating dataset object (used for diagnostics / display).
    pub name: PlSmallStr,
    // The python callable performing the scan.
    pub scan_fn: PythonObject,
    // Which python scan source variant this is.
    pub variant: PythonScanSource,
}
244
245
impl ExpandedDataset {
246
#[cfg(feature = "python")]
247
pub fn python_scan(&self) -> Option<&ExpandedPythonScan> {
248
self.python_scan.as_ref()
249
}
250
}
251
252
impl Debug for ExpandedDataset {
    /// Formats via a derived-`Debug` mirror struct so the output stays in sync with the
    /// fields, while rendering `expanded_dsl` and `python_scan` as human-readable strings.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructure: adding a field to `ExpandedDataset` forces this impl
        // (and the mirror struct below) to be updated.
        let ExpandedDataset {
            version,
            limit,
            projection,
            expanded_dsl,

            #[cfg(feature = "python")]
            python_scan,
        } = self;

        return display::ExpandedDataset {
            version,
            limit,
            projection,
            // Render the DSL plan eagerly; a display error is shown in place of the plan.
            expanded_dsl: &match expanded_dsl.display() {
                Ok(v) => v.to_string(),
                Err(e) => e.to_string(),
            },
            #[cfg(feature = "python")]
            python_scan: python_scan.as_ref().map(
                |ExpandedPythonScan {
                     name,
                     scan_fn: _,
                     variant,
                 }| {
                    format_pl_smallstr!("python-scan[{} @ {:?}]", name, variant)
                },
            ),
        }
        .fmt(f);

        // Item declarations are scoped, not executed, so this module after `return` is
        // reachable as a type — it just keeps the display-only mirror private to `fmt`.
        mod display {
            use std::fmt::Debug;
            use std::sync::Arc;

            use polars_utils::pl_str::PlSmallStr;

            /// Display-only mirror of the outer `ExpandedDataset`, with string-rendered
            /// `expanded_dsl` / `python_scan`. Fields are read only by derived `Debug`,
            /// hence the `#[expect(unused)]`.
            #[derive(Debug)]
            #[expect(unused)]
            pub struct ExpandedDataset<'a> {
                pub version: &'a str,
                pub limit: &'a Option<usize>,
                pub projection: &'a Option<Arc<[PlSmallStr]>>,
                pub expanded_dsl: &'a str,

                #[cfg(feature = "python")]
                pub python_scan: Option<PlSmallStr>,
            }
        }
    }
}
305
306