Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/dsl/builder_dsl.rs
8431 views
1
use std::sync::Arc;
2
3
use polars_core::prelude::*;
4
#[cfg(feature = "csv")]
5
use polars_io::csv::read::CsvReadOptions;
6
#[cfg(feature = "ipc")]
7
use polars_io::ipc::IpcScanOptions;
8
#[cfg(feature = "parquet")]
9
use polars_io::parquet::read::ParquetOptions;
10
use polars_utils::unique_id::UniqueId;
11
12
use crate::dsl::functions::lit;
13
#[cfg(feature = "python")]
14
use crate::dsl::python_dsl::PythonFunction;
15
use crate::prelude::*;
16
/// Incremental builder for a [`DslPlan`].
///
/// Most methods take the builder by value and return a new `DslBuilder` whose plan
/// has one additional node stacked on top of the previous plan.
pub struct DslBuilder(
    /// The logical plan assembled so far.
    pub DslPlan,
);
17
18
impl From<DslPlan> for DslBuilder {
19
fn from(lp: DslPlan) -> Self {
20
DslBuilder(lp)
21
}
22
}
23
24
impl DslBuilder {
25
pub fn anonymous_scan(
26
function: Arc<dyn AnonymousScan>,
27
options: AnonymousScanOptions,
28
unified_scan_args: UnifiedScanArgs,
29
) -> PolarsResult<Self> {
30
let schema = unified_scan_args.schema.clone().ok_or_else(|| {
31
polars_err!(
32
ComputeError:
33
"anonymous scan requires schema to be specified in unified_scan_args"
34
)
35
})?;
36
37
Ok(DslPlan::Scan {
38
sources: ScanSources::default(),
39
unified_scan_args: Box::new(unified_scan_args),
40
scan_type: Box::new(FileScanDsl::Anonymous {
41
function,
42
options: Arc::new(options),
43
file_info: FileInfo {
44
schema: schema.clone(),
45
reader_schema: Some(either::Either::Right(schema)),
46
..Default::default()
47
},
48
}),
49
cached_ir: Default::default(),
50
}
51
.into())
52
}
53
54
#[cfg(feature = "parquet")]
55
pub fn scan_parquet(
56
sources: ScanSources,
57
options: ParquetOptions,
58
unified_scan_args: UnifiedScanArgs,
59
) -> PolarsResult<Self> {
60
Ok(DslPlan::Scan {
61
sources,
62
unified_scan_args: Box::new(unified_scan_args),
63
scan_type: Box::new(FileScanDsl::Parquet { options }),
64
cached_ir: Default::default(),
65
}
66
.into())
67
}
68
69
#[cfg(feature = "ipc")]
70
pub fn scan_ipc(
71
sources: ScanSources,
72
options: IpcScanOptions,
73
unified_scan_args: UnifiedScanArgs,
74
) -> PolarsResult<Self> {
75
Ok(DslPlan::Scan {
76
sources,
77
unified_scan_args: Box::new(unified_scan_args),
78
scan_type: Box::new(FileScanDsl::Ipc { options }),
79
cached_ir: Default::default(),
80
}
81
.into())
82
}
83
84
#[cfg(feature = "scan_lines")]
85
pub fn scan_lines(
86
sources: ScanSources,
87
unified_scan_args: UnifiedScanArgs,
88
name: PlSmallStr,
89
) -> PolarsResult<Self> {
90
Ok(DslPlan::Scan {
91
sources,
92
unified_scan_args: Box::new(unified_scan_args),
93
scan_type: Box::new(FileScanDsl::Lines { name }),
94
cached_ir: Default::default(),
95
}
96
.into())
97
}
98
99
#[allow(clippy::too_many_arguments)]
100
#[cfg(feature = "csv")]
101
pub fn scan_csv(
102
sources: ScanSources,
103
options: impl Into<Arc<CsvReadOptions>>,
104
unified_scan_args: UnifiedScanArgs,
105
) -> PolarsResult<Self> {
106
Ok(DslPlan::Scan {
107
sources,
108
unified_scan_args: Box::new(unified_scan_args),
109
scan_type: Box::new(FileScanDsl::Csv {
110
options: options.into(),
111
}),
112
cached_ir: Default::default(),
113
}
114
.into())
115
}
116
117
#[cfg(feature = "python")]
118
pub fn scan_python_dataset(
119
dataset_object: polars_utils::python_function::PythonObject,
120
) -> DslBuilder {
121
use super::python_dataset::PythonDatasetProvider;
122
123
DslPlan::Scan {
124
sources: ScanSources::default(),
125
unified_scan_args: Default::default(),
126
scan_type: Box::new(FileScanDsl::PythonDataset {
127
dataset_object: Arc::new(PythonDatasetProvider::new(dataset_object)),
128
}),
129
cached_ir: Default::default(),
130
}
131
.into()
132
}
133
134
pub fn cache(self) -> Self {
135
let input = Arc::new(self.0);
136
DslPlan::Cache {
137
input,
138
id: UniqueId::new(),
139
}
140
.into()
141
}
142
143
pub fn drop(self, columns: Selector) -> Self {
144
self.project(vec![Expr::Selector(!columns)], ProjectionOptions::default())
145
}
146
147
pub fn project(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
148
DslPlan::Select {
149
expr: exprs,
150
input: Arc::new(self.0),
151
options,
152
}
153
.into()
154
}
155
156
pub fn fill_null(self, fill_value: Expr) -> Self {
157
self.project(
158
vec![functions::all().as_expr().fill_null(fill_value)],
159
ProjectionOptions {
160
duplicate_check: false,
161
..Default::default()
162
},
163
)
164
}
165
166
pub fn drop_nans(self, subset: Option<Selector>) -> Self {
167
let is_nan = subset
168
.unwrap_or(DataTypeSelector::Float.as_selector())
169
.as_expr()
170
.is_nan();
171
self.remove(functions::any_horizontal([is_nan]).unwrap())
172
}
173
174
pub fn drop_nulls(self, subset: Option<Selector>) -> Self {
175
let is_not_null = subset.unwrap_or(Selector::Wildcard).as_expr().is_not_null();
176
self.filter(functions::all_horizontal([is_not_null]).unwrap())
177
}
178
179
pub fn fill_nan(self, fill_value: Expr) -> Self {
180
self.map_private(DslFunction::FillNan(fill_value))
181
}
182
183
pub fn with_columns(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
184
if exprs.is_empty() {
185
return self;
186
}
187
188
DslPlan::HStack {
189
input: Arc::new(self.0),
190
exprs,
191
options,
192
}
193
.into()
194
}
195
196
pub fn match_to_schema(
197
self,
198
match_schema: SchemaRef,
199
per_column: Arc<[MatchToSchemaPerColumn]>,
200
extra_columns: ExtraColumnsPolicy,
201
) -> Self {
202
DslPlan::MatchToSchema {
203
input: Arc::new(self.0),
204
match_schema,
205
per_column,
206
extra_columns,
207
}
208
.into()
209
}
210
211
pub fn pipe_with_schema(
212
self,
213
others: Vec<DslPlan>,
214
callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
215
) -> Self {
216
let mut input = vec![self.0];
217
input.extend(others);
218
DslPlan::PipeWithSchema {
219
input: Arc::from(input),
220
callback,
221
}
222
.into()
223
}
224
225
pub fn with_context(self, contexts: Vec<DslPlan>) -> Self {
226
DslPlan::ExtContext {
227
input: Arc::new(self.0),
228
contexts,
229
}
230
.into()
231
}
232
233
/// Apply a filter predicate, keeping the rows that match it.
234
pub fn filter(self, predicate: Expr) -> Self {
235
DslPlan::Filter {
236
predicate,
237
input: Arc::new(self.0),
238
}
239
.into()
240
}
241
242
/// Remove rows matching a filter predicate (note that rows
243
/// where the predicate resolves to `null` are *not* removed).
244
pub fn remove(self, predicate: Expr) -> Self {
245
DslPlan::Filter {
246
predicate: predicate.neq_missing(lit(true)),
247
input: Arc::new(self.0),
248
}
249
.into()
250
}
251
252
#[allow(clippy::too_many_arguments)]
253
pub fn group_by<E: AsRef<[Expr]>>(
254
self,
255
keys: Vec<Expr>,
256
predicates: Vec<Expr>,
257
aggs: E,
258
apply: Option<(PlanCallback<DataFrame, DataFrame>, SchemaRef)>,
259
maintain_order: bool,
260
#[cfg(feature = "dynamic_group_by")] dynamic_options: Option<DynamicGroupOptions>,
261
#[cfg(feature = "dynamic_group_by")] rolling_options: Option<RollingGroupOptions>,
262
) -> Self {
263
let aggs = aggs.as_ref().to_vec();
264
let options = GroupbyOptions {
265
#[cfg(feature = "dynamic_group_by")]
266
dynamic: dynamic_options,
267
#[cfg(feature = "dynamic_group_by")]
268
rolling: rolling_options,
269
slice: None,
270
};
271
272
DslPlan::GroupBy {
273
input: Arc::new(self.0),
274
keys,
275
predicates,
276
aggs,
277
apply,
278
maintain_order,
279
options: Arc::new(options),
280
}
281
.into()
282
}
283
284
pub fn build(self) -> DslPlan {
285
self.0
286
}
287
288
pub fn from_existing_df(df: DataFrame) -> Self {
289
let schema = df.schema().clone();
290
DslPlan::DataFrameScan {
291
df: Arc::new(df),
292
schema,
293
}
294
.into()
295
}
296
297
pub fn sort(self, by_column: Vec<Expr>, sort_options: SortMultipleOptions) -> Self {
298
DslPlan::Sort {
299
input: Arc::new(self.0),
300
by_column,
301
slice: None,
302
sort_options,
303
}
304
.into()
305
}
306
307
pub fn explode(self, columns: Selector, options: ExplodeOptions, allow_empty: bool) -> Self {
308
DslPlan::MapFunction {
309
input: Arc::new(self.0),
310
function: DslFunction::Explode {
311
columns,
312
options,
313
allow_empty,
314
},
315
}
316
.into()
317
}
318
319
#[cfg(feature = "pivot")]
320
#[expect(clippy::too_many_arguments)]
321
pub fn pivot(
322
self,
323
on: Selector,
324
on_columns: Arc<DataFrame>,
325
index: Selector,
326
values: Selector,
327
agg: Expr,
328
maintain_order: bool,
329
separator: PlSmallStr,
330
) -> Self {
331
DslPlan::Pivot {
332
input: Arc::new(self.0),
333
on,
334
on_columns,
335
index,
336
values,
337
agg,
338
maintain_order,
339
separator,
340
}
341
.into()
342
}
343
344
#[cfg(feature = "pivot")]
345
pub fn unpivot(self, args: UnpivotArgsDSL) -> Self {
346
DslPlan::MapFunction {
347
input: Arc::new(self.0),
348
function: DslFunction::Unpivot { args },
349
}
350
.into()
351
}
352
353
pub fn row_index(self, name: PlSmallStr, offset: Option<IdxSize>) -> Self {
354
DslPlan::MapFunction {
355
input: Arc::new(self.0),
356
function: DslFunction::RowIndex { name, offset },
357
}
358
.into()
359
}
360
361
pub fn distinct(self, options: DistinctOptionsDSL) -> Self {
362
DslPlan::Distinct {
363
input: Arc::new(self.0),
364
options,
365
}
366
.into()
367
}
368
369
pub fn slice(self, offset: i64, len: IdxSize) -> Self {
370
DslPlan::Slice {
371
input: Arc::new(self.0),
372
offset,
373
len,
374
}
375
.into()
376
}
377
378
pub fn join(
379
self,
380
other: DslPlan,
381
left_on: Vec<Expr>,
382
right_on: Vec<Expr>,
383
options: Arc<JoinOptions>,
384
) -> Self {
385
DslPlan::Join {
386
input_left: Arc::new(self.0),
387
input_right: Arc::new(other),
388
left_on,
389
right_on,
390
predicates: Default::default(),
391
options,
392
}
393
.into()
394
}
395
pub fn map_private(self, function: DslFunction) -> Self {
396
DslPlan::MapFunction {
397
input: Arc::new(self.0),
398
function,
399
}
400
.into()
401
}
402
403
#[cfg(feature = "python")]
404
pub fn map_python(
405
self,
406
function: PythonFunction,
407
optimizations: AllowedOptimizations,
408
schema: Option<SchemaRef>,
409
validate_output: bool,
410
) -> Self {
411
DslPlan::MapFunction {
412
input: Arc::new(self.0),
413
function: DslFunction::OpaquePython(OpaquePythonUdf {
414
function,
415
schema,
416
predicate_pd: optimizations.contains(OptFlags::PREDICATE_PUSHDOWN),
417
projection_pd: optimizations.contains(OptFlags::PROJECTION_PUSHDOWN),
418
streamable: optimizations.contains(OptFlags::NEW_STREAMING),
419
validate_output,
420
}),
421
}
422
.into()
423
}
424
425
pub fn map<F>(
426
self,
427
function: F,
428
optimizations: AllowedOptimizations,
429
schema: Option<Arc<dyn UdfSchema>>,
430
name: PlSmallStr,
431
) -> Self
432
where
433
F: DataFrameUdf + 'static,
434
{
435
let function = Arc::new(function);
436
437
DslPlan::MapFunction {
438
input: Arc::new(self.0),
439
function: DslFunction::FunctionIR(FunctionIR::Opaque {
440
function,
441
schema,
442
predicate_pd: optimizations.contains(OptFlags::PREDICATE_PUSHDOWN),
443
projection_pd: optimizations.contains(OptFlags::PROJECTION_PUSHDOWN),
444
streamable: optimizations.contains(OptFlags::NEW_STREAMING),
445
fmt_str: name,
446
}),
447
}
448
.into()
449
}
450
}
451
452