Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/dsl/options/mod.rs
8458 views
1
use std::hash::Hash;
2
#[cfg(feature = "json")]
3
use std::num::NonZeroUsize;
4
use std::sync::Arc;
5
6
pub mod file_provider;
7
pub mod sink;
8
pub use polars_config::Engine;
9
use polars_core::error::PolarsResult;
10
use polars_core::prelude::*;
11
#[cfg(feature = "csv")]
12
use polars_io::csv::write::CsvWriterOptions;
13
#[cfg(feature = "ipc")]
14
use polars_io::ipc::IpcWriterOptions;
15
#[cfg(feature = "json")]
16
use polars_io::ndjson::NDJsonWriterOptions;
17
#[cfg(feature = "parquet")]
18
use polars_io::parquet::write::ParquetWriteOptions;
19
#[cfg(feature = "iejoin")]
20
use polars_ops::frame::IEJoinOptions;
21
use polars_ops::frame::{CrossJoinFilter, CrossJoinOptions, JoinTypeOptions};
22
use polars_ops::prelude::{JoinArgs, JoinType};
23
#[cfg(feature = "dynamic_group_by")]
24
use polars_time::DynamicGroupOptions;
25
#[cfg(feature = "dynamic_group_by")]
26
use polars_time::RollingGroupOptions;
27
use polars_utils::IdxSize;
28
use polars_utils::pl_str::PlSmallStr;
29
#[cfg(feature = "serde")]
30
use serde::{Deserialize, Serialize};
31
pub use sink::{
32
CallbackSinkType, FileSinkOptions, PartitionStrategy, PartitionStrategyIR,
33
PartitionedSinkOptions, PartitionedSinkOptionsIR, SinkDestination, SinkTarget, SinkType,
34
SinkTypeIR, UnifiedSinkArgs,
35
};
36
use strum_macros::IntoStaticStr;
37
38
use super::Expr;
39
use crate::dsl::Selector;
40
use crate::plans::ExprIR;
41
42
/// Options for rolling covariance / correlation computations.
#[derive(Copy, Clone, PartialEq, Debug, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct RollingCovOptions {
    /// Size of the rolling window, in rows.
    pub window_size: IdxSize,
    /// Minimum number of observations in a window required to emit a value.
    pub min_periods: IdxSize,
    /// "Delta degrees of freedom" used in the (co)variance denominator.
    pub ddof: u8,
}
50
51
/// Options controlling how strings are parsed into temporal values.
#[derive(Clone, PartialEq, Debug, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct StrptimeOptions {
    /// Formatting string
    pub format: Option<PlSmallStr>,
    /// If set then polars will return an error if any date parsing fails
    pub strict: bool,
    /// If `false`, polars may parse matches that do not span the whole string,
    /// e.g. "foo-2021-01-01-bar" could match "2021-01-01"
    pub exact: bool,
    /// use a cache of unique, converted dates to apply the datetime conversion.
    pub cache: bool,
}
65
66
impl Default for StrptimeOptions {
67
fn default() -> Self {
68
StrptimeOptions {
69
format: None,
70
strict: true,
71
exact: true,
72
cache: true,
73
}
74
}
75
}
76
77
/// Join-type-specific options at the IR level (lowered into
/// `JoinTypeOptions` via [`JoinTypeOptionsIR::compile`]).
#[derive(Clone, PartialEq, Eq, IntoStaticStr, Debug)]
#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]
#[strum(serialize_all = "snake_case")]
pub enum JoinTypeOptionsIR {
    #[cfg(feature = "iejoin")]
    IEJoin(IEJoinOptions),
    // Fused cross join and filter (only used in the in-memory engine)
    CrossAndFilter {
        predicate: ExprIR, // Must be elementwise.
    },
}
88
89
impl Hash for JoinTypeOptionsIR {
    /// Hashes the variant payload. For `CrossAndFilter` only the predicate's
    /// expression node is hashed, not the full `ExprIR`.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        use JoinTypeOptionsIR::*;
        match self {
            #[cfg(feature = "iejoin")]
            IEJoin(opt) => opt.hash(state),
            CrossAndFilter { predicate } => {
                predicate.node().hash(state);
            },
        }
    }
}
101
102
impl JoinTypeOptionsIR {
    /// Lower these IR options into executable [`JoinTypeOptions`].
    ///
    /// `plan` compiles the cross-join filter predicate into a physical
    /// [`CrossJoinFilter`]; it is only invoked for the `CrossAndFilter`
    /// variant.
    ///
    /// # Errors
    /// Propagates any error returned by `plan`.
    pub fn compile<C: FnOnce(&ExprIR) -> PolarsResult<Arc<dyn CrossJoinFilter>>>(
        self,
        plan: C,
    ) -> PolarsResult<JoinTypeOptions> {
        use JoinTypeOptionsIR::*;
        match self {
            CrossAndFilter { predicate } => {
                let predicate = plan(&predicate)?;

                Ok(JoinTypeOptions::Cross(CrossJoinOptions { predicate }))
            },
            #[cfg(feature = "iejoin")]
            IEJoin(opt) => Ok(JoinTypeOptions::IEJoin(opt)),
        }
    }
}
119
120
/// IR-level join options; counterpart of the DSL-level [`JoinOptions`]
/// with an extra `options` slot for join-type-specific IR state.
#[derive(Clone, Debug, PartialEq, Hash)]
#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]
pub struct JoinOptionsIR {
    /// Allow the optimizer to run both join inputs in parallel.
    pub allow_parallel: bool,
    /// Force both join inputs to be evaluated in parallel.
    pub force_parallel: bool,
    /// Core join arguments (join type, keys handling, suffix, ...).
    pub args: JoinArgs,
    /// Join-type-specific IR options; `None` until set during lowering.
    pub options: Option<JoinTypeOptionsIR>,
}
128
129
impl From<JoinOptions> for JoinOptionsIR {
130
fn from(opts: JoinOptions) -> Self {
131
Self {
132
allow_parallel: opts.allow_parallel,
133
force_parallel: opts.force_parallel,
134
args: opts.args,
135
options: Default::default(),
136
}
137
}
138
}
139
140
/// User-facing (DSL-level) join options.
#[derive(Clone, Debug, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct JoinOptions {
    /// Allow the optimizer to run both join inputs in parallel.
    pub allow_parallel: bool,
    /// Force both join inputs to be evaluated in parallel.
    pub force_parallel: bool,
    /// Core join arguments (join type, keys handling, suffix, ...).
    pub args: JoinArgs,
}
148
149
impl Default for JoinOptions {
150
fn default() -> Self {
151
Self {
152
allow_parallel: true,
153
force_parallel: false,
154
// Todo!: make default
155
args: JoinArgs::new(JoinType::Left),
156
}
157
}
158
}
159
160
impl From<JoinOptionsIR> for JoinOptions {
161
fn from(opts: JoinOptionsIR) -> Self {
162
Self {
163
allow_parallel: opts.allow_parallel,
164
force_parallel: opts.force_parallel,
165
args: opts.args,
166
}
167
}
168
}
169
170
/// Strategy for mapping the result of a window aggregation back to rows.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[strum(serialize_all = "snake_case")]
pub enum WindowMapping {
    /// Map the group values to the position
    #[default]
    GroupsToRows,
    /// Explode the aggregated list and just do a hstack instead of a join
    /// this requires the groups to be sorted to make any sense
    Explode,
    /// Join the groups as 'List<group_dtype>' to the row positions.
    /// warning: this can be memory intensive
    Join,
}
185
186
/// DSL-level arguments for `unpivot` (wide-to-long reshape).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct UnpivotArgsDSL {
    /// Columns to unpivot into values; `None` presumably means "all non-index
    /// columns" — confirm against the unpivot implementation.
    pub on: Option<Selector>,
    /// Columns kept as identifier (index) columns.
    pub index: Selector,
    /// Name for the output variable column; `None` uses the default name.
    pub variable_name: Option<PlSmallStr>,
    /// Name for the output value column; `None` uses the default name.
    pub value_name: Option<PlSmallStr>,
}
195
196
/// IR-level options for a union/concat node (cf. the user-facing
/// [`UnionArgs`], which lowers into this).
#[derive(Clone, Debug, Copy, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct UnionOptions {
    /// Optional (offset, length) slice pushed into the union.
    pub slice: Option<(i64, usize)>,
    // known row_output, estimated row output
    pub rows: (Option<usize>, usize),
    /// Evaluate the inputs in parallel.
    pub parallel: bool,
    /// Whether this union stems from a scan over multiple files.
    pub from_partitioned_ds: bool,
    /// Set when an optimization pass flattened nested unions into this one.
    pub flattened_by_opt: bool,
    /// Rechunk the result into contiguous memory.
    pub rechunk: bool,
    /// Preserve the order of the inputs in the output.
    pub maintain_order: bool,
}
208
209
impl Default for UnionOptions {
210
fn default() -> Self {
211
Self {
212
slice: None,
213
rows: (None, 0),
214
parallel: true,
215
from_partitioned_ds: false,
216
flattened_by_opt: false,
217
rechunk: false,
218
maintain_order: true,
219
}
220
}
221
}
222
223
/// Options for horizontal concatenation (`hstack`-style concat).
#[derive(Clone, Debug, Copy, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct HConcatOptions {
    /// Evaluate the inputs in parallel.
    pub parallel: bool,
    /// Error on mismatched input heights instead of coercing them.
    pub strict: bool,
    // Treat unit values as scalar.
    // E.g. broadcast them instead of fill nulls.
    pub broadcast_unit_length: bool,
}
233
234
impl Default for HConcatOptions {
235
fn default() -> Self {
236
Self {
237
parallel: true,
238
strict: false,
239
broadcast_unit_length: false,
240
}
241
}
242
}
243
244
/// Options attached to a group-by node.
#[derive(Clone, Debug, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct GroupbyOptions {
    /// Options for a dynamic group-by (see `polars_time::DynamicGroupOptions`);
    /// `None` for a plain group-by.
    #[cfg(feature = "dynamic_group_by")]
    pub dynamic: Option<DynamicGroupOptions>,
    /// Options for a rolling group-by (see `polars_time::RollingGroupOptions`);
    /// `None` for a plain group-by.
    #[cfg(feature = "dynamic_group_by")]
    pub rolling: Option<RollingGroupOptions>,
    /// Take only a slice of the result
    pub slice: Option<(i64, usize)>,
}
255
256
impl GroupbyOptions {
    /// Whether this is a rolling group-by.
    ///
    /// Always `false` when the `dynamic_group_by` feature is disabled.
    pub fn is_rolling(&self) -> bool {
        #[cfg(feature = "dynamic_group_by")]
        {
            self.rolling.is_some()
        }
        #[cfg(not(feature = "dynamic_group_by"))]
        {
            false
        }
    }

