Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-lazy/src/frame/mod.rs
6939 views
1
//! Lazy variant of a [DataFrame].
2
#[cfg(feature = "python")]
3
mod python;
4
5
mod cached_arenas;
6
mod err;
7
#[cfg(not(target_arch = "wasm32"))]
8
mod exitable;
9
#[cfg(feature = "pivot")]
10
pub mod pivot;
11
12
use std::sync::{Arc, Mutex};
13
14
pub use anonymous_scan::*;
15
#[cfg(feature = "csv")]
16
pub use csv::*;
17
#[cfg(not(target_arch = "wasm32"))]
18
pub use exitable::*;
19
pub use file_list_reader::*;
20
#[cfg(feature = "ipc")]
21
pub use ipc::*;
22
#[cfg(feature = "json")]
23
pub use ndjson::*;
24
#[cfg(feature = "parquet")]
25
pub use parquet::*;
26
use polars_compute::rolling::QuantileMethod;
27
use polars_core::POOL;
28
use polars_core::error::feature_gated;
29
use polars_core::prelude::*;
30
use polars_expr::{ExpressionConversionState, create_physical_expr};
31
use polars_io::RowIndex;
32
use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
33
use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
34
#[cfg(feature = "is_between")]
35
use polars_ops::prelude::ClosedInterval;
36
pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
37
use polars_utils::pl_str::PlSmallStr;
38
use polars_utils::plpath::PlPath;
39
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
40
41
use crate::frame::cached_arenas::CachedArena;
42
use crate::prelude::*;
43
44
/// Conversion into a [`LazyFrame`].
pub trait IntoLazy {
    /// Convert `self` into a [`LazyFrame`].
    fn lazy(self) -> LazyFrame;
}
47
48
impl IntoLazy for DataFrame {
49
/// Convert the `DataFrame` into a `LazyFrame`
50
fn lazy(self) -> LazyFrame {
51
let lp = DslBuilder::from_existing_df(self).build();
52
LazyFrame {
53
logical_plan: lp,
54
opt_state: Default::default(),
55
cached_arena: Default::default(),
56
}
57
}
58
}
59
60
impl IntoLazy for LazyFrame {
    /// Identity conversion: a `LazyFrame` is already lazy.
    fn lazy(self) -> LazyFrame {
        self
    }
}
65
66
/// Lazy abstraction over an eager `DataFrame`.
///
/// It really is an abstraction over a logical plan. The methods of this struct will incrementally
/// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)).
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The unoptimized DSL logical plan this frame wraps.
    pub logical_plan: DslPlan,
    /// Optimizer flags applied when the plan is lowered to IR / executed.
    pub(crate) opt_state: OptFlags,
    /// Cached IR/expression arenas shared across clones of this frame
    /// (see the `cached_arenas` module); `None` until first populated.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
