GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/dsl/builder_dsl.rs
//! Convenience builder for assembling [`DslPlan`] nodes of the logical plan DSL.
use std::sync::Arc;

use polars_core::prelude::*;
#[cfg(feature = "csv")]
use polars_io::csv::read::CsvReadOptions;
#[cfg(feature = "ipc")]
use polars_io::ipc::IpcScanOptions;
#[cfg(feature = "parquet")]
use polars_io::parquet::read::ParquetOptions;
use polars_utils::unique_id::UniqueId;

#[cfg(feature = "python")]
use crate::dsl::python_dsl::PythonFunction;
use crate::prelude::*;

pub struct DslBuilder(pub DslPlan);

impl From<DslPlan> for DslBuilder {
    fn from(lp: DslPlan) -> Self {
        DslBuilder(lp)
    }
}

impl DslBuilder {
    pub fn anonymous_scan(
        function: Arc<dyn AnonymousScan>,
        options: AnonymousScanOptions,
        unified_scan_args: UnifiedScanArgs,
    ) -> PolarsResult<Self> {
        let schema = unified_scan_args.schema.clone().ok_or_else(|| {
            polars_err!(
                ComputeError:
                "anonymous scan requires schema to be specified in unified_scan_args"
            )
        })?;

        Ok(DslPlan::Scan {
            sources: ScanSources::default(),
            unified_scan_args: Box::new(unified_scan_args),
            scan_type: Box::new(FileScanDsl::Anonymous {
                function,
                options: Arc::new(options),
                file_info: FileInfo {
                    schema: schema.clone(),
                    reader_schema: Some(either::Either::Right(schema)),
                    ..Default::default()
                },
            }),
            cached_ir: Default::default(),
        }
        .into())
    }

    #[cfg(feature = "parquet")]
    #[allow(clippy::too_many_arguments)]
    pub fn scan_parquet(
        sources: ScanSources,
        options: ParquetOptions,
        unified_scan_args: UnifiedScanArgs,
    ) -> PolarsResult<Self> {
        Ok(DslPlan::Scan {
            sources,
            unified_scan_args: Box::new(unified_scan_args),
            scan_type: Box::new(FileScanDsl::Parquet { options }),
            cached_ir: Default::default(),
        }
        .into())
    }

    #[cfg(feature = "ipc")]
    #[allow(clippy::too_many_arguments)]
    pub fn scan_ipc(
        sources: ScanSources,
        options: IpcScanOptions,
        unified_scan_args: UnifiedScanArgs,
    ) -> PolarsResult<Self> {
        Ok(DslPlan::Scan {
            sources,
            unified_scan_args: Box::new(unified_scan_args),
            scan_type: Box::new(FileScanDsl::Ipc { options }),
            cached_ir: Default::default(),
        }
        .into())
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "csv")]
    pub fn scan_csv(
        sources: ScanSources,
        options: CsvReadOptions,
        unified_scan_args: UnifiedScanArgs,
    ) -> PolarsResult<Self> {
        Ok(DslPlan::Scan {
            sources,
            unified_scan_args: Box::new(unified_scan_args),
            scan_type: Box::new(FileScanDsl::Csv { options }),
            cached_ir: Default::default(),
        }
        .into())
    }

    #[cfg(feature = "python")]
    pub fn scan_python_dataset(
        dataset_object: polars_utils::python_function::PythonObject,
    ) -> DslBuilder {
        use super::python_dataset::PythonDatasetProvider;

        DslPlan::Scan {
            sources: ScanSources::default(),
            unified_scan_args: Default::default(),
            scan_type: Box::new(FileScanDsl::PythonDataset {
                dataset_object: Arc::new(PythonDatasetProvider::new(dataset_object)),
            }),
            cached_ir: Default::default(),
        }
        .into()
    }

    pub fn cache(self) -> Self {
        let input = Arc::new(self.0);
        DslPlan::Cache {
            input,
            id: UniqueId::new(),
        }
        .into()
    }

    pub fn drop(self, columns: Selector) -> Self {
        // Select the complement of `columns` by negating the selector.
        self.project(vec![Expr::Selector(!columns)], ProjectionOptions::default())
    }

    pub fn project(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
        DslPlan::Select {
            expr: exprs,
            input: Arc::new(self.0),
            options,
        }
        .into()
    }

    pub fn fill_null(self, fill_value: Expr) -> Self {
        self.project(
            vec![all().as_expr().fill_null(fill_value)],
            ProjectionOptions {
                duplicate_check: false,
                ..Default::default()
            },
        )
    }

    pub fn drop_nans(self, subset: Option<Selector>) -> Self {
        let is_nan = subset
            .unwrap_or(DataTypeSelector::Float.as_selector())
            .as_expr()
            .is_nan();
        self.remove(any_horizontal([is_nan]).unwrap())
    }

    pub fn drop_nulls(self, subset: Option<Selector>) -> Self {
        let is_not_null = subset.unwrap_or(Selector::Wildcard).as_expr().is_not_null();
        self.filter(all_horizontal([is_not_null]).unwrap())
    }

    pub fn fill_nan(self, fill_value: Expr) -> Self {
        self.map_private(DslFunction::FillNan(fill_value))
    }

    pub fn with_columns(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
        if exprs.is_empty() {
            return self;
        }

        DslPlan::HStack {
            input: Arc::new(self.0),
            exprs,
            options,
        }
        .into()
    }

    pub fn match_to_schema(
        self,
        match_schema: SchemaRef,
        per_column: Arc<[MatchToSchemaPerColumn]>,
        extra_columns: ExtraColumnsPolicy,
    ) -> Self {
        DslPlan::MatchToSchema {
            input: Arc::new(self.0),
            match_schema,
            per_column,
            extra_columns,
        }
        .into()
    }

    pub fn pipe_with_schema(self, callback: PlanCallback<(DslPlan, Schema), DslPlan>) -> Self {
        DslPlan::PipeWithSchema {
            input: Arc::new(self.0),
            callback,
        }
        .into()
    }

    pub fn with_context(self, contexts: Vec<DslPlan>) -> Self {
        DslPlan::ExtContext {
            input: Arc::new(self.0),
            contexts,
        }
        .into()
    }

    /// Apply a filter predicate, keeping the rows that match it.
    pub fn filter(self, predicate: Expr) -> Self {
        DslPlan::Filter {
            predicate,
            input: Arc::new(self.0),
        }
        .into()
    }

    /// Remove rows matching a filter predicate (note that rows
    /// where the predicate resolves to `null` are *not* removed).
    pub fn remove(self, predicate: Expr) -> Self {
        DslPlan::Filter {
            predicate: predicate.neq_missing(lit(true)),
            input: Arc::new(self.0),
        }
        .into()
    }
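
    // Illustrative note (not part of the upstream file): for a hypothetical
    // predicate such as `col("a").gt(lit(2))`, a row where "a" is null makes the
    // predicate evaluate to null. `filter(pred)` drops that row, while
    // `remove(pred)` keeps it, because the retention test
    // `pred.neq_missing(lit(true))` is true when `pred` is null.
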
    pub fn group_by<E: AsRef<[Expr]>>(
        self,
        keys: Vec<Expr>,
        aggs: E,
        apply: Option<(PlanCallback<DataFrame, DataFrame>, SchemaRef)>,
        maintain_order: bool,
        #[cfg(feature = "dynamic_group_by")] dynamic_options: Option<DynamicGroupOptions>,
        #[cfg(feature = "dynamic_group_by")] rolling_options: Option<RollingGroupOptions>,
    ) -> Self {
        let aggs = aggs.as_ref().to_vec();
        let options = GroupbyOptions {
            #[cfg(feature = "dynamic_group_by")]
            dynamic: dynamic_options,
            #[cfg(feature = "dynamic_group_by")]
            rolling: rolling_options,
            slice: None,
        };

        DslPlan::GroupBy {
            input: Arc::new(self.0),
            keys,
            aggs,
            apply,
            maintain_order,
            options: Arc::new(options),
        }
        .into()
    }

    pub fn build(self) -> DslPlan {
        self.0
    }

    pub fn from_existing_df(df: DataFrame) -> Self {
        let schema = df.schema().clone();
        DslPlan::DataFrameScan {
            df: Arc::new(df),
            schema,
        }
        .into()
    }

    pub fn sort(self, by_column: Vec<Expr>, sort_options: SortMultipleOptions) -> Self {
        DslPlan::Sort {
            input: Arc::new(self.0),
            by_column,
            slice: None,
            sort_options,
        }
        .into()
    }

    pub fn explode(self, columns: Selector, allow_empty: bool) -> Self {
        DslPlan::MapFunction {
            input: Arc::new(self.0),
            function: DslFunction::Explode {
                columns,
                allow_empty,
            },
        }
        .into()
    }

    #[cfg(feature = "pivot")]
    pub fn unpivot(self, args: UnpivotArgsDSL) -> Self {
        DslPlan::MapFunction {
            input: Arc::new(self.0),
            function: DslFunction::Unpivot { args },
        }
        .into()
    }

    pub fn row_index(self, name: PlSmallStr, offset: Option<IdxSize>) -> Self {
        DslPlan::MapFunction {
            input: Arc::new(self.0),
            function: DslFunction::RowIndex { name, offset },
        }
        .into()
    }

    pub fn distinct(self, options: DistinctOptionsDSL) -> Self {
        DslPlan::Distinct {
            input: Arc::new(self.0),
            options,
        }
        .into()
    }

    pub fn slice(self, offset: i64, len: IdxSize) -> Self {
        DslPlan::Slice {
            input: Arc::new(self.0),
            offset,
            len,
        }
        .into()
    }

    pub fn join(
        self,
        other: DslPlan,
        left_on: Vec<Expr>,
        right_on: Vec<Expr>,
        options: Arc<JoinOptions>,
    ) -> Self {
        DslPlan::Join {
            input_left: Arc::new(self.0),
            input_right: Arc::new(other),
            left_on,
            right_on,
            predicates: Default::default(),
            options,
        }
        .into()
    }

    pub fn map_private(self, function: DslFunction) -> Self {
        DslPlan::MapFunction {
            input: Arc::new(self.0),
            function,
        }
        .into()
    }

    #[cfg(feature = "python")]
    pub fn map_python(
        self,
        function: PythonFunction,
        optimizations: AllowedOptimizations,
        schema: Option<SchemaRef>,
        validate_output: bool,
    ) -> Self {
        DslPlan::MapFunction {
            input: Arc::new(self.0),
            function: DslFunction::OpaquePython(OpaquePythonUdf {
                function,
                schema,
                predicate_pd: optimizations.contains(OptFlags::PREDICATE_PUSHDOWN),
                projection_pd: optimizations.contains(OptFlags::PROJECTION_PUSHDOWN),
                streamable: optimizations.contains(OptFlags::NEW_STREAMING),
                validate_output,
            }),
        }
        .into()
    }

    pub fn map<F>(
        self,
        function: F,
        optimizations: AllowedOptimizations,
        schema: Option<Arc<dyn UdfSchema>>,
        name: PlSmallStr,
    ) -> Self
    where
        F: DataFrameUdf + 'static,
    {
        let function = Arc::new(function);

        DslPlan::MapFunction {
            input: Arc::new(self.0),
            function: DslFunction::FunctionIR(FunctionIR::Opaque {
                function,
                schema,
                predicate_pd: optimizations.contains(OptFlags::PREDICATE_PUSHDOWN),
                projection_pd: optimizations.contains(OptFlags::PROJECTION_PUSHDOWN),
                streamable: optimizations.contains(OptFlags::NEW_STREAMING),
                fmt_str: name,
            }),
        }
        .into()
    }
}
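
// A minimal usage sketch (not part of the upstream file): it shows how the builder
// methods above might be chained to assemble a `DslPlan` from an in-memory
// `DataFrame`. It assumes `DataFrame::empty()`, `col`, `lit`, and
// `SortMultipleOptions::default()` are available through the preludes imported at
// the top of this file; nothing is executed here, only the plan tree is built.
#[cfg(test)]
mod builder_usage_sketch {
    use super::*;

    #[test]
    fn chains_into_a_sort_plan() {
        // Start from an (empty) DataFrame, filter, then sort; `build` unwraps the plan.
        let plan: DslPlan = DslBuilder::from_existing_df(DataFrame::empty())
            .filter(col("a").gt(lit(1)))
            .sort(vec![col("a")], SortMultipleOptions::default())
            .build();

        // The outermost node should be the sort added last.
        assert!(matches!(plan, DslPlan::Sort { .. }));
    }
}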