    /// Whether this is a dynamic group-by.
    ///
    /// Always `false` when the `dynamic_group_by` feature is disabled.
    pub fn is_dynamic(&self) -> bool {
        #[cfg(feature = "dynamic_group_by")]
        {
            self.dynamic.is_some()
        }
        #[cfg(not(feature = "dynamic_group_by"))]
        {
            false
        }
    }
}
279
280
/// DSL-level options for `unique`/`distinct` operations.
#[derive(Clone, Debug, Eq, PartialEq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct DistinctOptionsDSL {
    /// Subset of columns/expressions that will be taken into account.
    pub subset: Option<Vec<Expr>>,
    /// This will maintain the order of the input.
    /// Note that this is more expensive.
    /// `maintain_order` is not supported in the streaming
    /// engine.
    pub maintain_order: bool,
    /// Which rows to keep.
    pub keep_strategy: UniqueKeepStrategy,
}
294
295
/// Optimizer/formatting options for an opaque UDF node in the logical plan.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct LogicalPlanUdfOptions {
    /// allow predicate pushdown optimizations
    pub predicate_pd: bool,
    /// allow projection pushdown optimizations
    pub projection_pd: bool,
    // used for formatting
    pub fmt_str: &'static str,
}
304
305
/// Options for an anonymous (user-provided) scan source.
#[derive(Clone, PartialEq, Eq, Debug, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct AnonymousScanOptions {
    /// Number of leading rows to skip, if any.
    pub skip_rows: Option<usize>,
    /// Static name used when formatting/displaying the scan node.
    pub fmt_str: &'static str,
}
311
312
// Compile-time guard: keep `FileWriteFormat` small so it stays cheap to
// move/clone wherever it is embedded (fails the build if a variant grows).
const _: () = {
    assert!(std::mem::size_of::<FileWriteFormat>() <= 50);
};
315
316
/// Output file format plus its writer options; variants are feature-gated.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[derive(Clone, Debug, PartialEq, Eq, Hash, strum_macros::IntoStaticStr)]
pub enum FileWriteFormat {
    // Arc keeps the variant (and thus the enum) small; see the size assert above
    // this enum's definition.
    #[cfg(feature = "parquet")]
    Parquet(Arc<ParquetWriteOptions>),
    #[cfg(feature = "ipc")]
    Ipc(IpcWriterOptions),
    #[cfg(feature = "csv")]
    Csv(CsvWriterOptions),
    #[cfg(feature = "json")]
    NDJson(NDJsonWriterOptions),
}
329
330
impl FileWriteFormat {
    /// Canonical file extension (without the leading dot) for this format.
    pub fn extension(&self) -> &'static str {
        match self {
            #[cfg(feature = "parquet")]
            Self::Parquet(_) => "parquet",
            #[cfg(feature = "ipc")]
            Self::Ipc(_) => "ipc",
            #[cfg(feature = "csv")]
            Self::Csv(_) => "csv",
            #[cfg(feature = "json")]
            Self::NDJson(_) => "jsonl",

            // Only reachable when no file-format feature is enabled, in which
            // case the enum has no constructible variants.
            #[allow(unreachable_patterns)]
            _ => unreachable!("enable file type features"),
        }
    }
}
347
348
/// Arguments given to `concat`. Differs from `UnionOptions` as the latter is IR state.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct UnionArgs {
    /// Evaluate the inputs in parallel.
    pub parallel: bool,
    /// Rechunk the result into contiguous memory.
    pub rechunk: bool,
    /// Cast columns to their common supertype where dtypes differ.
    pub to_supertypes: bool,
    /// Diagonal concat: union of all columns, filling the missing ones.
    pub diagonal: bool,
    /// Error on mismatches instead of coercing.
    pub strict: bool,
    // If it is a union from a scan over multiple files.
    pub from_partitioned_ds: bool,
    /// Preserve the order of the inputs in the output.
    pub maintain_order: bool,
}
363
364
impl Default for UnionArgs {
365
fn default() -> Self {
366
Self {
367
parallel: true,
368
rechunk: false,
369
to_supertypes: false,
370
diagonal: false,
371
// By default, strict should be true in v2.0.0
372
strict: false,
373
from_partitioned_ds: false,
374
maintain_order: true,
375
}
376
}
377
}
378
379
impl From<UnionArgs> for UnionOptions {
380
fn from(args: UnionArgs) -> Self {
381
UnionOptions {
382
slice: None,
383
parallel: args.parallel,
384
rows: (None, 0),
385
from_partitioned_ds: args.from_partitioned_ds,
386
flattened_by_opt: false,
387
rechunk: args.rechunk,
388
maintain_order: args.maintain_order,
389
}
390
}
391
}
392
393
/// Options for reading newline-delimited JSON (NDJSON).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[cfg(feature = "json")]
pub struct NDJsonReadOptions {
    /// Number of threads to use; `None` presumably lets the reader decide —
    /// confirm against the NDJSON reader implementation.
    pub n_threads: Option<usize>,
    /// Maximum number of rows inspected to infer the schema;
    /// `None` means no limit.
    pub infer_schema_length: Option<NonZeroUsize>,
    /// Number of rows per chunk while reading.
    pub chunk_size: NonZeroUsize,
    /// Trade performance for lower memory usage.
    pub low_memory: bool,
    /// Skip lines that fail to parse instead of erroring.
    pub ignore_errors: bool,
    /// Full schema to use, bypassing inference.
    pub schema: Option<SchemaRef>,
    /// Per-column overrides applied on top of the inferred schema.
    pub schema_overwrite: Option<SchemaRef>,
}
406
407