77
78
impl From<DslPlan> for LazyFrame {
    /// Wrap a raw DSL plan with default optimizer flags and no cached arenas.
    fn from(plan: DslPlan) -> Self {
        Self {
            logical_plan: plan,
            ..Default::default()
        }
    }
}
87
88
impl LazyFrame {
89
/// Construct a `LazyFrame` from all of its parts, reusing an existing
/// cached-arena handle (unlike `from_logical_plan`, which starts fresh).
pub(crate) fn from_inner(
    logical_plan: DslPlan,
    opt_state: OptFlags,
    cached_arena: Arc<Mutex<Option<CachedArena>>>,
) -> Self {
    Self {
        logical_plan,
        opt_state,
        cached_arena,
    }
}
100
101
/// Consume the frame and hand its logical plan to a `DslBuilder`.
pub(crate) fn get_plan_builder(self) -> DslBuilder {
    self.logical_plan.into()
}
104
105
/// Copy out the current optimizer flags.
fn get_opt_state(&self) -> OptFlags {
    self.opt_state
}
108
109
fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
110
LazyFrame {
111
logical_plan,
112
opt_state,
113
cached_arena: Default::default(),
114
}
115
}
116
117
/// Get current optimizations.
118
pub fn get_current_optimizations(&self) -> OptFlags {
119
self.opt_state
120
}
121
122
/// Set allowed optimizations.
123
pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
124
self.opt_state = opt_state;
125
self
126
}
127
128
/// Turn off all optimizations.
129
pub fn without_optimizations(self) -> Self {
130
self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
131
}
132
133
/// Toggle projection pushdown optimization.
134
pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
135
self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
136
self
137
}
138
139
/// Toggle cluster with columns optimization.
140
pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
141
self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
142
self
143
}
144
145
/// Toggle collapse joins optimization.
146
pub fn with_collapse_joins(mut self, toggle: bool) -> Self {
147
self.opt_state.set(OptFlags::COLLAPSE_JOINS, toggle);
148
self
149
}
150
151
/// Check if operations are order dependent and unset maintaining_order if
152
/// the order would not be observed.
153
pub fn with_check_order(mut self, toggle: bool) -> Self {
154
self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
155
self
156
}
157
158
/// Toggle predicate pushdown optimization.
159
pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
160
self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
161
self
162
}
163
164
/// Toggle type coercion optimization.
165
pub fn with_type_coercion(mut self, toggle: bool) -> Self {
166
self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
167
self
168
}
169
170
/// Toggle type check optimization.
171
pub fn with_type_check(mut self, toggle: bool) -> Self {
172
self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
173
self
174
}
175
176
/// Toggle expression simplification optimization on or off.
177
pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
178
self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
179
self
180
}
181
182
/// Toggle common subplan elimination optimization on or off
#[cfg(feature = "cse")]
pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
    if toggle {
        self.opt_state |= OptFlags::COMM_SUBPLAN_ELIM;
    } else {
        self.opt_state &= !OptFlags::COMM_SUBPLAN_ELIM;
    }
    self
}
188
189
/// Toggle common subexpression elimination optimization on or off
#[cfg(feature = "cse")]
pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
    if toggle {
        self.opt_state |= OptFlags::COMM_SUBEXPR_ELIM;
    } else {
        self.opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
    }
    self
}
195
196
/// Toggle slice pushdown optimization.
197
pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
198
self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
199
self
200
}
201
202
/// Toggle the new streaming engine for this plan.
#[cfg(feature = "new_streaming")]
pub fn with_new_streaming(mut self, toggle: bool) -> Self {
    if toggle {
        self.opt_state |= OptFlags::NEW_STREAMING;
    } else {
        self.opt_state &= !OptFlags::NEW_STREAMING;
    }
    self
}
207
208
/// Try to estimate the number of rows so that joins can determine which side to keep in memory.
209
pub fn with_row_estimate(mut self, toggle: bool) -> Self {
210
self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
211
self
212
}
213
214
/// Run every node eagerly. This turns off multi-node optimizations.
215
pub fn _with_eager(mut self, toggle: bool) -> Self {
216
self.opt_state.set(OptFlags::EAGER, toggle);
217
self
218
}
219
220
/// Return a String describing the naive (un-optimized) logical plan.
221
pub fn describe_plan(&self) -> PolarsResult<String> {
222
Ok(self.clone().to_alp()?.describe())
223
}
224
225
/// Return a String describing the naive (un-optimized) logical plan in tree format.
226
pub fn describe_plan_tree(&self) -> PolarsResult<String> {
227
Ok(self.clone().to_alp()?.describe_tree_format())
228
}
229
230
/// Return a String describing the optimized logical plan.
231
///
232
/// Returns `Err` if optimizing the logical plan fails.
233
pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
234
Ok(self.clone().to_alp_optimized()?.describe())
235
}
236
237
/// Return a String describing the optimized logical plan in tree format.
238
///
239
/// Returns `Err` if optimizing the logical plan fails.
240
pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
241
Ok(self.clone().to_alp_optimized()?.describe_tree_format())
242
}
243
244
/// Return a String describing the logical plan.
245
///
246
/// If `optimized` is `true`, explains the optimized plan. If `optimized` is `false`,
247
/// explains the naive, un-optimized plan.
248
pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
249
if optimized {
250
self.describe_optimized_plan()
251
} else {
252
self.describe_plan()
253
}
254
}
255
256
/// Add a sort operation to the logical plan.
257
///
258
/// Sorts the LazyFrame by the column name specified using the provided options.
259
///
260
/// # Example
261
///
262
/// Sort DataFrame by 'sepal_width' column:
263
/// ```rust
264
/// # use polars_core::prelude::*;
265
/// # use polars_lazy::prelude::*;
266
/// fn sort_by_a(df: DataFrame) -> LazyFrame {
267
/// df.lazy().sort(["sepal_width"], Default::default())
268
/// }
269
/// ```
270
/// Sort by a single column with specific order:
271
/// ```
272
/// # use polars_core::prelude::*;
273
/// # use polars_lazy::prelude::*;
274
/// fn sort_with_specific_order(df: DataFrame, descending: bool) -> LazyFrame {
275
/// df.lazy().sort(
276
/// ["sepal_width"],
277
/// SortMultipleOptions::new()
278
/// .with_order_descending(descending)
279
/// )
280
/// }
281
/// ```
282
/// Sort by multiple columns with specifying order for each column:
283
/// ```
284
/// # use polars_core::prelude::*;
285
/// # use polars_lazy::prelude::*;
286
/// fn sort_by_multiple_columns_with_specific_order(df: DataFrame) -> LazyFrame {
287
/// df.lazy().sort(
288
/// ["sepal_width", "sepal_length"],
289
/// SortMultipleOptions::new()
290
/// .with_order_descending_multi([false, true])
291
/// )
292
/// }
293
/// ```
294
/// See [`SortMultipleOptions`] for more options.
295
pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
296
let opt_state = self.get_opt_state();
297
let lp = self
298
.get_plan_builder()
299
.sort(by.into_vec().into_iter().map(col).collect(), sort_options)
300
.build();
301
Self::from_logical_plan(lp, opt_state)
302
}
303
304
/// Add a sort operation to the logical plan.
305
///
306
/// Sorts the LazyFrame by the provided list of expressions, which will be turned into
307
/// concrete columns before sorting.
308
///
309
/// See [`SortMultipleOptions`] for more options.
310
///
311
/// # Example
312
///
313
/// ```rust
314
/// use polars_core::prelude::*;
315
/// use polars_lazy::prelude::*;
316
///
317
/// /// Sort DataFrame by 'sepal_width' column
318
/// fn example(df: DataFrame) -> LazyFrame {
319
/// df.lazy()
320
/// .sort_by_exprs(vec![col("sepal_width")], Default::default())
321
/// }
322
/// ```
323
pub fn sort_by_exprs<E: AsRef<[Expr]>>(
324
self,
325
by_exprs: E,
326
sort_options: SortMultipleOptions,
327
) -> Self {
328
let by_exprs = by_exprs.as_ref().to_vec();
329
if by_exprs.is_empty() {
330
self
331
} else {
332
let opt_state = self.get_opt_state();
333
let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
334
Self::from_logical_plan(lp, opt_state)
335
}
336
}
337
338
pub fn top_k<E: AsRef<[Expr]>>(
339
self,
340
k: IdxSize,
341
by_exprs: E,
342
sort_options: SortMultipleOptions,
343
) -> Self {
344
// this will optimize to top-k
345
self.sort_by_exprs(
346
by_exprs,
347
sort_options.with_order_reversed().with_nulls_last(true),
348
)
349
.slice(0, k)
350
}
351
352
pub fn bottom_k<E: AsRef<[Expr]>>(
353
self,
354
k: IdxSize,
355
by_exprs: E,
356
sort_options: SortMultipleOptions,
357
) -> Self {
358
// this will optimize to bottom-k
359
self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
360
.slice(0, k)
361
}
362
363
/// Reverse the `DataFrame` from top to bottom.
364
///
365
/// Row `i` becomes row `number_of_rows - i - 1`.
366
///
367
/// # Example
368
///
369
/// ```rust
370
/// use polars_core::prelude::*;
371
/// use polars_lazy::prelude::*;
372
///
373
/// fn example(df: DataFrame) -> LazyFrame {
374
/// df.lazy()
375
/// .reverse()
376
/// }
377
/// ```
378
pub fn reverse(self) -> Self {
379
self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
380
}
381
382
/// Rename columns in the DataFrame.
383
///
384
/// `existing` and `new` are iterables of the same length containing the old and
385
/// corresponding new column names. Renaming happens to all `existing` columns
386
/// simultaneously, not iteratively. If `strict` is true, all columns in `existing`
387
/// must be present in the `LazyFrame` when `rename` is called; otherwise, only
388
/// those columns that are actually found will be renamed (others will be ignored).
389
pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
390
where
391
I: IntoIterator<Item = T>,
392
J: IntoIterator<Item = S>,
393
T: AsRef<str>,
394
S: AsRef<str>,
395
{
396
let iter = existing.into_iter();
397
let cap = iter.size_hint().0;
398
let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
399
let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
400
401
// TODO! should this error if `existing` and `new` have different lengths?
402
// Currently, the longer of the two is truncated.
403
for (existing, new) in iter.zip(new) {
404
let existing = existing.as_ref();
405
let new = new.as_ref();
406
if new != existing {
407
existing_vec.push(existing.into());
408
new_vec.push(new.into());
409
}
410
}
411
412
self.map_private(DslFunction::Rename {
413
existing: existing_vec.into(),
414
new: new_vec.into(),
415
strict,
416
})
417
}
418
419
/// Removes columns from the DataFrame.
420
/// Note that it's better to only select the columns you need
421
/// and let the projection pushdown optimize away the unneeded columns.
422
///
423
/// Any given columns that are not in the schema will give a [`PolarsError::ColumnNotFound`]
424
/// error while materializing the [`LazyFrame`].
425
pub fn drop(self, columns: Selector) -> Self {
426
let opt_state = self.get_opt_state();
427
let lp = self.get_plan_builder().drop(columns).build();
428
Self::from_logical_plan(lp, opt_state)
429
}
430
431
/// Shift the values by a given period and fill the parts that will be empty due to this operation
432
/// with `Nones`.
433
///
434
/// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
435
pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
436
self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
437
}
438
439
/// Shift the values by a given period and fill the parts that will be empty due to this operation
440
/// with the result of the `fill_value` expression.
441
///
442
/// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
443
pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
444
self.select(vec![
445
col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
446
])
447
}
448
449
/// Fill None values in the DataFrame with an expression.
450
pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
451
let opt_state = self.get_opt_state();
452
let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
453
Self::from_logical_plan(lp, opt_state)
454
}
455
456
/// Fill NaN values in the DataFrame with an expression.
457
pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
458
let opt_state = self.get_opt_state();
459
let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
460
Self::from_logical_plan(lp, opt_state)
461
}
462
463
/// Caches the result into a new LazyFrame.
464
///
465
/// This should be used to prevent computations running multiple times.
466
pub fn cache(self) -> Self {
467
let opt_state = self.get_opt_state();
468
let lp = self.get_plan_builder().cache().build();
469
Self::from_logical_plan(lp, opt_state)
470
}
471
472
/// Cast named frame columns, resulting in a new LazyFrame with updated dtypes
473
pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
474
let cast_cols: Vec<Expr> = dtypes
475
.into_iter()
476
.map(|(name, dt)| {
477
let name = PlSmallStr::from_str(name);
478
479
if strict {
480
col(name).strict_cast(dt)
481
} else {
482
col(name).cast(dt)
483
}
484
})
485
.collect();
486
487
if cast_cols.is_empty() {
488
self
489
} else {
490
self.with_columns(cast_cols)
491
}
492
}
493
494
/// Cast all frame columns to the given dtype, resulting in a new LazyFrame
495
pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
496
self.with_columns(vec![if strict {
497
col(PlSmallStr::from_static("*")).strict_cast(dtype)
498
} else {
499
col(PlSmallStr::from_static("*")).cast(dtype)
500
}])
501
}
502
503
pub fn optimize(
504
self,
505
lp_arena: &mut Arena<IR>,
506
expr_arena: &mut Arena<AExpr>,
507
) -> PolarsResult<Node> {
508
self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
509
}
510
511
pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
512
let (mut lp_arena, mut expr_arena) = self.get_arenas();
513
let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;
514
515
Ok(IRPlan::new(node, lp_arena, expr_arena))
516
}
517
518
pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
519
let (mut lp_arena, mut expr_arena) = self.get_arenas();
520
let node = to_alp(
521
self.logical_plan,
522
&mut expr_arena,
523
&mut lp_arena,
524
&mut self.opt_state,
525
)?;
526
let plan = IRPlan::new(node, lp_arena, expr_arena);
527
Ok(plan)
528
}
529
530
/// Lower the DSL plan into IR stored in `lp_arena`/`expr_arena`, running the
/// optimizer passes enabled in this frame's `opt_state`.
///
/// `scratch` is an auxiliary node buffer passed through to `optimize`.
/// Returns the root [`Node`] of the optimized IR.
pub(crate) fn optimize_with_scratch(
    self,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    scratch: &mut Vec<Node>,
) -> PolarsResult<Node> {
    #[allow(unused_mut)]
    let mut opt_state = self.opt_state;
    let new_streaming = self.opt_state.contains(OptFlags::NEW_STREAMING);

    #[cfg(feature = "cse")]
    if new_streaming {
        // The new streaming engine can't deal with the way the common
        // subexpression elimination adds length-incorrect with_columns.
        opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;
    }

    let lp_top = optimize(
        self.logical_plan,
        opt_state,
        lp_arena,
        expr_arena,
        scratch,
        // Callback that converts an expression into a physical expression and
        // then an I/O expression; yields `None` when conversion fails
        // (presumably so the caller skips pushdown for that expression —
        // NOTE(review): confirm against `optimize`'s contract).
        Some(&|expr, expr_arena, schema| {
            let phys_expr = create_physical_expr(
                expr,
                Context::Default,
                expr_arena,
                schema,
                &mut ExpressionConversionState::new(true),
            )
            .ok()?;
            let io_expr = phys_expr_to_io_expr(phys_expr);
            Some(io_expr)
        }),
    )?;

    Ok(lp_top)
}
569
570
/// Optimize the plan, invoke `post_opt` on the resulting IR, and build the
/// in-memory physical plan.
///
/// When `check_sink` is true, the returned bool reports whether the plan's
/// root is NOT a file/partition sink (i.e. results will be materialized in
/// memory); when `check_sink` is false the bool is always `true`.
fn prepare_collect_post_opt<P>(
    mut self,
    check_sink: bool,
    query_start: Option<std::time::Instant>,
    post_opt: P,
) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
where
    P: FnOnce(
        Node,
        &mut Arena<IR>,
        &mut Arena<AExpr>,
        Option<std::time::Duration>,
    ) -> PolarsResult<()>,
{
    let (mut lp_arena, mut expr_arena) = self.get_arenas();

    let mut scratch = vec![];
    let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;

    post_opt(
        lp_top,
        &mut lp_arena,
        &mut expr_arena,
        // Post optimization callback gets the time since the
        // query was started as its "base" timepoint.
        query_start.map(|s| s.elapsed()),
    )?;

    // sink should be replaced
    let no_file_sink = if check_sink {
        !matches!(
            lp_arena.get(lp_top),
            IR::Sink {
                payload: SinkTypeIR::File { .. } | SinkTypeIR::Partition { .. },
                ..
            }
        )
    } else {
        true
    };
    let physical_plan = create_physical_plan(
        lp_top,
        &mut lp_arena,
        &mut expr_arena,
        BUILD_STREAMING_EXECUTOR,
    )?;

    let state = ExecutionState::new();
    Ok((state, physical_plan, no_file_sink))
}
620
621
// post_opt: A function that is called after optimization. This can be used to modify the IR jit.
622
pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
623
where
624
P: FnOnce(
625
Node,
626
&mut Arena<IR>,
627
&mut Arena<AExpr>,
628
Option<std::time::Duration>,
629
) -> PolarsResult<()>,
630
{
631
let (mut state, mut physical_plan, _) =
632
self.prepare_collect_post_opt(false, None, post_opt)?;
633
physical_plan.execute(&mut state)
634
}
635
636
/// Convenience wrapper around [`Self::prepare_collect_post_opt`] with a
/// no-op post-optimization callback.
#[allow(unused_mut)]
fn prepare_collect(
    self,
    check_sink: bool,
    query_start: Option<std::time::Instant>,
) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
    self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
}
644
645
/// Execute all the lazy operations and collect them into a [`DataFrame`] using a specified
/// `engine`.
///
/// The query is optimized prior to execution.
pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
    // Ensure the plan is rooted in a sink; default to an in-memory sink
    // when the user did not provide one.
    let payload = if let DslPlan::Sink { payload, .. } = &self.logical_plan {
        payload.clone()
    } else {
        self.logical_plan = DslPlan::Sink {
            input: Arc::new(self.logical_plan),
            payload: SinkType::Memory,
        };
        SinkType::Memory
    };

    // Default engine for collect is InMemory, sink_* is Streaming
    if engine == Engine::Auto {
        engine = match payload {
            #[cfg(feature = "new_streaming")]
            SinkType::File { .. } | SinkType::Partition { .. } => Engine::Streaming,
            _ => Engine::InMemory,
        };
    }
    // Gpu uses some hacks to dispatch.
    if engine == Engine::Gpu {
        engine = Engine::InMemory;
    }

    #[cfg(feature = "new_streaming")]
    {
        // Environment variables may force the new streaming engine,
        // overriding the `engine` argument entirely.
        if let Some(result) = self.try_new_streaming_if_requested() {
            return result.map(|v| v.unwrap_single());
        }
    }

    match engine {
        Engine::Auto => unreachable!(),
        Engine::Streaming => {
            feature_gated!("new_streaming", self = self.with_new_streaming(true))
        },
        _ => {},
    }
    let mut alp_plan = self.clone().to_alp_optimized()?;

    match engine {
        Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
            let result = polars_stream::run_query(
                alp_plan.lp_top,
                &mut alp_plan.lp_arena,
                &mut alp_plan.expr_arena,
            );
            result.map(|v| v.unwrap_single())
        }),
        Engine::Gpu => {
            Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
        },
        Engine::InMemory => {
            let mut physical_plan = create_physical_plan(
                alp_plan.lp_top,
                &mut alp_plan.lp_arena,
                &mut alp_plan.expr_arena,
                BUILD_STREAMING_EXECUTOR,
            )?;
            let mut state = ExecutionState::new();
            physical_plan.execute(&mut state)
        },
    }
}
713
714
pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
715
let sink_multiple = LazyFrame {
716
logical_plan: DslPlan::SinkMultiple { inputs: plans },
717
opt_state,
718
cached_arena: Default::default(),
719
};
720
sink_multiple.explain(true)
721
}
722
723
/// Execute multiple plans at once and collect each result into a
/// [`DataFrame`], using the given `engine`.
///
/// All plans are wrapped in a single `SinkMultiple` node so they are
/// optimized together; results are returned in the order of `plans`.
pub fn collect_all_with_engine(
    plans: Vec<DslPlan>,
    mut engine: Engine,
    opt_state: OptFlags,
) -> PolarsResult<Vec<DataFrame>> {
    if plans.is_empty() {
        return Ok(Vec::new());
    }

    // Default engine for collect_all is InMemory
    if engine == Engine::Auto {
        engine = Engine::InMemory;
    }
    // Gpu uses some hacks to dispatch.
    if engine == Engine::Gpu {
        engine = Engine::InMemory;
    }

    let mut sink_multiple = LazyFrame {
        logical_plan: DslPlan::SinkMultiple { inputs: plans },
        opt_state,
        cached_arena: Default::default(),
    };

    #[cfg(feature = "new_streaming")]
    {
        // Environment variables may force the new streaming engine,
        // overriding the `engine` argument entirely.
        if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
            return result.map(|v| v.unwrap_multiple());
        }
    }

    match engine {
        Engine::Auto => unreachable!(),
        Engine::Streaming => {
            feature_gated!(
                "new_streaming",
                sink_multiple = sink_multiple.with_new_streaming(true)
            )
        },
        _ => {},
    }
    let mut alp_plan = sink_multiple.to_alp_optimized()?;

    if engine == Engine::Streaming {
        feature_gated!("new_streaming", {
            let result = polars_stream::run_query(
                alp_plan.lp_top,
                &mut alp_plan.lp_arena,
                &mut alp_plan.expr_arena,
            );
            return result.map(|v| v.unwrap_multiple());
        });
    }

    let IR::SinkMultiple { inputs } = alp_plan.root() else {
        unreachable!()
    };

    let mut multiplan = create_multiple_physical_plans(
        inputs.clone().as_slice(),
        &mut alp_plan.lp_arena,
        &mut alp_plan.expr_arena,
        BUILD_STREAMING_EXECUTOR,
    )?;

    match engine {
        Engine::Gpu => polars_bail!(
            InvalidOperation: "collect_all is not supported for the gpu engine"
        ),
        Engine::InMemory => {
            // We don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)
            // this might then lead to a rayon SO. So we take a multitude of the threads to keep work stealing
            // within bounds
            let mut state = ExecutionState::new();
            // Run the cache prefiller serially before the parallel plans.
            if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
                cache_prefiller.execute(&mut state)?;
            }
            let out = POOL.install(|| {
                multiplan
                    .physical_plans
                    .chunks_mut(POOL.current_num_threads() * 3)
                    .map(|chunk| {
                        chunk
                            .into_par_iter()
                            .enumerate()
                            .map(|(idx, input)| {
                                let mut input = std::mem::take(input);
                                // Each plan gets its own branch of the state.
                                let mut state = state.split();
                                state.branch_idx += idx;

                                let df = input.execute(&mut state)?;
                                Ok(df)
                            })
                            .collect::<PolarsResult<Vec<_>>>()
                    })
                    .collect::<PolarsResult<Vec<_>>>()
            });
            Ok(out?.into_iter().flatten().collect())
        },
        _ => unreachable!(),
    }
}
825
826
/// Execute all the lazy operations and collect them into a [`DataFrame`].
///
/// The query is optimized prior to execution.
///
/// # Example
///
/// ```rust
/// use polars_core::prelude::*;
/// use polars_lazy::prelude::*;
///
/// fn example(df: DataFrame) -> PolarsResult<DataFrame> {
///     df.lazy()
///         .group_by([col("foo")])
///         .agg([col("bar").sum(), col("ham").mean().alias("avg_ham")])
///         .collect()
/// }
/// ```
pub fn collect(self) -> PolarsResult<DataFrame> {
    // `collect_with_engine` only auto-selects streaming for file/partition
    // sinks, so a plain collect runs on the in-memory engine.
    self.collect_with_engine(Engine::InMemory)
}
846
847
// post_opt: A function that is called after optimization. This can be used to modify the IR jit.
// This version does profiling of the node execution.
/// Like [`Self::_collect_post_opt`], but additionally times each node and
/// returns the profiling table alongside the materialized result.
pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
where
    P: FnOnce(
        Node,
        &mut Arena<IR>,
        &mut Arena<AExpr>,
        Option<std::time::Duration>,
    ) -> PolarsResult<()>,
{
    let query_start = std::time::Instant::now();
    let (mut state, mut physical_plan, _) =
        self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
    // Enable per-node timing, measured relative to the query start.
    state.time_nodes(query_start);
    let out = physical_plan.execute(&mut state)?;
    let timer_df = state.finish_timer()?;
    Ok((out, timer_df))
}
866
867
/// Profile a LazyFrame.
///
/// This will run the query and return a tuple
/// containing the materialized DataFrame and a DataFrame that contains profiling information
/// of each node that is executed.
///
/// The units of the timings are microseconds.
pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
    // No-op post-optimization callback: profile the plan as-is.
    self._profile_post_opt(|_, _, _, _| Ok(()))
}
877
878
/// Stream a query result into a parquet file. This is useful if the final result doesn't fit
/// into memory. This methods will return an error if the query cannot be completely done in a
/// streaming fashion.
#[cfg(feature = "parquet")]
pub fn sink_parquet(
    self,
    target: SinkTarget,
    options: ParquetWriteOptions,
    cloud_options: Option<polars_io::cloud::CloudOptions>,
    sink_options: SinkOptions,
) -> PolarsResult<Self> {
    let file_sink = FileSinkType {
        target,
        sink_options,
        file_type: FileType::Parquet(options),
        cloud_options,
    };
    self.sink(SinkType::File(file_sink))
}
896
897
/// Stream a query result into an ipc/arrow file. This is useful if the final result doesn't fit
/// into memory. This methods will return an error if the query cannot be completely done in a
/// streaming fashion.
#[cfg(feature = "ipc")]
pub fn sink_ipc(
    self,
    target: SinkTarget,
    options: IpcWriterOptions,
    cloud_options: Option<polars_io::cloud::CloudOptions>,
    sink_options: SinkOptions,
) -> PolarsResult<Self> {
    let file_sink = FileSinkType {
        target,
        sink_options,
        file_type: FileType::Ipc(options),
        cloud_options,
    };
    self.sink(SinkType::File(file_sink))
}
915
916
/// Stream a query result into an csv file. This is useful if the final result doesn't fit
/// into memory. This methods will return an error if the query cannot be completely done in a
/// streaming fashion.
#[cfg(feature = "csv")]
pub fn sink_csv(
    self,
    target: SinkTarget,
    options: CsvWriterOptions,
    cloud_options: Option<polars_io::cloud::CloudOptions>,
    sink_options: SinkOptions,
) -> PolarsResult<Self> {
    let file_sink = FileSinkType {
        target,
        sink_options,
        file_type: FileType::Csv(options),
        cloud_options,
    };
    self.sink(SinkType::File(file_sink))
}
934
935
/// Stream a query result into a JSON file. This is useful if the final result doesn't fit
/// into memory. This methods will return an error if the query cannot be completely done in a
/// streaming fashion.
#[cfg(feature = "json")]
pub fn sink_json(
    self,
    target: SinkTarget,
    options: JsonWriterOptions,
    cloud_options: Option<polars_io::cloud::CloudOptions>,
    sink_options: SinkOptions,
) -> PolarsResult<Self> {
    let file_sink = FileSinkType {
        target,
        sink_options,
        file_type: FileType::Json(options),
        cloud_options,
    };
    self.sink(SinkType::File(file_sink))
}
953
954
/// Stream a query result into a parquet file in a partitioned manner. This is useful if the
/// final result doesn't fit into memory. This methods will return an error if the query cannot
/// be completely done in a streaming fashion.
#[cfg(feature = "parquet")]
#[allow(clippy::too_many_arguments)]
pub fn sink_parquet_partitioned(
    self,
    base_path: Arc<PlPath>,
    file_path_cb: Option<PartitionTargetCallback>,
    variant: PartitionVariant,
    options: ParquetWriteOptions,
    cloud_options: Option<polars_io::cloud::CloudOptions>,
    sink_options: SinkOptions,
    per_partition_sort_by: Option<Vec<SortColumn>>,
    finish_callback: Option<SinkFinishCallback>,
) -> PolarsResult<Self> {
    let partition_sink = PartitionSinkType {
        base_path,
        file_path_cb,
        sink_options,
        variant,
        file_type: FileType::Parquet(options),
        cloud_options,
        per_partition_sort_by,
        finish_callback,
    };
    self.sink(SinkType::Partition(partition_sink))
}
981
982
/// Stream a query result into an ipc/arrow file in a partitioned manner. This is useful if the
/// final result doesn't fit into memory. This methods will return an error if the query cannot
/// be completely done in a streaming fashion.
#[cfg(feature = "ipc")]
#[allow(clippy::too_many_arguments)]
pub fn sink_ipc_partitioned(
    self,
    base_path: Arc<PlPath>,
    file_path_cb: Option<PartitionTargetCallback>,
    variant: PartitionVariant,
    options: IpcWriterOptions,
    cloud_options: Option<polars_io::cloud::CloudOptions>,
    sink_options: SinkOptions,
    per_partition_sort_by: Option<Vec<SortColumn>>,
    finish_callback: Option<SinkFinishCallback>,
) -> PolarsResult<Self> {
    let partition_sink = PartitionSinkType {
        base_path,
        file_path_cb,
        sink_options,
        variant,
        file_type: FileType::Ipc(options),
        cloud_options,
        per_partition_sort_by,
        finish_callback,
    };
    self.sink(SinkType::Partition(partition_sink))
}
1009
1010
/// Stream a query result into an csv file in a partitioned manner. This is useful if the final
/// result doesn't fit into memory. This methods will return an error if the query cannot be
/// completely done in a streaming fashion.
#[cfg(feature = "csv")]
#[allow(clippy::too_many_arguments)]
pub fn sink_csv_partitioned(
    self,
    base_path: Arc<PlPath>,
    file_path_cb: Option<PartitionTargetCallback>,
    variant: PartitionVariant,
    options: CsvWriterOptions,
    cloud_options: Option<polars_io::cloud::CloudOptions>,
    sink_options: SinkOptions,
    per_partition_sort_by: Option<Vec<SortColumn>>,
    finish_callback: Option<SinkFinishCallback>,
) -> PolarsResult<Self> {
    let partition_sink = PartitionSinkType {
        base_path,
        file_path_cb,
        sink_options,
        variant,
        file_type: FileType::Csv(options),
        cloud_options,
        per_partition_sort_by,
        finish_callback,
    };
    self.sink(SinkType::Partition(partition_sink))
}
1037
1038
/// Stream a query result into a JSON file in a partitioned manner. This is useful if the final
/// result doesn't fit into memory. This method will return an error if the query cannot be
/// completely done in a streaming fashion.
///
/// `base_path` is the partition root; `file_path_cb` can override the per-partition target,
/// `variant` selects the partitioning scheme, and `finish_callback` (if any) runs when the
/// sink completes.
#[cfg(feature = "json")]
#[allow(clippy::too_many_arguments)]
pub fn sink_json_partitioned(
    self,
    base_path: Arc<PlPath>,
    file_path_cb: Option<PartitionTargetCallback>,
    variant: PartitionVariant,
    options: JsonWriterOptions,
    cloud_options: Option<polars_io::cloud::CloudOptions>,
    sink_options: SinkOptions,
    per_partition_sort_by: Option<Vec<SortColumn>>,
    finish_callback: Option<SinkFinishCallback>,
) -> PolarsResult<Self> {
    self.sink(SinkType::Partition(PartitionSinkType {
        base_path,
        file_path_cb,
        sink_options,
        variant,
        // Only the file format differs between the partitioned sink variants.
        file_type: FileType::Json(options),
        cloud_options,
        per_partition_sort_by,
        finish_callback,
    }))
}
1065
1066
/// Run this query on the new streaming engine when one of the opt-in environment
/// variables is set; returns `None` when neither is set (caller falls through to
/// the regular engine).
///
/// - `POLARS_AUTO_NEW_STREAMING=1`: try the new engine; a panic whose message
///   starts with "not yet implemented" is swallowed and `None` is returned so the
///   caller can fall back. Any other panic is re-raised.
/// - `POLARS_FORCE_NEW_STREAMING=1`: use the new engine unconditionally; every
///   panic is propagated.
#[cfg(feature = "new_streaming")]
pub fn try_new_streaming_if_requested(
    &mut self,
) -> Option<PolarsResult<polars_stream::QueryResult>> {
    let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
    let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");

    if auto_new_streaming || force_new_streaming {
        // Try to run using the new streaming engine, falling back
        // if it fails in a todo!() error if auto_new_streaming is set.
        // Work on a clone so `self` stays usable for the fallback path.
        let mut new_stream_lazy = self.clone();
        new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
        let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
            Ok(v) => v,
            Err(e) => return Some(Err(e)),
        };

        let f = || {
            polars_stream::run_query(
                alp_plan.lp_top,
                &mut alp_plan.lp_arena,
                &mut alp_plan.expr_arena,
            )
        };

        // AssertUnwindSafe: the arenas are discarded after a panic, so no broken
        // invariants can be observed.
        match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
            Ok(v) => return Some(v),
            Err(e) => {
                // Fallback to normal engine if error is due to not being implemented
                // and auto_new_streaming is set, otherwise propagate error.
                if !force_new_streaming
                    && auto_new_streaming
                    && e.downcast_ref::<&str>()
                        .map(|s| s.starts_with("not yet implemented"))
                        .unwrap_or(false)
                {
                    if polars_core::config::verbose() {
                        eprintln!(
                            "caught unimplemented error in new streaming engine, falling back to normal engine"
                        );
                    }
                } else {
                    std::panic::resume_unwind(e);
                }
            },
        }
    }

    None
}
1116
1117
/// Wrap the current plan in a `Sink` node with the given payload.
///
/// # Errors
/// Returns `InvalidOperation` when the plan is already a sink: sinks are
/// terminal, so stacking one on another is rejected here rather than at
/// execution time.
fn sink(mut self, payload: SinkType) -> Result<LazyFrame, PolarsError> {
    polars_ensure!(
        !matches!(self.logical_plan, DslPlan::Sink { .. }),
        InvalidOperation: "cannot create a sink on top of another sink"
    );
    self.logical_plan = DslPlan::Sink {
        input: Arc::new(self.logical_plan),
        payload,
    };
    Ok(self)
}
1128
1129
/// Filter frame rows that match a predicate expression.
1130
///
1131
/// The expression must yield boolean values (note that rows where the
1132
/// predicate resolves to `null` are *not* included in the resulting frame).
1133
///
1134
/// # Example
1135
///
1136
/// ```rust
1137
/// use polars_core::prelude::*;
1138
/// use polars_lazy::prelude::*;
1139
///
1140
/// fn example(df: DataFrame) -> LazyFrame {
1141
/// df.lazy()
1142
/// .filter(col("sepal_width").is_not_null())
1143
/// .select([col("sepal_width"), col("sepal_length")])
1144
/// }
1145
/// ```
1146
pub fn filter(self, predicate: Expr) -> Self {
1147
let opt_state = self.get_opt_state();
1148
let lp = self.get_plan_builder().filter(predicate).build();
1149
Self::from_logical_plan(lp, opt_state)
1150
}
1151
1152
/// Remove frame rows that match a predicate expression.
///
/// The expression must yield boolean values (note that rows where the
/// predicate resolves to `null` are *not* removed from the resulting frame).
///
/// # Example
///
/// ```rust
/// use polars_core::prelude::*;
/// use polars_lazy::prelude::*;
///
/// fn example(df: DataFrame) -> LazyFrame {
///     df.lazy()
///         .remove(col("sepal_width").is_null())
///         .select([col("sepal_width"), col("sepal_length")])
/// }
/// ```
pub fn remove(self, predicate: Expr) -> Self {
    // Keep rows where the predicate is not exactly `true`; `neq_missing`
    // treats null as a value, so null-predicate rows are kept (not removed).
    self.filter(predicate.neq_missing(lit(true)))
}
1172
1173
/// Select (and optionally rename, with [`alias`](crate::dsl::Expr::alias)) columns from the query.
1174
///
1175
/// Columns can be selected with [`col`];
1176
/// If you want to select all columns use `col(PlSmallStr::from_static("*"))`.
1177
///
1178
/// # Example
1179
///
1180
/// ```rust
1181
/// use polars_core::prelude::*;
1182
/// use polars_lazy::prelude::*;
1183
///
1184
/// /// This function selects column "foo" and column "bar".
1185
/// /// Column "bar" is renamed to "ham".
1186
/// fn example(df: DataFrame) -> LazyFrame {
1187
/// df.lazy()
1188
/// .select([col("foo"),
1189
/// col("bar").alias("ham")])
1190
/// }
1191
///
1192
/// /// This function selects all columns except "foo"
1193
/// fn exclude_a_column(df: DataFrame) -> LazyFrame {
1194
/// df.lazy()
1195
/// .select([all().exclude_cols(["foo"]).as_expr()])
1196
/// }
1197
/// ```
1198
pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1199
let exprs = exprs.as_ref().to_vec();
1200
self.select_impl(
1201
exprs,
1202
ProjectionOptions {
1203
run_parallel: true,
1204
duplicate_check: true,
1205
should_broadcast: true,
1206
},
1207
)
1208
}
1209
1210
pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1211
let exprs = exprs.as_ref().to_vec();
1212
self.select_impl(
1213
exprs,
1214
ProjectionOptions {
1215
run_parallel: false,
1216
duplicate_check: true,
1217
should_broadcast: true,
1218
},
1219
)
1220
}
1221
1222
fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1223
let opt_state = self.get_opt_state();
1224
let lp = self.get_plan_builder().project(exprs, options).build();
1225
Self::from_logical_plan(lp, opt_state)
1226
}
1227
1228
/// Performs a "group-by" on a `LazyFrame`, producing a [`LazyGroupBy`], which can subsequently be aggregated.
1229
///
1230
/// Takes a list of expressions to group on.
1231
///
1232
/// # Example
1233
///
1234
/// ```rust
1235
/// use polars_core::prelude::*;
1236
/// use polars_lazy::prelude::*;
1237
///
1238
/// fn example(df: DataFrame) -> LazyFrame {
1239
/// df.lazy()
1240
/// .group_by([col("date")])
1241
/// .agg([
1242
/// col("rain").min().alias("min_rain"),
1243
/// col("rain").sum().alias("sum_rain"),
1244
/// col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
1245
/// ])
1246
/// }
1247
/// ```
1248
pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1249
let keys = by
1250
.as_ref()
1251
.iter()
1252
.map(|e| e.clone().into())
1253
.collect::<Vec<_>>();
1254
let opt_state = self.get_opt_state();
1255
1256
#[cfg(feature = "dynamic_group_by")]
1257
{
1258
LazyGroupBy {
1259
logical_plan: self.logical_plan,
1260
opt_state,
1261
keys,
1262
maintain_order: false,
1263
dynamic_options: None,
1264
rolling_options: None,
1265
}
1266
}
1267
1268
#[cfg(not(feature = "dynamic_group_by"))]
1269
{
1270
LazyGroupBy {
1271
logical_plan: self.logical_plan,
1272
opt_state,
1273
keys,
1274
maintain_order: false,
1275
}
1276
}
1277
}
1278
1279
/// Create rolling groups based on a time column.
///
/// Also works for index values of type UInt32, UInt64, Int32, or Int64.
///
/// Different from a [`group_by_dynamic`][`Self::group_by_dynamic`], the windows are now determined by the
/// individual values and are not of constant intervals. For constant intervals use
/// *group_by_dynamic*
#[cfg(feature = "dynamic_group_by")]
pub fn rolling<E: AsRef<[Expr]>>(
    mut self,
    index_column: Expr,
    group_by: E,
    mut options: RollingGroupOptions,
) -> LazyGroupBy {
    if let Expr::Column(name) = index_column {
        options.index_column = name;
    } else {
        // Non-column index expression: materialize it as a column first,
        // then recurse with a plain column reference to that new column.
        let output_field = index_column
            .to_field(&self.collect_schema().unwrap())
            .unwrap();
        return self.with_column(index_column).rolling(
            Expr::Column(output_field.name().clone()),
            group_by,
            options,
        );
    }
    let opt_state = self.get_opt_state();
    LazyGroupBy {
        logical_plan: self.logical_plan,
        opt_state,
        keys: group_by.as_ref().to_vec(),
        // Rolling windows are inherently ordered by the index column.
        maintain_order: true,
        dynamic_options: None,
        rolling_options: Some(options),
    }
}
1315
1316
/// Group based on a time value (or index value of type Int32, Int64).
///
/// Time windows are calculated and rows are assigned to windows. Different from a
/// normal group_by is that a row can be member of multiple groups. The time/index
/// window could be seen as a rolling window, with a window size determined by
/// dates/times/values instead of slots in the DataFrame.
///
/// A window is defined by:
///
/// - every: interval of the window
/// - period: length of the window
/// - offset: offset of the window
///
/// The `group_by` argument should be empty `[]` if you don't want to combine this
/// with a ordinary group_by on these keys.
#[cfg(feature = "dynamic_group_by")]
pub fn group_by_dynamic<E: AsRef<[Expr]>>(
    mut self,
    index_column: Expr,
    group_by: E,
    mut options: DynamicGroupOptions,
) -> LazyGroupBy {
    if let Expr::Column(name) = index_column {
        options.index_column = name;
    } else {
        // Non-column index expression: materialize it as a column first,
        // then recurse with a plain column reference to that new column.
        let output_field = index_column
            .to_field(&self.collect_schema().unwrap())
            .unwrap();
        return self.with_column(index_column).group_by_dynamic(
            Expr::Column(output_field.name().clone()),
            group_by,
            options,
        );
    }
    let opt_state = self.get_opt_state();
    LazyGroupBy {
        logical_plan: self.logical_plan,
        opt_state,
        keys: group_by.as_ref().to_vec(),
        // Dynamic windows are inherently ordered by the index column.
        maintain_order: true,
        dynamic_options: Some(options),
        rolling_options: None,
    }
}
1360
1361
/// Similar to [`group_by`][`Self::group_by`], but order of the DataFrame is maintained.
1362
pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
1363
let keys = by
1364
.as_ref()
1365
.iter()
1366
.map(|e| e.clone().into())
1367
.collect::<Vec<_>>();
1368
let opt_state = self.get_opt_state();
1369
1370
#[cfg(feature = "dynamic_group_by")]
1371
{
1372
LazyGroupBy {
1373
logical_plan: self.logical_plan,
1374
opt_state,
1375
keys,
1376
maintain_order: true,
1377
dynamic_options: None,
1378
rolling_options: None,
1379
}
1380
}
1381
1382
#[cfg(not(feature = "dynamic_group_by"))]
1383
{
1384
LazyGroupBy {
1385
logical_plan: self.logical_plan,
1386
opt_state,
1387
keys,
1388
maintain_order: true,
1389
}
1390
}
1391
}
1392
1393
/// Left anti join this query with another lazy query.
///
/// Matches on the values of the expressions `left_on` and `right_on`. For more
/// flexible join logic, see [`join`](LazyFrame::join) or
/// [`join_builder`](LazyFrame::join_builder).
///
/// # Example
///
/// ```rust
/// use polars_core::prelude::*;
/// use polars_lazy::prelude::*;
/// fn anti_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
///     ldf
///     .anti_join(other, col("foo"), col("bar").cast(DataType::String))
/// }
/// ```
#[cfg(feature = "semi_anti_join")]
pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
    let args = JoinArgs::new(JoinType::Anti);
    self.join(other, vec![left_on.into()], vec![right_on.into()], args)
}
1418
1419
/// Creates the Cartesian product from both frames, preserving the order of the left keys.
#[cfg(feature = "cross_join")]
pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
    let args = JoinArgs::new(JoinType::Cross).with_suffix(suffix);
    // A cross join has no key columns.
    self.join(other, Vec::new(), Vec::new(), args)
}
1429
1430
/// Left outer join this query with another lazy query.
1431
///
1432
/// Matches on the values of the expressions `left_on` and `right_on`. For more
1433
/// flexible join logic, see [`join`](LazyFrame::join) or
1434
/// [`join_builder`](LazyFrame::join_builder).
1435
///
1436
/// # Example
1437
///
1438
/// ```rust
1439
/// use polars_core::prelude::*;
1440
/// use polars_lazy::prelude::*;
1441
/// fn left_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1442
/// ldf
1443
/// .left_join(other, col("foo"), col("bar"))
1444
/// }
1445
/// ```
1446
pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1447
self.join(
1448
other,
1449
[left_on.into()],
1450
[right_on.into()],
1451
JoinArgs::new(JoinType::Left),
1452
)
1453
}
1454
1455
/// Inner join this query with another lazy query.
1456
///
1457
/// Matches on the values of the expressions `left_on` and `right_on`. For more
1458
/// flexible join logic, see [`join`](LazyFrame::join) or
1459
/// [`join_builder`](LazyFrame::join_builder).
1460
///
1461
/// # Example
1462
///
1463
/// ```rust
1464
/// use polars_core::prelude::*;
1465
/// use polars_lazy::prelude::*;
1466
/// fn inner_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1467
/// ldf
1468
/// .inner_join(other, col("foo"), col("bar").cast(DataType::String))
1469
/// }
1470
/// ```
1471
pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1472
self.join(
1473
other,
1474
[left_on.into()],
1475
[right_on.into()],
1476
JoinArgs::new(JoinType::Inner),
1477
)
1478
}
1479
1480
/// Full outer join this query with another lazy query.
1481
///
1482
/// Matches on the values of the expressions `left_on` and `right_on`. For more
1483
/// flexible join logic, see [`join`](LazyFrame::join) or
1484
/// [`join_builder`](LazyFrame::join_builder).
1485
///
1486
/// # Example
1487
///
1488
/// ```rust
1489
/// use polars_core::prelude::*;
1490
/// use polars_lazy::prelude::*;
1491
/// fn full_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1492
/// ldf
1493
/// .full_join(other, col("foo"), col("bar"))
1494
/// }
1495
/// ```
1496
pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1497
self.join(
1498
other,
1499
[left_on.into()],
1500
[right_on.into()],
1501
JoinArgs::new(JoinType::Full),
1502
)
1503
}
1504
1505
/// Left semi join this query with another lazy query.
///
/// Matches on the values of the expressions `left_on` and `right_on`. For more
/// flexible join logic, see [`join`](LazyFrame::join) or
/// [`join_builder`](LazyFrame::join_builder).
///
/// # Example
///
/// ```rust
/// use polars_core::prelude::*;
/// use polars_lazy::prelude::*;
/// fn semi_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
///     ldf
///     .semi_join(other, col("foo"), col("bar").cast(DataType::String))
/// }
/// ```
#[cfg(feature = "semi_anti_join")]
pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
    let args = JoinArgs::new(JoinType::Semi);
    self.join(other, vec![left_on.into()], vec![right_on.into()], args)
}
1530
1531
/// Generic function to join two LazyFrames.
1532
///
1533
/// `join` can join on multiple columns, given as two list of expressions, and with a
1534
/// [`JoinType`] specified by `how`. Non-joined column names in the right DataFrame
1535
/// that already exist in this DataFrame are suffixed with `"_right"`. For control
1536
/// over how columns are renamed and parallelization options, use
1537
/// [`join_builder`](LazyFrame::join_builder).
1538
///
1539
/// Any provided `args.slice` parameter is not considered, but set by the internal optimizer.
1540
///
1541
/// # Example
1542
///
1543
/// ```rust
1544
/// use polars_core::prelude::*;
1545
/// use polars_lazy::prelude::*;
1546
///
1547
/// fn example(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1548
/// ldf
1549
/// .join(other, [col("foo"), col("bar")], [col("foo"), col("bar")], JoinArgs::new(JoinType::Inner))
1550
/// }
1551
/// ```
1552
pub fn join<E: AsRef<[Expr]>>(
1553
self,
1554
other: LazyFrame,
1555
left_on: E,
1556
right_on: E,
1557
args: JoinArgs,
1558
) -> LazyFrame {
1559
let left_on = left_on.as_ref().to_vec();
1560
let right_on = right_on.as_ref().to_vec();
1561
1562
self._join_impl(other, left_on, right_on, args)
1563
}
1564
1565
/// Shared implementation for all join entry points: forwards every `JoinArgs`
/// field to a [`JoinBuilder`] and finishes it.
///
/// # Panics
/// Panics if `args.slice` is set: callers must not pre-populate it, as the
/// optimizer owns that field.
fn _join_impl(
    self,
    other: LazyFrame,
    left_on: Vec<Expr>,
    right_on: Vec<Expr>,
    args: JoinArgs,
) -> LazyFrame {
    // Exhaustive destructure so adding a field to JoinArgs forces an update here.
    let JoinArgs {
        how,
        validation,
        suffix,
        slice,
        nulls_equal,
        coalesce,
        maintain_order,
    } = args;

    if slice.is_some() {
        panic!("impl error: slice is not handled")
    }

    let mut builder = self
        .join_builder()
        .with(other)
        .left_on(left_on)
        .right_on(right_on)
        .how(how)
        .validate(validation)
        .join_nulls(nulls_equal)
        .coalesce(coalesce)
        .maintain_order(maintain_order);

    // Only override the builder's default suffix when one was supplied.
    if let Some(suffix) = suffix {
        builder = builder.suffix(suffix);
    }

    // Note: args.slice is set by the optimizer
    builder.finish()
}
1604
1605
/// Consume `self` and return a [`JoinBuilder`] to customize a join on this LazyFrame.
///
/// After the `JoinBuilder` has been created and set up, calling
/// [`finish()`](JoinBuilder::finish) on it will give back the `LazyFrame`
/// representing the `join` operation.
pub fn join_builder(self) -> JoinBuilder {
    JoinBuilder::new(self)
}
1613
1614
/// Add or replace a column, given as an expression, to a DataFrame.
1615
///
1616
/// # Example
1617
///
1618
/// ```rust
1619
/// use polars_core::prelude::*;
1620
/// use polars_lazy::prelude::*;
1621
/// fn add_column(df: DataFrame) -> LazyFrame {
1622
/// df.lazy()
1623
/// .with_column(
1624
/// when(col("sepal_length").lt(lit(5.0)))
1625
/// .then(lit(10))
1626
/// .otherwise(lit(1))
1627
/// .alias("new_column_name"),
1628
/// )
1629
/// }
1630
/// ```
1631
pub fn with_column(self, expr: Expr) -> LazyFrame {
1632
let opt_state = self.get_opt_state();
1633
let lp = self
1634
.get_plan_builder()
1635
.with_columns(
1636
vec![expr],
1637
ProjectionOptions {
1638
run_parallel: false,
1639
duplicate_check: true,
1640
should_broadcast: true,
1641
},
1642
)
1643
.build();
1644
Self::from_logical_plan(lp, opt_state)
1645
}
1646
1647
/// Add or replace multiple columns, given as expressions, to a DataFrame.
1648
///
1649
/// # Example
1650
///
1651
/// ```rust
1652
/// use polars_core::prelude::*;
1653
/// use polars_lazy::prelude::*;
1654
/// fn add_columns(df: DataFrame) -> LazyFrame {
1655
/// df.lazy()
1656
/// .with_columns(
1657
/// vec![lit(10).alias("foo"), lit(100).alias("bar")]
1658
/// )
1659
/// }
1660
/// ```
1661
pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1662
let exprs = exprs.as_ref().to_vec();
1663
self.with_columns_impl(
1664
exprs,
1665
ProjectionOptions {
1666
run_parallel: true,
1667
duplicate_check: true,
1668
should_broadcast: true,
1669
},
1670
)
1671
}
1672
1673
/// Add or replace multiple columns to a DataFrame, but evaluate them sequentially.
1674
pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1675
let exprs = exprs.as_ref().to_vec();
1676
self.with_columns_impl(
1677
exprs,
1678
ProjectionOptions {
1679
run_parallel: false,
1680
duplicate_check: true,
1681
should_broadcast: true,
1682
},
1683
)
1684
}
1685
1686
/// Match or evolve to a certain schema.
1687
pub fn match_to_schema(
1688
self,
1689
schema: SchemaRef,
1690
per_column: Arc<[MatchToSchemaPerColumn]>,
1691
extra_columns: ExtraColumnsPolicy,
1692
) -> LazyFrame {
1693
let opt_state = self.get_opt_state();
1694
let lp = self
1695
.get_plan_builder()
1696
.match_to_schema(schema, per_column, extra_columns)
1697
.build();
1698
Self::from_logical_plan(lp, opt_state)
1699
}
1700
1701
pub fn pipe_with_schema(self, callback: PlanCallback<(DslPlan, Schema), DslPlan>) -> Self {
1702
let opt_state = self.get_opt_state();
1703
let lp = self.get_plan_builder().pipe_with_schema(callback).build();
1704
Self::from_logical_plan(lp, opt_state)
1705
}
1706
1707
fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1708
let opt_state = self.get_opt_state();
1709
let lp = self.get_plan_builder().with_columns(exprs, options).build();
1710
Self::from_logical_plan(lp, opt_state)
1711
}
1712
1713
pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1714
let contexts = contexts
1715
.as_ref()
1716
.iter()
1717
.map(|lf| lf.logical_plan.clone())
1718
.collect();
1719
let opt_state = self.get_opt_state();
1720
let lp = self.get_plan_builder().with_context(contexts).build();
1721
Self::from_logical_plan(lp, opt_state)
1722
}
1723
1724
/// Aggregate all the columns as their maximum values.
///
/// Aggregated columns will have the same names as the original columns.
pub fn max(self) -> Self {
    self.map_private(DslFunction::Stats(StatsFunction::Max))
}
1730
1731
/// Aggregate all the columns as their minimum values.
///
/// Aggregated columns will have the same names as the original columns.
pub fn min(self) -> Self {
    self.map_private(DslFunction::Stats(StatsFunction::Min))
}
1737
1738
/// Aggregate all the columns as their sum values.
///
/// Aggregated columns will have the same names as the original columns.
///
/// - Boolean columns will sum to a `u32` containing the number of `true`s.
/// - For integer columns, the ordinary checks for overflow are performed:
///   if running in `debug` mode, overflows will panic, whereas in `release` mode overflows will
///   silently wrap.
/// - String columns will sum to None.
pub fn sum(self) -> Self {
    self.map_private(DslFunction::Stats(StatsFunction::Sum))
}
1750
1751
/// Aggregate all the columns as their mean values.
///
/// - Boolean and integer columns are converted to `f64` before computing the mean.
/// - String columns will have a mean of None.
pub fn mean(self) -> Self {
    self.map_private(DslFunction::Stats(StatsFunction::Mean))
}
1758
1759
/// Aggregate all the columns as their median values.
///
/// - Boolean and integer results are converted to `f64`. However, they are still
///   susceptible to overflow before this conversion occurs.
/// - String columns will have a median of None.
pub fn median(self) -> Self {
    self.map_private(DslFunction::Stats(StatsFunction::Median))
}
1767
1768
/// Aggregate all the columns as their quantile values.
///
/// `quantile` is an expression evaluating to the quantile to take (e.g.
/// `lit(0.5)` for the median); `method` selects the interpolation strategy.
pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
    self.map_private(DslFunction::Stats(StatsFunction::Quantile {
        quantile,
        method,
    }))
}
1775
1776
/// Aggregate all the columns as their standard deviation values.
///
/// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
/// computing the variance, where `N` is the number of rows.
/// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
/// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
/// > likelihood estimate of the variance for normally distributed variables. The
/// > standard deviation computed in this function is the square root of the estimated
/// > variance, so even with `ddof=1`, it will not be an unbiased estimate of the
/// > standard deviation per se.
///
/// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.std.html#)
pub fn std(self, ddof: u8) -> Self {
    self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
}
1791
1792
/// Aggregate all the columns as their variance values.
///
/// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
/// computing the variance, where `N` is the number of rows.
/// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
/// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
/// > likelihood estimate of the variance for normally distributed variables.
///
/// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.var.html#)
pub fn var(self, ddof: u8) -> Self {
    self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
}
1804
1805
/// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
pub fn explode(self, columns: Selector) -> LazyFrame {
    // `allow_empty = false`: an empty selection is an error for the public API.
    self.explode_impl(columns, false)
}
1809
1810
/// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1811
fn explode_impl(self, columns: Selector, allow_empty: bool) -> LazyFrame {
1812
let opt_state = self.get_opt_state();
1813
let lp = self
1814
.get_plan_builder()
1815
.explode(columns, allow_empty)
1816
.build();
1817
Self::from_logical_plan(lp, opt_state)
1818
}
1819
1820
/// Aggregate all the columns as the sum of their null value count.
1821
pub fn null_count(self) -> LazyFrame {
1822
self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
1823
}
1824
1825
/// Drop non-unique rows and maintain the order of kept rows.
///
/// `subset` is an optional `Vec` of column names to consider for uniqueness; if
/// `None`, all columns are considered.
pub fn unique_stable(
    self,
    subset: Option<Selector>,
    keep_strategy: UniqueKeepStrategy,
) -> LazyFrame {
    self.unique_stable_generic(subset, keep_strategy)
}
1836
1837
pub fn unique_stable_generic(
1838
self,
1839
subset: Option<Selector>,
1840
keep_strategy: UniqueKeepStrategy,
1841
) -> LazyFrame {
1842
let opt_state = self.get_opt_state();
1843
let options = DistinctOptionsDSL {
1844
subset,
1845
maintain_order: true,
1846
keep_strategy,
1847
};
1848
let lp = self.get_plan_builder().distinct(options).build();
1849
Self::from_logical_plan(lp, opt_state)
1850
}
1851
1852
/// Drop non-unique rows without maintaining the order of kept rows.
///
/// The order of the kept rows may change; to maintain the original row order, use
/// [`unique_stable`](LazyFrame::unique_stable).
///
/// `subset` is an optional `Vec` of column names to consider for uniqueness; if None,
/// all columns are considered.
pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
    self.unique_generic(subset, keep_strategy)
}
1862
1863
pub fn unique_generic(
1864
self,
1865
subset: Option<Selector>,
1866
keep_strategy: UniqueKeepStrategy,
1867
) -> LazyFrame {
1868
let opt_state = self.get_opt_state();
1869
let options = DistinctOptionsDSL {
1870
subset,
1871
maintain_order: false,
1872
keep_strategy,
1873
};
1874
let lp = self.get_plan_builder().distinct(options).build();
1875
Self::from_logical_plan(lp, opt_state)
1876
}
1877
1878
/// Drop rows containing one or more NaN values.
1879
///
1880
/// `subset` is an optional `Vec` of column names to consider for NaNs; if None, all
1881
/// floating point columns are considered.
1882
pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1883
let opt_state = self.get_opt_state();
1884
let lp = self.get_plan_builder().drop_nans(subset).build();
1885
Self::from_logical_plan(lp, opt_state)
1886
}
1887
1888
/// Drop rows containing one or more None values.
1889
///
1890
/// `subset` is an optional `Vec` of column names to consider for nulls; if None, all
1891
/// columns are considered.
1892
pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1893
let opt_state = self.get_opt_state();
1894
let lp = self.get_plan_builder().drop_nulls(subset).build();
1895
Self::from_logical_plan(lp, opt_state)
1896
}
1897
1898
/// Slice the DataFrame using an offset (starting row) and a length.
1899
///
1900
/// If `offset` is negative, it is counted from the end of the DataFrame. For
1901
/// instance, `lf.slice(-5, 3)` gets three rows, starting at the row fifth from the
1902
/// end.
1903
///
1904
/// If `offset` and `len` are such that the slice extends beyond the end of the
1905
/// DataFrame, the portion between `offset` and the end will be returned. In this
1906
/// case, the number of rows in the returned DataFrame will be less than `len`.
1907
pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1908
let opt_state = self.get_opt_state();
1909
let lp = self.get_plan_builder().slice(offset, len).build();
1910
Self::from_logical_plan(lp, opt_state)
1911
}
1912
1913
/// Get the first row.
///
/// Equivalent to `self.slice(0, 1)`.
pub fn first(self) -> LazyFrame {
    self.slice(0, 1)
}
1919
1920
/// Get the last row.
///
/// Equivalent to `self.slice(-1, 1)`.
pub fn last(self) -> LazyFrame {
    self.slice(-1, 1)
}
1926
1927
/// Get the last `n` rows.
///
/// Equivalent to `self.slice(-(n as i64), n)`.
pub fn tail(self, n: IdxSize) -> LazyFrame {
    // Negative offset counts from the end of the frame.
    let neg_tail = -(n as i64);
    self.slice(neg_tail, n)
}
1934
1935
/// Unpivot the DataFrame from wide to long format.
///
/// See [`UnpivotArgsIR`] for information on how to unpivot a DataFrame.
#[cfg(feature = "pivot")]
pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
    let state = self.get_opt_state();
    let plan = self.get_plan_builder().unpivot(args).build();
    Self::from_logical_plan(plan, state)
}
1944
1945
/// Limit the DataFrame to the first `n` rows.
///
/// Equivalent to `self.slice(0, n)`.
pub fn limit(self, n: IdxSize) -> LazyFrame {
    self.slice(0, n)
}
1949
1950
/// Apply a function/closure once the logical plan get executed.
///
/// The function has access to the whole materialized DataFrame at the time it is
/// called.
///
/// To apply specific functions to specific columns, use [`Expr::map`] in conjunction
/// with `LazyFrame::with_column` or `with_columns`.
///
/// `optimizations` restricts which optimizer passes may cross this opaque node;
/// `schema` (if given) declares the output schema so downstream planning stays
/// correct; `name` labels the node in plan output (defaults to "ANONYMOUS UDF").
///
/// ## Warning
/// This can blow up in your face if the schema is changed due to the operation. The
/// optimizer relies on a correct schema.
///
/// You can toggle certain optimizations off.
pub fn map<F>(
    self,
    function: F,
    optimizations: AllowedOptimizations,
    schema: Option<Arc<dyn UdfSchema>>,
    name: Option<&'static str>,
) -> LazyFrame
where
    F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
{
    let opt_state = self.get_opt_state();
    let lp = self
        .get_plan_builder()
        .map(
            function,
            optimizations,
            schema,
            PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
        )
        .build();
    Self::from_logical_plan(lp, opt_state)
}
1985
1986
/// Apply a Python callable once the logical plan gets executed, analogous to
/// [`map`](LazyFrame::map).
///
/// `validate_output` controls whether the callable's result is checked against
/// the declared `schema`.
#[cfg(feature = "python")]
pub fn map_python(
    self,
    function: polars_utils::python_function::PythonFunction,
    optimizations: AllowedOptimizations,
    schema: Option<SchemaRef>,
    validate_output: bool,
) -> LazyFrame {
    let opt_state = self.get_opt_state();
    let lp = self
        .get_plan_builder()
        .map_python(function, optimizations, schema, validate_output)
        .build();
    Self::from_logical_plan(lp, opt_state)
}
2001
2002
pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
2003
let opt_state = self.get_opt_state();
2004
let lp = self.get_plan_builder().map_private(function).build();
2005
Self::from_logical_plan(lp, opt_state)
2006
}
2007
2008
/// Add a new column at index 0 that counts the rows.
///
/// `name` is the name of the new column. `offset` is where to start counting from; if
/// `None`, it is set to `0`.
///
/// # Warning
/// This can have a negative effect on query performance. This may for instance block
/// predicate pushdown optimization.
pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
where
    S: Into<PlSmallStr>,
{
    let name = name.into();

    match &self.logical_plan {
        // Fast path: for a non-anonymous scan, push the row index into the
        // scan itself instead of adding a separate plan node.
        v @ DslPlan::Scan { scan_type, .. }
            if !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
        {
            let DslPlan::Scan {
                sources,
                mut unified_scan_args,
                scan_type,
                cached_ir: _,
            } = v.clone()
            else {
                // The outer match arm already proved this is a Scan.
                unreachable!()
            };

            unified_scan_args.row_index = Some(RowIndex {
                name,
                offset: offset.unwrap_or(0),
            });

            // Rebuild the scan with a fresh (empty) IR cache, since the plan changed.
            DslPlan::Scan {
                sources,
                unified_scan_args,
                scan_type,
                cached_ir: Default::default(),
            }
            .into()
        },
        // General path: append a RowIndex function node to the plan.
        _ => self.map_private(DslFunction::RowIndex { name, offset }),
    }
}
2052
2053
/// Return the number of non-null elements for each column.
2054
pub fn count(self) -> LazyFrame {
2055
self.select(vec![col(PlSmallStr::from_static("*")).count()])
2056
}
2057
2058
/// Unnest the given `Struct` columns: the fields of the `Struct` type will be
/// inserted as columns.
#[cfg(feature = "dtype-struct")]
pub fn unnest(self, cols: Selector) -> Self {
    self.map_private(DslFunction::Unnest(cols))
}
/// Merge this `LazyFrame` with `other` into a single plan via a `MergeSorted`
/// node keyed on `key`. NOTE(review): both inputs are presumably already sorted
/// by `key` — confirm against the engine's `MergeSorted` implementation.
#[cfg(feature = "merge_sorted")]
pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
where
    S: Into<PlSmallStr>,
{
    let key = key.into();

    let lp = DslPlan::MergeSorted {
        input_left: Arc::new(self.logical_plan),
        input_right: Arc::new(other.logical_plan),
        key,
    };
    Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
}
}
/// Utility struct for lazy group_by operation.
2082
#[derive(Clone)]
2083
pub struct LazyGroupBy {
2084
pub logical_plan: DslPlan,
2085
opt_state: OptFlags,
2086
keys: Vec<Expr>,
2087
maintain_order: bool,
2088
#[cfg(feature = "dynamic_group_by")]
2089
dynamic_options: Option<DynamicGroupOptions>,
2090
#[cfg(feature = "dynamic_group_by")]
2091
rolling_options: Option<RollingGroupOptions>,
2092
}
2093
2094
impl From<LazyGroupBy> for LazyFrame {
2095
fn from(lgb: LazyGroupBy) -> Self {
2096
Self {
2097
logical_plan: lgb.logical_plan,
2098
opt_state: lgb.opt_state,
2099
cached_arena: Default::default(),
2100
}
2101
}
2102
}
2103
2104
impl LazyGroupBy {
2105
/// Group by and aggregate.
2106
///
2107
/// Select a column with [col] and choose an aggregation.
2108
/// If you want to aggregate all columns use `col(PlSmallStr::from_static("*"))`.
2109
///
2110
/// # Example
2111
///
2112
/// ```rust
2113
/// use polars_core::prelude::*;
2114
/// use polars_lazy::prelude::*;
2115
///
2116
/// fn example(df: DataFrame) -> LazyFrame {
2117
/// df.lazy()
2118
/// .group_by_stable([col("date")])
2119
/// .agg([
2120
/// col("rain").min().alias("min_rain"),
2121
/// col("rain").sum().alias("sum_rain"),
2122
/// col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
2123
/// ])
2124
/// }
2125
/// ```
2126
pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
2127
#[cfg(feature = "dynamic_group_by")]
2128
let lp = DslBuilder::from(self.logical_plan)
2129
.group_by(
2130
self.keys,
2131
aggs,
2132
None,
2133
self.maintain_order,
2134
self.dynamic_options,
2135
self.rolling_options,
2136
)
2137
.build();
2138
2139
#[cfg(not(feature = "dynamic_group_by"))]
2140
let lp = DslBuilder::from(self.logical_plan)
2141
.group_by(self.keys, aggs, None, self.maintain_order)
2142
.build();
2143
LazyFrame::from_logical_plan(lp, self.opt_state)
2144
}
2145
2146
/// Return first n rows of each group
2147
pub fn head(self, n: Option<usize>) -> LazyFrame {
2148
let keys = self
2149
.keys
2150
.iter()
2151
.filter_map(|expr| expr_output_name(expr).ok())
2152
.collect::<Vec<_>>();
2153
2154
self.agg([all().as_expr().head(n)])
2155
.explode_impl(all() - by_name(keys.iter().cloned(), false), true)
2156
}
2157
2158
/// Return last n rows of each group
2159
pub fn tail(self, n: Option<usize>) -> LazyFrame {
2160
let keys = self
2161
.keys
2162
.iter()
2163
.filter_map(|expr| expr_output_name(expr).ok())
2164
.collect::<Vec<_>>();
2165
2166
self.agg([all().as_expr().tail(n)])
2167
.explode_impl(all() - by_name(keys.iter().cloned(), false), true)
2168
}
2169
2170
/// Apply a function over the groups as a new DataFrame.
2171
///
2172
/// **It is not recommended that you use this as materializing the DataFrame is very
2173
/// expensive.**
2174
pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
2175
#[cfg(feature = "dynamic_group_by")]
2176
let options = GroupbyOptions {
2177
dynamic: self.dynamic_options,
2178
rolling: self.rolling_options,
2179
slice: None,
2180
};
2181
2182
#[cfg(not(feature = "dynamic_group_by"))]
2183
let options = GroupbyOptions { slice: None };
2184
2185
let lp = DslPlan::GroupBy {
2186
input: Arc::new(self.logical_plan),
2187
keys: self.keys,
2188
aggs: vec![],
2189
apply: Some((f, schema)),
2190
maintain_order: self.maintain_order,
2191
options: Arc::new(options),
2192
};
2193
LazyFrame::from_logical_plan(lp, self.opt_state)
2194
}
2195
}
2196
2197
#[must_use]
2198
pub struct JoinBuilder {
2199
lf: LazyFrame,
2200
how: JoinType,
2201
other: Option<LazyFrame>,
2202
left_on: Vec<Expr>,
2203
right_on: Vec<Expr>,
2204
allow_parallel: bool,
2205
force_parallel: bool,
2206
suffix: Option<PlSmallStr>,
2207
validation: JoinValidation,
2208
nulls_equal: bool,
2209
coalesce: JoinCoalesce,
2210
maintain_order: MaintainOrderJoin,
2211
}
2212
impl JoinBuilder {
2213
/// Create the `JoinBuilder` with the provided `LazyFrame` as the left table.
2214
pub fn new(lf: LazyFrame) -> Self {
2215
Self {
2216
lf,
2217
other: None,
2218
how: JoinType::Inner,
2219
left_on: vec![],
2220
right_on: vec![],
2221
allow_parallel: true,
2222
force_parallel: false,
2223
suffix: None,
2224
validation: Default::default(),
2225
nulls_equal: false,
2226
coalesce: Default::default(),
2227
maintain_order: Default::default(),
2228
}
2229
}
2230
2231
/// The right table in the join.
2232
pub fn with(mut self, other: LazyFrame) -> Self {
2233
self.other = Some(other);
2234
self
2235
}
2236
2237
/// Select the join type.
2238
pub fn how(mut self, how: JoinType) -> Self {
2239
self.how = how;
2240
self
2241
}
2242
2243
pub fn validate(mut self, validation: JoinValidation) -> Self {
2244
self.validation = validation;
2245
self
2246
}
2247
2248
/// The expressions you want to join both tables on.
2249
///
2250
/// The passed expressions must be valid in both `LazyFrame`s in the join.
2251
pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2252
let on = on.as_ref().to_vec();
2253
self.left_on.clone_from(&on);
2254
self.right_on = on;
2255
self
2256
}
2257
2258
/// The expressions you want to join the left table on.
2259
///
2260
/// The passed expressions must be valid in the left table.
2261
pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2262
self.left_on = on.as_ref().to_vec();
2263
self
2264
}
2265
2266
/// The expressions you want to join the right table on.
2267
///
2268
/// The passed expressions must be valid in the right table.
2269
pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2270
self.right_on = on.as_ref().to_vec();
2271
self
2272
}
2273
2274
/// Allow parallel table evaluation.
2275
pub fn allow_parallel(mut self, allow: bool) -> Self {
2276
self.allow_parallel = allow;
2277
self
2278
}
2279
2280
/// Force parallel table evaluation.
2281
pub fn force_parallel(mut self, force: bool) -> Self {
2282
self.force_parallel = force;
2283
self
2284
}
2285
2286
/// Join on null values. By default null values will never produce matches.
2287
pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2288
self.nulls_equal = nulls_equal;
2289
self
2290
}
2291
2292
/// Suffix to add duplicate column names in join.
2293
/// Defaults to `"_right"` if this method is never called.
2294
pub fn suffix<S>(mut self, suffix: S) -> Self
2295
where
2296
S: Into<PlSmallStr>,
2297
{
2298
self.suffix = Some(suffix.into());
2299
self
2300
}
2301
2302
/// Whether to coalesce join columns.
2303
pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2304
self.coalesce = coalesce;
2305
self
2306
}
2307
2308
/// Whether to preserve the row order.
2309
pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2310
self.maintain_order = maintain_order;
2311
self
2312
}
2313
2314
/// Finish builder
2315
pub fn finish(self) -> LazyFrame {
2316
let opt_state = self.lf.opt_state;
2317
let other = self.other.expect("'with' not set in join builder");
2318
2319
let args = JoinArgs {
2320
how: self.how,
2321
validation: self.validation,
2322
suffix: self.suffix,
2323
slice: None,
2324
nulls_equal: self.nulls_equal,
2325
coalesce: self.coalesce,
2326
maintain_order: self.maintain_order,
2327
};
2328
2329
let lp = self
2330
.lf
2331
.get_plan_builder()
2332
.join(
2333
other.logical_plan,
2334
self.left_on,
2335
self.right_on,
2336
JoinOptions {
2337
allow_parallel: self.allow_parallel,
2338
force_parallel: self.force_parallel,
2339
args,
2340
}
2341
.into(),
2342
)
2343
.build();
2344
LazyFrame::from_logical_plan(lp, opt_state)
2345
}
2346
2347
// Finish with join predicates
2348
pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2349
let opt_state = self.lf.opt_state;
2350
let other = self.other.expect("with not set");
2351
2352
// Decompose `And` conjunctions into their component expressions
2353
fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2354
if let Expr::BinaryExpr {
2355
op: Operator::And,
2356
left,
2357
right,
2358
} = predicate
2359
{
2360
decompose_and((*left).clone(), expanded_predicates);
2361
decompose_and((*right).clone(), expanded_predicates);
2362
} else {
2363
expanded_predicates.push(predicate);
2364
}
2365
}
2366
let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2367
for predicate in predicates {
2368
decompose_and(predicate, &mut expanded_predicates);
2369
}
2370
let predicates: Vec<Expr> = expanded_predicates;
2371
2372
// Decompose `is_between` predicates to allow for cleaner expression of range joins
2373
#[cfg(feature = "is_between")]
2374
let predicates: Vec<Expr> = {
2375
let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2376
for predicate in predicates {
2377
if let Expr::Function {
2378
function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2379
input,
2380
..
2381
} = &predicate
2382
{
2383
if let [expr, lower, upper] = input.as_slice() {
2384
match closed {
2385
ClosedInterval::Both => {
2386
expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2387
expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2388
},
2389
ClosedInterval::Right => {
2390
expanded_predicates.push(expr.clone().gt(lower.clone()));
2391
expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2392
},
2393
ClosedInterval::Left => {
2394
expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2395
expanded_predicates.push(expr.clone().lt(upper.clone()));
2396
},
2397
ClosedInterval::None => {
2398
expanded_predicates.push(expr.clone().gt(lower.clone()));
2399
expanded_predicates.push(expr.clone().lt(upper.clone()));
2400
},
2401
}
2402
continue;
2403
}
2404
}
2405
expanded_predicates.push(predicate);
2406
}
2407
expanded_predicates
2408
};
2409
2410
let args = JoinArgs {
2411
how: self.how,
2412
validation: self.validation,
2413
suffix: self.suffix,
2414
slice: None,
2415
nulls_equal: self.nulls_equal,
2416
coalesce: self.coalesce,
2417
maintain_order: self.maintain_order,
2418
};
2419
let options = JoinOptions {
2420
allow_parallel: self.allow_parallel,
2421
force_parallel: self.force_parallel,
2422
args,
2423
};
2424
2425
let lp = DslPlan::Join {
2426
input_left: Arc::new(self.lf.logical_plan),
2427
input_right: Arc::new(other.logical_plan),
2428
left_on: Default::default(),
2429
right_on: Default::default(),
2430
predicates,
2431
options: Arc::from(options),
2432
};
2433
2434
LazyFrame::from_logical_plan(lp, opt_state)
2435
}
2436
}
2437
2438
pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
2439
#[cfg(not(feature = "new_streaming"))]
2440
{
2441
None
2442
}
2443
#[cfg(feature = "new_streaming")]
2444
{
2445
Some(streaming_dispatch::build_streaming_query_executor)
2446
}
2447
};
2448
#[cfg(feature = "new_streaming")]
2449
pub use streaming_dispatch::build_streaming_query_executor;
2450
2451
#[cfg(feature = "new_streaming")]
mod streaming_dispatch {
    //! Bridges the new streaming engine into the `Executor` interface of the
    //! in-memory engine.
    use std::sync::{Arc, Mutex};

    use polars_core::POOL;
    use polars_core::error::PolarsResult;
    use polars_core::frame::DataFrame;
    use polars_expr::state::ExecutionState;
    use polars_mem_engine::Executor;
    use polars_plan::dsl::SinkTypeIR;
    use polars_plan::plans::{AExpr, IR};
    use polars_utils::arena::{Arena, Node};

    /// Build a boxed [`Executor`] that runs the IR rooted at `node` on the new
    /// streaming engine.
    ///
    /// # Panics
    /// Panics if `node` is an `IR::SinkMultiple`.
    pub fn build_streaming_query_executor(
        node: Node,
        ir_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<Box<dyn Executor>> {
        // Remember the scan-level rechunk flag so it can be applied to the
        // materialized result.
        let rechunk = match ir_arena.get(node) {
            IR::Scan {
                unified_scan_args, ..
            } => unified_scan_args.rechunk,
            _ => false,
        };

        // The streaming engine needs a sink at the root; add an in-memory sink
        // when the plan does not already end in one.
        let node = match ir_arena.get(node) {
            IR::SinkMultiple { .. } => panic!("SinkMultiple not supported"),
            IR::Sink { .. } => node,
            _ => ir_arena.add(IR::Sink {
                input: node,
                payload: SinkTypeIR::Memory,
            }),
        };

        polars_stream::StreamingQuery::build(node, ir_arena, expr_arena)
            .map(Some)
            .map(Mutex::new)
            .map(Arc::new)
            .map(|x| StreamingQueryExecutor {
                executor: x,
                rechunk,
            })
            .map(|x| Box::new(x) as Box<dyn Executor>)
    }

    // Note: Arc/Mutex is because Executor requires Sync, but SlotMap is not Sync.
    struct StreamingQueryExecutor {
        // `Option` so the query can be taken out on first (and only) execution.
        executor: Arc<Mutex<Option<polars_stream::StreamingQuery>>>,
        rechunk: bool,
    }

    impl Executor for StreamingQueryExecutor {
        /// Run the streaming query once and return the materialized frame.
        ///
        /// # Panics
        /// Panics if called from within the rayon pool or more than once.
        fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
            // Must not block rayon thread on pending new-streaming future.
            assert!(POOL.current_thread_index().is_none());

            let mut df = { self.executor.try_lock().unwrap().take() }
                .expect("unhandled: execute() more than once")
                .execute()
                .map(|x| x.unwrap_single())?;

            if self.rechunk {
                df.as_single_chunk_par();
            }

            Ok(df)
        }
    }
}