Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-lazy/src/frame/mod.rs
8479 views
1
//! Lazy variant of a [DataFrame].
2
#[cfg(feature = "python")]
3
mod python;
4
5
mod cached_arenas;
6
mod err;
7
#[cfg(not(target_arch = "wasm32"))]
8
mod exitable;
9
10
use std::num::NonZeroUsize;
11
use std::sync::mpsc::{Receiver, sync_channel};
12
use std::sync::{Arc, Mutex};
13
14
pub use anonymous_scan::*;
15
#[cfg(feature = "csv")]
16
pub use csv::*;
17
#[cfg(not(target_arch = "wasm32"))]
18
pub use exitable::*;
19
pub use file_list_reader::*;
20
#[cfg(feature = "json")]
21
pub use ndjson::*;
22
#[cfg(feature = "parquet")]
23
pub use parquet::*;
24
use polars_compute::rolling::QuantileMethod;
25
use polars_core::POOL;
26
use polars_core::error::feature_gated;
27
use polars_core::prelude::*;
28
use polars_io::RowIndex;
29
use polars_mem_engine::scan_predicate::functions::apply_scan_predicate_to_scan_ir;
30
use polars_mem_engine::{Executor, create_multiple_physical_plans, create_physical_plan};
31
use polars_ops::frame::{JoinBuildSide, JoinCoalesce, MaintainOrderJoin};
32
#[cfg(feature = "is_between")]
33
use polars_ops::prelude::ClosedInterval;
34
pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
35
use polars_utils::pl_str::PlSmallStr;
36
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
37
38
use crate::frame::cached_arenas::CachedArena;
39
use crate::prelude::*;
40
41
/// Conversion trait for turning a value into a [`LazyFrame`].
pub trait IntoLazy {
    /// Convert `self` into a [`LazyFrame`].
    fn lazy(self) -> LazyFrame;
}
44
45
impl IntoLazy for DataFrame {
46
/// Convert the `DataFrame` into a `LazyFrame`
47
fn lazy(self) -> LazyFrame {
48
let lp = DslBuilder::from_existing_df(self).build();
49
LazyFrame {
50
logical_plan: lp,
51
opt_state: Default::default(),
52
cached_arena: Default::default(),
53
}
54
}
55
}
56
57
impl IntoLazy for LazyFrame {
    /// Identity conversion: a `LazyFrame` is already lazy.
    fn lazy(self) -> LazyFrame {
        self
    }
}
62
63
/// Lazy abstraction over an eager `DataFrame`.
///
/// It really is an abstraction over a logical plan. The methods of this struct will incrementally
/// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)).
#[derive(Clone, Default)]
#[must_use]
pub struct LazyFrame {
    /// The DSL-level logical plan describing the query built so far.
    pub logical_plan: DslPlan,
    /// Optimization flags applied when the plan is lowered and optimized.
    pub(crate) opt_state: OptFlags,
    /// Cached IR/expression arenas; shared by clones through the `Arc`.
    pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}
74
75
impl From<DslPlan> for LazyFrame {
    /// Wrap a DSL plan in a `LazyFrame` with default optimization flags
    /// and an empty arena cache.
    fn from(logical_plan: DslPlan) -> Self {
        Self {
            logical_plan,
            opt_state: Default::default(),
            cached_arena: Default::default(),
        }
    }
}
84
85
impl LazyFrame {
86
pub(crate) fn from_inner(
87
logical_plan: DslPlan,
88
opt_state: OptFlags,
89
cached_arena: Arc<Mutex<Option<CachedArena>>>,
90
) -> Self {
91
Self {
92
logical_plan,
93
opt_state,
94
cached_arena,
95
}
96
}
97
98
/// Consume the frame and wrap its logical plan in a [`DslBuilder`].
pub(crate) fn get_plan_builder(self) -> DslBuilder {
    self.logical_plan.into()
}
101
102
/// Current optimization flags, returned by value.
fn get_opt_state(&self) -> OptFlags {
    self.opt_state
}
105
106
pub fn from_logical_plan(logical_plan: DslPlan, opt_state: OptFlags) -> Self {
107
LazyFrame {
108
logical_plan,
109
opt_state,
110
cached_arena: Default::default(),
111
}
112
}
113
114
/// Get current optimizations.
pub fn get_current_optimizations(&self) -> OptFlags {
    self.opt_state
}
118
119
/// Set allowed optimizations.
///
/// Replaces the flags wholesale; use the individual `with_*` toggles to
/// flip single flags.
pub fn with_optimizations(mut self, opt_state: OptFlags) -> Self {
    self.opt_state = opt_state;
    self
}
124
125
/// Turn off all optimizations.
126
pub fn without_optimizations(self) -> Self {
127
self.with_optimizations(OptFlags::from_bits_truncate(0) | OptFlags::TYPE_COERCION)
128
}
129
130
/// Toggle projection pushdown optimization.
131
pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
132
self.opt_state.set(OptFlags::PROJECTION_PUSHDOWN, toggle);
133
self
134
}
135
136
/// Toggle cluster with columns optimization.
137
pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
138
self.opt_state.set(OptFlags::CLUSTER_WITH_COLUMNS, toggle);
139
self
140
}
141
142
/// Check if operations are order dependent and unset maintaining_order if
143
/// the order would not be observed.
144
pub fn with_check_order(mut self, toggle: bool) -> Self {
145
self.opt_state.set(OptFlags::CHECK_ORDER_OBSERVE, toggle);
146
self
147
}
148
149
/// Toggle predicate pushdown optimization.
150
pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
151
self.opt_state.set(OptFlags::PREDICATE_PUSHDOWN, toggle);
152
self
153
}
154
155
/// Toggle type coercion optimization.
156
pub fn with_type_coercion(mut self, toggle: bool) -> Self {
157
self.opt_state.set(OptFlags::TYPE_COERCION, toggle);
158
self
159
}
160
161
/// Toggle type check optimization.
162
pub fn with_type_check(mut self, toggle: bool) -> Self {
163
self.opt_state.set(OptFlags::TYPE_CHECK, toggle);
164
self
165
}
166
167
/// Toggle expression simplification optimization on or off.
168
pub fn with_simplify_expr(mut self, toggle: bool) -> Self {
169
self.opt_state.set(OptFlags::SIMPLIFY_EXPR, toggle);
170
self
171
}
172
173
/// Toggle common subplan elimination optimization on or off.
#[cfg(feature = "cse")]
pub fn with_comm_subplan_elim(mut self, toggle: bool) -> Self {
    if toggle {
        self.opt_state.insert(OptFlags::COMM_SUBPLAN_ELIM);
    } else {
        self.opt_state.remove(OptFlags::COMM_SUBPLAN_ELIM);
    }
    self
}
179
180
/// Toggle common subexpression elimination optimization on or off.
#[cfg(feature = "cse")]
pub fn with_comm_subexpr_elim(mut self, toggle: bool) -> Self {
    if toggle {
        self.opt_state.insert(OptFlags::COMM_SUBEXPR_ELIM);
    } else {
        self.opt_state.remove(OptFlags::COMM_SUBEXPR_ELIM);
    }
    self
}
186
187
/// Toggle slice pushdown optimization.
188
pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
189
self.opt_state.set(OptFlags::SLICE_PUSHDOWN, toggle);
190
self
191
}
192
193
/// Toggle the new streaming engine for this query.
#[cfg(feature = "new_streaming")]
pub fn with_new_streaming(mut self, toggle: bool) -> Self {
    if toggle {
        self.opt_state.insert(OptFlags::NEW_STREAMING);
    } else {
        self.opt_state.remove(OptFlags::NEW_STREAMING);
    }
    self
}
198
199
/// Try to estimate the number of rows so that joins can determine which side to keep in memory.
200
pub fn with_row_estimate(mut self, toggle: bool) -> Self {
201
self.opt_state.set(OptFlags::ROW_ESTIMATE, toggle);
202
self
203
}
204
205
/// Run every node eagerly. This turns off multi-node optimizations.
206
pub fn _with_eager(mut self, toggle: bool) -> Self {
207
self.opt_state.set(OptFlags::EAGER, toggle);
208
self
209
}
210
211
/// Return a String describing the naive (un-optimized) logical plan.
212
pub fn describe_plan(&self) -> PolarsResult<String> {
213
Ok(self.clone().to_alp()?.describe())
214
}
215
216
/// Return a String describing the naive (un-optimized) logical plan in tree format.
217
pub fn describe_plan_tree(&self) -> PolarsResult<String> {
218
Ok(self.clone().to_alp()?.describe_tree_format())
219
}
220
221
/// Return a String describing the optimized logical plan.
222
///
223
/// Returns `Err` if optimizing the logical plan fails.
224
pub fn describe_optimized_plan(&self) -> PolarsResult<String> {
225
Ok(self.clone().to_alp_optimized()?.describe())
226
}
227
228
/// Return a String describing the optimized logical plan in tree format.
229
///
230
/// Returns `Err` if optimizing the logical plan fails.
231
pub fn describe_optimized_plan_tree(&self) -> PolarsResult<String> {
232
Ok(self.clone().to_alp_optimized()?.describe_tree_format())
233
}
234
235
/// Return a String describing the logical plan.
236
///
237
/// If `optimized` is `true`, explains the optimized plan. If `optimized` is `false`,
238
/// explains the naive, un-optimized plan.
239
pub fn explain(&self, optimized: bool) -> PolarsResult<String> {
240
if optimized {
241
self.describe_optimized_plan()
242
} else {
243
self.describe_plan()
244
}
245
}
246
247
/// Add a sort operation to the logical plan.
248
///
249
/// Sorts the LazyFrame by the column name specified using the provided options.
250
///
251
/// # Example
252
///
253
/// Sort DataFrame by 'sepal_width' column:
254
/// ```rust
255
/// # use polars_core::prelude::*;
256
/// # use polars_lazy::prelude::*;
257
/// fn sort_by_a(df: DataFrame) -> LazyFrame {
258
/// df.lazy().sort(["sepal_width"], Default::default())
259
/// }
260
/// ```
261
/// Sort by a single column with specific order:
262
/// ```
263
/// # use polars_core::prelude::*;
264
/// # use polars_lazy::prelude::*;
265
/// fn sort_with_specific_order(df: DataFrame, descending: bool) -> LazyFrame {
266
/// df.lazy().sort(
267
/// ["sepal_width"],
268
/// SortMultipleOptions::new()
269
/// .with_order_descending(descending)
270
/// )
271
/// }
272
/// ```
273
/// Sort by multiple columns with specifying order for each column:
274
/// ```
275
/// # use polars_core::prelude::*;
276
/// # use polars_lazy::prelude::*;
277
/// fn sort_by_multiple_columns_with_specific_order(df: DataFrame) -> LazyFrame {
278
/// df.lazy().sort(
279
/// ["sepal_width", "sepal_length"],
280
/// SortMultipleOptions::new()
281
/// .with_order_descending_multi([false, true])
282
/// )
283
/// }
284
/// ```
285
/// See [`SortMultipleOptions`] for more options.
286
pub fn sort(self, by: impl IntoVec<PlSmallStr>, sort_options: SortMultipleOptions) -> Self {
287
let opt_state = self.get_opt_state();
288
let lp = self
289
.get_plan_builder()
290
.sort(by.into_vec().into_iter().map(col).collect(), sort_options)
291
.build();
292
Self::from_logical_plan(lp, opt_state)
293
}
294
295
/// Add a sort operation to the logical plan.
296
///
297
/// Sorts the LazyFrame by the provided list of expressions, which will be turned into
298
/// concrete columns before sorting.
299
///
300
/// See [`SortMultipleOptions`] for more options.
301
///
302
/// # Example
303
///
304
/// ```rust
305
/// use polars_core::prelude::*;
306
/// use polars_lazy::prelude::*;
307
///
308
/// /// Sort DataFrame by 'sepal_width' column
309
/// fn example(df: DataFrame) -> LazyFrame {
310
/// df.lazy()
311
/// .sort_by_exprs(vec![col("sepal_width")], Default::default())
312
/// }
313
/// ```
314
pub fn sort_by_exprs<E: AsRef<[Expr]>>(
315
self,
316
by_exprs: E,
317
sort_options: SortMultipleOptions,
318
) -> Self {
319
let by_exprs = by_exprs.as_ref().to_vec();
320
if by_exprs.is_empty() {
321
self
322
} else {
323
let opt_state = self.get_opt_state();
324
let lp = self.get_plan_builder().sort(by_exprs, sort_options).build();
325
Self::from_logical_plan(lp, opt_state)
326
}
327
}
328
329
pub fn top_k<E: AsRef<[Expr]>>(
330
self,
331
k: IdxSize,
332
by_exprs: E,
333
sort_options: SortMultipleOptions,
334
) -> Self {
335
// this will optimize to top-k
336
self.sort_by_exprs(
337
by_exprs,
338
sort_options.with_order_reversed().with_nulls_last(true),
339
)
340
.slice(0, k)
341
}
342
343
pub fn bottom_k<E: AsRef<[Expr]>>(
344
self,
345
k: IdxSize,
346
by_exprs: E,
347
sort_options: SortMultipleOptions,
348
) -> Self {
349
// this will optimize to bottom-k
350
self.sort_by_exprs(by_exprs, sort_options.with_nulls_last(true))
351
.slice(0, k)
352
}
353
354
/// Reverse the `DataFrame` from top to bottom.
355
///
356
/// Row `i` becomes row `number_of_rows - i - 1`.
357
///
358
/// # Example
359
///
360
/// ```rust
361
/// use polars_core::prelude::*;
362
/// use polars_lazy::prelude::*;
363
///
364
/// fn example(df: DataFrame) -> LazyFrame {
365
/// df.lazy()
366
/// .reverse()
367
/// }
368
/// ```
369
pub fn reverse(self) -> Self {
370
self.select(vec![col(PlSmallStr::from_static("*")).reverse()])
371
}
372
373
/// Rename columns in the DataFrame.
///
/// `existing` and `new` are iterables of the same length containing the old and
/// corresponding new column names. Renaming happens to all `existing` columns
/// simultaneously, not iteratively. If `strict` is true, all columns in `existing`
/// must be present in the `LazyFrame` when `rename` is called; otherwise, only
/// those columns that are actually found will be renamed (others will be ignored).
pub fn rename<I, J, T, S>(self, existing: I, new: J, strict: bool) -> Self
where
    I: IntoIterator<Item = T>,
    J: IntoIterator<Item = S>,
    T: AsRef<str>,
    S: AsRef<str>,
{
    let iter = existing.into_iter();
    // Pre-size both vecs from the iterator's lower size hint.
    let cap = iter.size_hint().0;
    let mut existing_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);
    let mut new_vec: Vec<PlSmallStr> = Vec::with_capacity(cap);

    // TODO! should this error if `existing` and `new` have different lengths?
    // Currently, the longer of the two is truncated.
    for (existing, new) in iter.zip(new) {
        let existing = existing.as_ref();
        let new = new.as_ref();
        // No-op pairs (old name == new name) are dropped entirely.
        if new != existing {
            existing_vec.push(existing.into());
            new_vec.push(new.into());
        }
    }

    self.map_private(DslFunction::Rename {
        existing: existing_vec.into(),
        new: new_vec.into(),
        strict,
    })
}
409
410
/// Removes columns from the DataFrame.
411
/// Note that it's better to only select the columns you need
412
/// and let the projection pushdown optimize away the unneeded columns.
413
///
414
/// Any given columns that are not in the schema will give a [`PolarsError::ColumnNotFound`]
415
/// error while materializing the [`LazyFrame`].
416
pub fn drop(self, columns: Selector) -> Self {
417
let opt_state = self.get_opt_state();
418
let lp = self.get_plan_builder().drop(columns).build();
419
Self::from_logical_plan(lp, opt_state)
420
}
421
422
/// Shift the values by a given period and fill the parts that will be empty due to this operation
423
/// with `Nones`.
424
///
425
/// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
426
pub fn shift<E: Into<Expr>>(self, n: E) -> Self {
427
self.select(vec![col(PlSmallStr::from_static("*")).shift(n.into())])
428
}
429
430
/// Shift the values by a given period and fill the parts that will be empty due to this operation
431
/// with the result of the `fill_value` expression.
432
///
433
/// See the method on [Series](polars_core::series::SeriesTrait::shift) for more info on the `shift` operation.
434
pub fn shift_and_fill<E: Into<Expr>, IE: Into<Expr>>(self, n: E, fill_value: IE) -> Self {
435
self.select(vec![
436
col(PlSmallStr::from_static("*")).shift_and_fill(n.into(), fill_value.into()),
437
])
438
}
439
440
/// Fill None values in the DataFrame with an expression.
441
pub fn fill_null<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
442
let opt_state = self.get_opt_state();
443
let lp = self.get_plan_builder().fill_null(fill_value.into()).build();
444
Self::from_logical_plan(lp, opt_state)
445
}
446
447
/// Fill NaN values in the DataFrame with an expression.
448
pub fn fill_nan<E: Into<Expr>>(self, fill_value: E) -> LazyFrame {
449
let opt_state = self.get_opt_state();
450
let lp = self.get_plan_builder().fill_nan(fill_value.into()).build();
451
Self::from_logical_plan(lp, opt_state)
452
}
453
454
/// Caches the result into a new LazyFrame.
455
///
456
/// This should be used to prevent computations running multiple times.
457
pub fn cache(self) -> Self {
458
let opt_state = self.get_opt_state();
459
let lp = self.get_plan_builder().cache().build();
460
Self::from_logical_plan(lp, opt_state)
461
}
462
463
/// Cast named frame columns, resulting in a new LazyFrame with updated dtypes
464
pub fn cast(self, dtypes: PlHashMap<&str, DataType>, strict: bool) -> Self {
465
let cast_cols: Vec<Expr> = dtypes
466
.into_iter()
467
.map(|(name, dt)| {
468
let name = PlSmallStr::from_str(name);
469
470
if strict {
471
col(name).strict_cast(dt)
472
} else {
473
col(name).cast(dt)
474
}
475
})
476
.collect();
477
478
if cast_cols.is_empty() {
479
self
480
} else {
481
self.with_columns(cast_cols)
482
}
483
}
484
485
/// Cast all frame columns to the given dtype, resulting in a new LazyFrame
486
pub fn cast_all(self, dtype: impl Into<DataTypeExpr>, strict: bool) -> Self {
487
self.with_columns(vec![if strict {
488
col(PlSmallStr::from_static("*")).strict_cast(dtype)
489
} else {
490
col(PlSmallStr::from_static("*")).cast(dtype)
491
}])
492
}
493
494
/// Optimize the logical plan into the given arenas, returning the root node.
///
/// Thin wrapper around [`optimize_with_scratch`](Self::optimize_with_scratch)
/// with a throw-away scratch buffer.
pub fn optimize(
    self,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<Node> {
    self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![])
}
501
502
/// Lower and optimize the logical plan, returning the resulting [`IRPlan`].
pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
    // Arenas may come from the cached_arena of this frame.
    let (mut lp_arena, mut expr_arena) = self.get_arenas();
    let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![])?;

    Ok(IRPlan::new(node, lp_arena, expr_arena))
}
508
509
/// Lower the DSL plan to IR *without* running the optimizer, returning the [`IRPlan`].
pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
    let (mut lp_arena, mut expr_arena) = self.get_arenas();
    // Note: `to_alp` takes the expression arena before the lp arena, and may
    // update the optimization flags in-place.
    let node = to_alp(
        self.logical_plan,
        &mut expr_arena,
        &mut lp_arena,
        &mut self.opt_state,
    )?;
    let plan = IRPlan::new(node, lp_arena, expr_arena);
    Ok(plan)
}
520
521
pub(crate) fn optimize_with_scratch(
522
self,
523
lp_arena: &mut Arena<IR>,
524
expr_arena: &mut Arena<AExpr>,
525
scratch: &mut Vec<Node>,
526
) -> PolarsResult<Node> {
527
let lp_top = optimize(
528
self.logical_plan,
529
self.opt_state,
530
lp_arena,
531
expr_arena,
532
scratch,
533
apply_scan_predicate_to_scan_ir,
534
)?;
535
536
Ok(lp_top)
537
}
538
539
/// Optimize the plan, run the `post_opt` callback on the optimized IR, and
/// build the in-memory physical plan.
///
/// Returns the fresh execution state, the physical plan, and a flag that is
/// `false` only when `check_sink` is set and the plan root is a file sink.
fn prepare_collect_post_opt<P>(
    mut self,
    check_sink: bool,
    query_start: Option<std::time::Instant>,
    post_opt: P,
) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
where
    P: FnOnce(
        Node,
        &mut Arena<IR>,
        &mut Arena<AExpr>,
        Option<std::time::Duration>,
    ) -> PolarsResult<()>,
{
    let (mut lp_arena, mut expr_arena) = self.get_arenas();

    let mut scratch = vec![];
    let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch)?;

    post_opt(
        lp_top,
        &mut lp_arena,
        &mut expr_arena,
        // Post optimization callback gets the time since the
        // query was started as its "base" timepoint.
        query_start.map(|s| s.elapsed()),
    )?;

    // sink should be replaced
    let no_file_sink = if check_sink {
        !matches!(
            lp_arena.get(lp_top),
            IR::Sink {
                payload: SinkTypeIR::File { .. },
                ..
            }
        )
    } else {
        true
    };
    let physical_plan = create_physical_plan(
        lp_top,
        &mut lp_arena,
        &mut expr_arena,
        BUILD_STREAMING_EXECUTOR,
    )?;

    let state = ExecutionState::new();
    Ok((state, physical_plan, no_file_sink))
}
589
590
// post_opt: A function that is called after optimization. This can be used to modify the IR jit.
/// Optimize, run the post-optimization callback, and execute the plan,
/// collecting the result into a [`DataFrame`].
pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
where
    P: FnOnce(
        Node,
        &mut Arena<IR>,
        &mut Arena<AExpr>,
        Option<std::time::Duration>,
    ) -> PolarsResult<()>,
{
    // No sink check and no query-start timestamp for plain collection.
    let (mut state, mut physical_plan, _) =
        self.prepare_collect_post_opt(false, None, post_opt)?;
    physical_plan.execute(&mut state)
}
604
605
// Convenience wrapper over `prepare_collect_post_opt` with a no-op
// post-optimization callback.
#[allow(unused_mut)]
fn prepare_collect(
    self,
    check_sink: bool,
    query_start: Option<std::time::Instant>,
) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
    self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
}
613
614
/// Execute all the lazy operations and collect them into a [`DataFrame`] using a specified
/// `engine`.
///
/// The query is optimized prior to execution.
pub fn collect_with_engine(mut self, mut engine: Engine) -> PolarsResult<DataFrame> {
    // Determine the sink payload; a plan without an explicit sink gets an
    // in-memory sink appended so execution has a destination.
    let payload = match &self.logical_plan {
        DslPlan::Sink { payload, .. } => payload.clone(),
        DslPlan::SinkMultiple { .. } => {
            polars_ensure!(matches!(engine, Engine::Auto | Engine::Streaming), InvalidOperation: "lazy multisinks only supported on streaming engine");
            // Multi-sinks run on the streaming engine and return an empty frame.
            feature_gated!("new_streaming", {
                let sink_multiple = self.with_new_streaming(true);
                let mut alp_plan = sink_multiple.to_alp_optimized()?;
                let result = polars_stream::run_query(
                    alp_plan.lp_top,
                    &mut alp_plan.lp_arena,
                    &mut alp_plan.expr_arena,
                );
                return result.map(|_| DataFrame::empty());
            })
        },
        _ => {
            self.logical_plan = DslPlan::Sink {
                input: Arc::new(self.logical_plan),
                payload: SinkType::Memory,
            };
            SinkType::Memory
        },
    };

    // Default engine for collect is InMemory, sink_* is Streaming
    if engine == Engine::Auto {
        engine = match payload {
            #[cfg(feature = "new_streaming")]
            SinkType::Callback { .. } | SinkType::File { .. } => Engine::Streaming,
            _ => Engine::InMemory,
        };
    }
    // Gpu uses some hacks to dispatch.
    if engine == Engine::Gpu {
        engine = Engine::InMemory;
    }

    // Environment variables may force/try the new streaming engine regardless
    // of the requested engine.
    #[cfg(feature = "new_streaming")]
    {
        if let Some(result) = self.try_new_streaming_if_requested() {
            return result.map(|v| v.unwrap_single());
        }
    }

    match engine {
        Engine::Auto => unreachable!(),
        Engine::Streaming => {
            feature_gated!("new_streaming", self = self.with_new_streaming(true))
        },
        _ => {},
    }
    let mut alp_plan = self.clone().to_alp_optimized()?;

    match engine {
        Engine::Auto | Engine::Streaming => feature_gated!("new_streaming", {
            let result = polars_stream::run_query(
                alp_plan.lp_top,
                &mut alp_plan.lp_arena,
                &mut alp_plan.expr_arena,
            );
            result.map(|v| v.unwrap_single())
        }),
        Engine::Gpu => {
            Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
        },
        Engine::InMemory => {
            let mut physical_plan = create_physical_plan(
                alp_plan.lp_top,
                &mut alp_plan.lp_arena,
                &mut alp_plan.expr_arena,
                BUILD_STREAMING_EXECUTOR,
            )?;
            let mut state = ExecutionState::new();
            physical_plan.execute(&mut state)
        },
    }
}
696
697
pub fn explain_all(plans: Vec<DslPlan>, opt_state: OptFlags) -> PolarsResult<String> {
698
let sink_multiple = LazyFrame {
699
logical_plan: DslPlan::SinkMultiple { inputs: plans },
700
opt_state,
701
cached_arena: Default::default(),
702
};
703
sink_multiple.explain(true)
704
}
705
706
/// Collect a batch of plans with the given engine, returning one
/// [`DataFrame`] per input plan.
pub fn collect_all_with_engine(
    plans: Vec<DslPlan>,
    mut engine: Engine,
    opt_state: OptFlags,
) -> PolarsResult<Vec<DataFrame>> {
    if plans.is_empty() {
        return Ok(Vec::new());
    }

    // Default engine for collect_all is InMemory
    if engine == Engine::Auto {
        engine = Engine::InMemory;
    }
    // Gpu uses some hacks to dispatch.
    if engine == Engine::Gpu {
        engine = Engine::InMemory;
    }

    // All plans are wrapped in a single multi-sink frame so they can be
    // optimized (and possibly executed) together.
    let mut sink_multiple = LazyFrame {
        logical_plan: DslPlan::SinkMultiple { inputs: plans },
        opt_state,
        cached_arena: Default::default(),
    };

    // Environment variables may force/try the new streaming engine.
    #[cfg(feature = "new_streaming")]
    {
        if let Some(result) = sink_multiple.try_new_streaming_if_requested() {
            return result.map(|v| v.unwrap_multiple());
        }
    }

    match engine {
        Engine::Auto => unreachable!(),
        Engine::Streaming => {
            feature_gated!(
                "new_streaming",
                sink_multiple = sink_multiple.with_new_streaming(true)
            )
        },
        _ => {},
    }
    let mut alp_plan = sink_multiple.to_alp_optimized()?;

    if engine == Engine::Streaming {
        feature_gated!("new_streaming", {
            let result = polars_stream::run_query(
                alp_plan.lp_top,
                &mut alp_plan.lp_arena,
                &mut alp_plan.expr_arena,
            );
            return result.map(|v| v.unwrap_multiple());
        });
    }

    // In-memory path: split the multi-sink root back into its child plans.
    let IR::SinkMultiple { inputs } = alp_plan.root() else {
        unreachable!()
    };

    let mut multiplan = create_multiple_physical_plans(
        inputs.clone().as_slice(),
        &mut alp_plan.lp_arena,
        &mut alp_plan.expr_arena,
        BUILD_STREAMING_EXECUTOR,
    )?;

    match engine {
        Engine::Gpu => polars_bail!(
            InvalidOperation: "collect_all is not supported for the gpu engine"
        ),
        Engine::InMemory => {
            // We don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)
            // this might then lead to a rayon SO. So we take a multitude of the threads to keep work stealing
            // within bounds
            let mut state = ExecutionState::new();
            if let Some(mut cache_prefiller) = multiplan.cache_prefiller {
                cache_prefiller.execute(&mut state)?;
            }
            let out = POOL.install(|| {
                multiplan
                    .physical_plans
                    .chunks_mut(POOL.current_num_threads() * 3)
                    .map(|chunk| {
                        chunk
                            .into_par_iter()
                            .enumerate()
                            .map(|(idx, input)| {
                                let mut input = std::mem::take(input);
                                // Each plan gets its own split of the state with
                                // a distinct branch index.
                                let mut state = state.split();
                                state.branch_idx += idx;

                                let df = input.execute(&mut state)?;
                                Ok(df)
                            })
                            .collect::<PolarsResult<Vec<_>>>()
                    })
                    .collect::<PolarsResult<Vec<_>>>()
            });
            Ok(out?.into_iter().flatten().collect())
        },
        _ => unreachable!(),
    }
}
808
809
/// Execute all the lazy operations and collect them into a [`DataFrame`].
///
/// The query is optimized prior to execution.
///
/// # Example
///
/// ```rust
/// use polars_core::prelude::*;
/// use polars_lazy::prelude::*;
///
/// fn example(df: DataFrame) -> PolarsResult<DataFrame> {
///     df.lazy()
///         .group_by([col("foo")])
///         .agg([col("bar").sum(), col("ham").mean().alias("avg_ham")])
///         .collect()
/// }
/// ```
pub fn collect(self) -> PolarsResult<DataFrame> {
    self.collect_with_engine(Engine::InMemory)
}
829
830
/// Collect the query in batches.
///
/// If lazy is true the query will not start until the first poll (or until
/// start is called on CollectBatches).
#[cfg(feature = "async")]
pub fn collect_batches(
    self,
    engine: Engine,
    maintain_order: bool,
    chunk_size: Option<NonZeroUsize>,
    lazy: bool,
) -> PolarsResult<CollectBatches> {
    // Bounded channel of one batch; the producing query blocks when the
    // consumer falls behind.
    let (send, recv) = sync_channel(1);
    let runner_send = send.clone();
    let ldf = self.sink_batches(
        PlanCallback::new(move |df| {
            // Stop if receiver has closed.
            let send_result = send.send(Ok(df));
            Ok(send_result.is_err())
        }),
        maintain_order,
        chunk_size,
    )?;
    let runner = move || {
        // We use a tokio spawn_blocking here as it has a high blocking
        // thread pool limit.
        polars_io::pl_async::get_runtime().spawn_blocking(move || {
            // Only errors need forwarding; successful batches were already
            // sent by the sink callback above.
            if let Err(e) = ldf.collect_with_engine(engine) {
                runner_send.send(Err(e)).ok();
            }
        });
    };

    let mut collect_batches = CollectBatches {
        recv,
        runner: Some(Box::new(runner)),
    };
    if !lazy {
        collect_batches.start();
    }
    Ok(collect_batches)
}
872
873
// post_opt: A function that is called after optimization. This can be used to modify the IR jit.
// This version does profiling of the node execution.
pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
where
    P: FnOnce(
        Node,
        &mut Arena<IR>,
        &mut Arena<AExpr>,
        Option<std::time::Duration>,
    ) -> PolarsResult<()>,
{
    // The query start is also the base timepoint for node timings.
    let query_start = std::time::Instant::now();
    let (mut state, mut physical_plan, _) =
        self.prepare_collect_post_opt(false, Some(query_start), post_opt)?;
    state.time_nodes(query_start);
    let out = physical_plan.execute(&mut state)?;
    let timer_df = state.finish_timer()?;
    Ok((out, timer_df))
}
892
893
/// Profile a LazyFrame.
///
/// This will run the query and return a tuple
/// containing the materialized DataFrame and a DataFrame that contains profiling information
/// of each node that is executed.
///
/// The units of the timings are microseconds.
pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
    self._profile_post_opt(|_, _, _, _| Ok(()))
}
903
904
/// Wrap the plan in a callback sink that receives each output batch.
///
/// The callback returns `true` to signal that the query should stop.
/// Errors if the plan already ends in a sink.
pub fn sink_batches(
    mut self,
    function: PlanCallback<DataFrame, bool>,
    maintain_order: bool,
    chunk_size: Option<NonZeroUsize>,
) -> PolarsResult<Self> {
    use polars_plan::prelude::sink::CallbackSinkType;

    polars_ensure!(
        !matches!(self.logical_plan, DslPlan::Sink { .. }),
        InvalidOperation: "cannot create a sink on top of another sink"
    );

    self.logical_plan = DslPlan::Sink {
        input: Arc::new(self.logical_plan),
        payload: SinkType::Callback(CallbackSinkType {
            function,
            maintain_order,
            chunk_size,
        }),
    };

    Ok(self)
}
928
929
/// Run the query on the new streaming engine when requested via environment
/// variables.
///
/// Returns `Some(result)` if `POLARS_AUTO_NEW_STREAMING=1` or
/// `POLARS_FORCE_NEW_STREAMING=1`; returns `None` (caller should use the
/// normal path) otherwise, or when an auto run hit an unimplemented feature.
#[cfg(feature = "new_streaming")]
pub fn try_new_streaming_if_requested(
    &mut self,
) -> Option<PolarsResult<polars_stream::QueryResult>> {
    let auto_new_streaming = std::env::var("POLARS_AUTO_NEW_STREAMING").as_deref() == Ok("1");
    let force_new_streaming = std::env::var("POLARS_FORCE_NEW_STREAMING").as_deref() == Ok("1");

    if auto_new_streaming || force_new_streaming {
        // Try to run using the new streaming engine, falling back
        // if it fails in a todo!() error if auto_new_streaming is set.
        let mut new_stream_lazy = self.clone();
        new_stream_lazy.opt_state |= OptFlags::NEW_STREAMING;
        let mut alp_plan = match new_stream_lazy.to_alp_optimized() {
            Ok(v) => v,
            Err(e) => return Some(Err(e)),
        };

        let f = || {
            polars_stream::run_query(
                alp_plan.lp_top,
                &mut alp_plan.lp_arena,
                &mut alp_plan.expr_arena,
            )
        };

        // Panics from unimplemented streaming features are caught so the
        // auto mode can fall back to the normal engine.
        match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
            Ok(v) => return Some(v),
            Err(e) => {
                // Fallback to normal engine if error is due to not being implemented
                // and auto_new_streaming is set, otherwise propagate error.
                if !force_new_streaming
                    && auto_new_streaming
                    && e.downcast_ref::<&str>()
                        .map(|s| s.starts_with("not yet implemented"))
                        .unwrap_or(false)
                {
                    if polars_core::config::verbose() {
                        eprintln!(
                            "caught unimplemented error in new streaming engine, falling back to normal engine"
                        );
                    }
                } else {
                    std::panic::resume_unwind(e);
                }
            },
        }
    }

    None
}
979
980
/// Terminate the plan with a file or partitioned sink.
///
/// Errors if the plan already ends in a sink.
pub fn sink(
    mut self,
    sink_type: SinkDestination,
    file_format: FileWriteFormat,
    unified_sink_args: UnifiedSinkArgs,
) -> PolarsResult<Self> {
    polars_ensure!(
        !matches!(self.logical_plan, DslPlan::Sink { .. }),
        InvalidOperation: "cannot create a sink on top of another sink"
    );

    // Map the destination onto the corresponding sink payload variant.
    self.logical_plan = DslPlan::Sink {
        input: Arc::new(self.logical_plan),
        payload: match sink_type {
            SinkDestination::File { target } => SinkType::File(FileSinkOptions {
                target,
                file_format,
                unified_sink_args,
            }),
            SinkDestination::Partitioned {
                base_path,
                file_path_provider,
                partition_strategy,
                max_rows_per_file,
                approximate_bytes_per_file,
            } => SinkType::Partitioned(PartitionedSinkOptions {
                base_path,
                file_path_provider,
                partition_strategy,
                file_format,
                unified_sink_args,
                max_rows_per_file,
                approximate_bytes_per_file,
            }),
        },
    };
    Ok(self)
}
1018
1019
/// Filter frame rows that match a predicate expression.
1020
///
1021
/// The expression must yield boolean values (note that rows where the
1022
/// predicate resolves to `null` are *not* included in the resulting frame).
1023
///
1024
/// # Example
1025
///
1026
/// ```rust
1027
/// use polars_core::prelude::*;
1028
/// use polars_lazy::prelude::*;
1029
///
1030
/// fn example(df: DataFrame) -> LazyFrame {
1031
/// df.lazy()
1032
/// .filter(col("sepal_width").is_not_null())
1033
/// .select([col("sepal_width"), col("sepal_length")])
1034
/// }
1035
/// ```
1036
pub fn filter(self, predicate: Expr) -> Self {
1037
let opt_state = self.get_opt_state();
1038
let lp = self.get_plan_builder().filter(predicate).build();
1039
Self::from_logical_plan(lp, opt_state)
1040
}
1041
1042
/// Remove frame rows that match a predicate expression.
///
/// The expression must yield boolean values (note that rows where the
/// predicate resolves to `null` are *not* removed from the resulting frame).
///
/// # Example
///
/// ```rust
/// use polars_core::prelude::*;
/// use polars_lazy::prelude::*;
///
/// fn example(df: DataFrame) -> LazyFrame {
///     df.lazy()
///         .remove(col("sepal_width").is_null())
///         .select([col("sepal_width"), col("sepal_length")])
/// }
/// ```
pub fn remove(self, predicate: Expr) -> Self {
    // `neq_missing` treats null as "not equal to true", so null-predicate
    // rows are kept (see doc above) rather than dropped.
    self.filter(predicate.neq_missing(lit(true)))
}
1062
1063
/// Select (and optionally rename, with [`alias`](crate::dsl::Expr::alias)) columns from the query.
1064
///
1065
/// Columns can be selected with [`col`];
1066
/// If you want to select all columns use `col(PlSmallStr::from_static("*"))`.
1067
///
1068
/// # Example
1069
///
1070
/// ```rust
1071
/// use polars_core::prelude::*;
1072
/// use polars_lazy::prelude::*;
1073
///
1074
/// /// This function selects column "foo" and column "bar".
1075
/// /// Column "bar" is renamed to "ham".
1076
/// fn example(df: DataFrame) -> LazyFrame {
1077
/// df.lazy()
1078
/// .select([col("foo"),
1079
/// col("bar").alias("ham")])
1080
/// }
1081
///
1082
/// /// This function selects all columns except "foo"
1083
/// fn exclude_a_column(df: DataFrame) -> LazyFrame {
1084
/// df.lazy()
1085
/// .select([all().exclude_cols(["foo"]).as_expr()])
1086
/// }
1087
/// ```
1088
pub fn select<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1089
let exprs = exprs.as_ref().to_vec();
1090
self.select_impl(
1091
exprs,
1092
ProjectionOptions {
1093
run_parallel: true,
1094
duplicate_check: true,
1095
should_broadcast: true,
1096
},
1097
)
1098
}
1099
1100
pub fn select_seq<E: AsRef<[Expr]>>(self, exprs: E) -> Self {
1101
let exprs = exprs.as_ref().to_vec();
1102
self.select_impl(
1103
exprs,
1104
ProjectionOptions {
1105
run_parallel: false,
1106
duplicate_check: true,
1107
should_broadcast: true,
1108
},
1109
)
1110
}
1111
1112
fn select_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
1113
let opt_state = self.get_opt_state();
1114
let lp = self.get_plan_builder().project(exprs, options).build();
1115
Self::from_logical_plan(lp, opt_state)
1116
}
1117
1118
/// Performs a "group-by" on a `LazyFrame`, producing a [`LazyGroupBy`], which can subsequently be aggregated.
1119
///
1120
/// Takes a list of expressions to group on.
1121
///
1122
/// # Example
1123
///
1124
/// ```rust
1125
/// use polars_core::prelude::*;
1126
/// use polars_lazy::prelude::*;
1127
///
1128
/// fn example(df: DataFrame) -> LazyFrame {
1129
/// df.lazy()
1130
/// .group_by([col("date")])
1131
/// .agg([
1132
/// col("rain").min().alias("min_rain"),
1133
/// col("rain").sum().alias("sum_rain"),
1134
/// col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
1135
/// ])
1136
/// }
1137
/// ```
1138
pub fn group_by<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
    let keys = by
        .as_ref()
        .iter()
        .map(|e| e.clone().into())
        .collect::<Vec<_>>();
    let opt_state = self.get_opt_state();

    // The two branches differ only in the extra fields that exist when the
    // `dynamic_group_by` feature is compiled in.
    #[cfg(feature = "dynamic_group_by")]
    {
        LazyGroupBy {
            logical_plan: self.logical_plan,
            opt_state,
            keys,
            predicates: vec![],
            maintain_order: false,
            dynamic_options: None,
            rolling_options: None,
        }
    }

    #[cfg(not(feature = "dynamic_group_by"))]
    {
        LazyGroupBy {
            logical_plan: self.logical_plan,
            opt_state,
            keys,
            predicates: vec![],
            maintain_order: false,
        }
    }
}
1170
1171
/// Create rolling groups based on a time column.
1172
///
1173
/// Also works for index values of type UInt32, UInt64, Int32, or Int64.
1174
///
1175
/// Different from a [`group_by_dynamic`][`Self::group_by_dynamic`], the windows are now determined by the
1176
/// individual values and are not of constant intervals. For constant intervals use
1177
/// *group_by_dynamic*
1178
#[cfg(feature = "dynamic_group_by")]
pub fn rolling<E: AsRef<[Expr]>>(
    mut self,
    index_column: Expr,
    group_by: E,
    mut options: RollingGroupOptions,
) -> LazyGroupBy {
    if let Expr::Column(name) = index_column {
        options.index_column = name;
    } else {
        // Non-column index expressions are first materialized as a column,
        // then we recurse with a plain column reference.
        // NOTE(review): schema resolution and field lookup are unwrapped;
        // an invalid index expression panics rather than returning an error.
        let output_field = index_column
            .to_field(&self.collect_schema().unwrap())
            .unwrap();
        return self.with_column(index_column).rolling(
            Expr::Column(output_field.name().clone()),
            group_by,
            options,
        );
    }
    let opt_state = self.get_opt_state();
    LazyGroupBy {
        logical_plan: self.logical_plan,
        opt_state,
        predicates: vec![],
        keys: group_by.as_ref().to_vec(),
        // Rolling windows always maintain row order.
        maintain_order: true,
        dynamic_options: None,
        rolling_options: Some(options),
    }
}
1208
1209
/// Group based on a time value (or index value of type Int32, Int64).
1210
///
1211
/// Time windows are calculated and rows are assigned to windows. Different from a
1212
/// normal group_by is that a row can be member of multiple groups. The time/index
1213
/// window could be seen as a rolling window, with a window size determined by
1214
/// dates/times/values instead of slots in the DataFrame.
1215
///
1216
/// A window is defined by:
1217
///
1218
/// - every: interval of the window
1219
/// - period: length of the window
1220
/// - offset: offset of the window
1221
///
1222
/// The `group_by` argument should be empty `[]` if you don't want to combine this
1223
/// with a ordinary group_by on these keys.
1224
#[cfg(feature = "dynamic_group_by")]
pub fn group_by_dynamic<E: AsRef<[Expr]>>(
    mut self,
    index_column: Expr,
    group_by: E,
    mut options: DynamicGroupOptions,
) -> LazyGroupBy {
    if let Expr::Column(name) = index_column {
        options.index_column = name;
    } else {
        // Non-column index expressions are first materialized as a column,
        // then we recurse with a plain column reference.
        // NOTE(review): schema resolution and field lookup are unwrapped;
        // an invalid index expression panics rather than returning an error.
        let output_field = index_column
            .to_field(&self.collect_schema().unwrap())
            .unwrap();
        return self.with_column(index_column).group_by_dynamic(
            Expr::Column(output_field.name().clone()),
            group_by,
            options,
        );
    }
    let opt_state = self.get_opt_state();
    LazyGroupBy {
        logical_plan: self.logical_plan,
        opt_state,
        predicates: vec![],
        keys: group_by.as_ref().to_vec(),
        // Dynamic windows always maintain row order.
        maintain_order: true,
        dynamic_options: Some(options),
        rolling_options: None,
    }
}
1254
1255
/// Similar to [`group_by`][`Self::group_by`], but order of the DataFrame is maintained.
1256
pub fn group_by_stable<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(self, by: E) -> LazyGroupBy {
    let keys = by
        .as_ref()
        .iter()
        .map(|e| e.clone().into())
        .collect::<Vec<_>>();
    let opt_state = self.get_opt_state();

    // Identical to `group_by` except `maintain_order: true`; the branches
    // differ only in the fields gated behind `dynamic_group_by`.
    #[cfg(feature = "dynamic_group_by")]
    {
        LazyGroupBy {
            logical_plan: self.logical_plan,
            opt_state,
            keys,
            predicates: vec![],
            maintain_order: true,
            dynamic_options: None,
            rolling_options: None,
        }
    }

    #[cfg(not(feature = "dynamic_group_by"))]
    {
        LazyGroupBy {
            logical_plan: self.logical_plan,
            opt_state,
            keys,
            predicates: vec![],
            maintain_order: true,
        }
    }
}
1288
1289
/// Left anti join this query with another lazy query.
1290
///
1291
/// Matches on the values of the expressions `left_on` and `right_on`. For more
1292
/// flexible join logic, see [`join`](LazyFrame::join) or
1293
/// [`join_builder`](LazyFrame::join_builder).
1294
///
1295
/// # Example
1296
///
1297
/// ```rust
1298
/// use polars_core::prelude::*;
1299
/// use polars_lazy::prelude::*;
1300
/// fn anti_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1301
/// ldf
1302
/// .anti_join(other, col("foo"), col("bar").cast(DataType::String))
1303
/// }
1304
/// ```
1305
#[cfg(feature = "semi_anti_join")]
pub fn anti_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
    // Thin wrapper over `join` with a fixed anti-join type.
    let args = JoinArgs::new(JoinType::Anti);
    self.join(other, [left_on.into()], [right_on.into()], args)
}
1314
1315
/// Creates the Cartesian product from both frames, preserving the order of the left keys.
1316
#[cfg(feature = "cross_join")]
pub fn cross_join(self, other: LazyFrame, suffix: Option<PlSmallStr>) -> LazyFrame {
    // A cross join has no key columns; the suffix disambiguates duplicate names.
    let args = JoinArgs::new(JoinType::Cross).with_suffix(suffix);
    self.join(other, Vec::<Expr>::new(), Vec::<Expr>::new(), args)
}
1325
1326
/// Left outer join this query with another lazy query.
1327
///
1328
/// Matches on the values of the expressions `left_on` and `right_on`. For more
1329
/// flexible join logic, see [`join`](LazyFrame::join) or
1330
/// [`join_builder`](LazyFrame::join_builder).
1331
///
1332
/// # Example
1333
///
1334
/// ```rust
1335
/// use polars_core::prelude::*;
1336
/// use polars_lazy::prelude::*;
1337
/// fn left_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1338
/// ldf
1339
/// .left_join(other, col("foo"), col("bar"))
1340
/// }
1341
/// ```
1342
pub fn left_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1343
self.join(
1344
other,
1345
[left_on.into()],
1346
[right_on.into()],
1347
JoinArgs::new(JoinType::Left),
1348
)
1349
}
1350
1351
/// Inner join this query with another lazy query.
1352
///
1353
/// Matches on the values of the expressions `left_on` and `right_on`. For more
1354
/// flexible join logic, see [`join`](LazyFrame::join) or
1355
/// [`join_builder`](LazyFrame::join_builder).
1356
///
1357
/// # Example
1358
///
1359
/// ```rust
1360
/// use polars_core::prelude::*;
1361
/// use polars_lazy::prelude::*;
1362
/// fn inner_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1363
/// ldf
1364
/// .inner_join(other, col("foo"), col("bar").cast(DataType::String))
1365
/// }
1366
/// ```
1367
pub fn inner_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1368
self.join(
1369
other,
1370
[left_on.into()],
1371
[right_on.into()],
1372
JoinArgs::new(JoinType::Inner),
1373
)
1374
}
1375
1376
/// Full outer join this query with another lazy query.
1377
///
1378
/// Matches on the values of the expressions `left_on` and `right_on`. For more
1379
/// flexible join logic, see [`join`](LazyFrame::join) or
1380
/// [`join_builder`](LazyFrame::join_builder).
1381
///
1382
/// # Example
1383
///
1384
/// ```rust
1385
/// use polars_core::prelude::*;
1386
/// use polars_lazy::prelude::*;
1387
/// fn full_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1388
/// ldf
1389
/// .full_join(other, col("foo"), col("bar"))
1390
/// }
1391
/// ```
1392
pub fn full_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
1393
self.join(
1394
other,
1395
[left_on.into()],
1396
[right_on.into()],
1397
JoinArgs::new(JoinType::Full),
1398
)
1399
}
1400
1401
/// Left semi join this query with another lazy query.
1402
///
1403
/// Matches on the values of the expressions `left_on` and `right_on`. For more
1404
/// flexible join logic, see [`join`](LazyFrame::join) or
1405
/// [`join_builder`](LazyFrame::join_builder).
1406
///
1407
/// # Example
1408
///
1409
/// ```rust
1410
/// use polars_core::prelude::*;
1411
/// use polars_lazy::prelude::*;
1412
/// fn semi_join_dataframes(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1413
/// ldf
1414
/// .semi_join(other, col("foo"), col("bar").cast(DataType::String))
1415
/// }
1416
/// ```
1417
#[cfg(feature = "semi_anti_join")]
pub fn semi_join<E: Into<Expr>>(self, other: LazyFrame, left_on: E, right_on: E) -> LazyFrame {
    // Thin wrapper over `join` with a fixed semi-join type.
    let args = JoinArgs::new(JoinType::Semi);
    self.join(other, [left_on.into()], [right_on.into()], args)
}
1426
1427
/// Generic function to join two LazyFrames.
1428
///
1429
/// `join` can join on multiple columns, given as two list of expressions, and with a
1430
/// [`JoinType`] specified by `how`. Non-joined column names in the right DataFrame
1431
/// that already exist in this DataFrame are suffixed with `"_right"`. For control
1432
/// over how columns are renamed and parallelization options, use
1433
/// [`join_builder`](LazyFrame::join_builder).
1434
///
1435
/// Any provided `args.slice` parameter is not considered, but set by the internal optimizer.
1436
///
1437
/// # Example
1438
///
1439
/// ```rust
1440
/// use polars_core::prelude::*;
1441
/// use polars_lazy::prelude::*;
1442
///
1443
/// fn example(ldf: LazyFrame, other: LazyFrame) -> LazyFrame {
1444
/// ldf
1445
/// .join(other, [col("foo"), col("bar")], [col("foo"), col("bar")], JoinArgs::new(JoinType::Inner))
1446
/// }
1447
/// ```
1448
pub fn join<E: AsRef<[Expr]>>(
1449
self,
1450
other: LazyFrame,
1451
left_on: E,
1452
right_on: E,
1453
args: JoinArgs,
1454
) -> LazyFrame {
1455
let left_on = left_on.as_ref().to_vec();
1456
let right_on = right_on.as_ref().to_vec();
1457
1458
self._join_impl(other, left_on, right_on, args)
1459
}
1460
1461
fn _join_impl(
    self,
    other: LazyFrame,
    left_on: Vec<Expr>,
    right_on: Vec<Expr>,
    args: JoinArgs,
) -> LazyFrame {
    // Destructure exhaustively so a newly added `JoinArgs` field becomes a
    // compile error here instead of being silently dropped.
    let JoinArgs {
        how,
        validation,
        suffix,
        slice,
        nulls_equal,
        coalesce,
        maintain_order,
        build_side,
    } = args;

    // `slice` is reserved for the optimizer (see the doc on `join`); a caller
    // setting it is an implementation bug, hence the panic.
    if slice.is_some() {
        panic!("impl error: slice is not handled")
    }

    let mut builder = self
        .join_builder()
        .with(other)
        .left_on(left_on)
        .right_on(right_on)
        .how(how)
        .validate(validation)
        .join_nulls(nulls_equal)
        .coalesce(coalesce)
        .maintain_order(maintain_order)
        .build_side(build_side);

    // Only override the builder's default suffix when one was supplied.
    if let Some(suffix) = suffix {
        builder = builder.suffix(suffix);
    }

    // Note: args.slice is set by the optimizer
    builder.finish()
}
1502
1503
/// Consume `self` and return a [`JoinBuilder`] to customize a join on this LazyFrame.
1504
///
1505
/// After the `JoinBuilder` has been created and set up, calling
1506
/// [`finish()`](JoinBuilder::finish) on it will give back the `LazyFrame`
1507
/// representing the `join` operation.
1508
pub fn join_builder(self) -> JoinBuilder {
    // The builder takes ownership of `self`; `JoinBuilder::finish` returns
    // the joined `LazyFrame`.
    JoinBuilder::new(self)
}
1511
1512
/// Add or replace a column, given as an expression, to a DataFrame.
1513
///
1514
/// # Example
1515
///
1516
/// ```rust
1517
/// use polars_core::prelude::*;
1518
/// use polars_lazy::prelude::*;
1519
/// fn add_column(df: DataFrame) -> LazyFrame {
1520
/// df.lazy()
1521
/// .with_column(
1522
/// when(col("sepal_length").lt(lit(5.0)))
1523
/// .then(lit(10))
1524
/// .otherwise(lit(1))
1525
/// .alias("new_column_name"),
1526
/// )
1527
/// }
1528
/// ```
1529
pub fn with_column(self, expr: Expr) -> LazyFrame {
1530
let opt_state = self.get_opt_state();
1531
let lp = self
1532
.get_plan_builder()
1533
.with_columns(
1534
vec![expr],
1535
ProjectionOptions {
1536
run_parallel: false,
1537
duplicate_check: true,
1538
should_broadcast: true,
1539
},
1540
)
1541
.build();
1542
Self::from_logical_plan(lp, opt_state)
1543
}
1544
1545
/// Add or replace multiple columns, given as expressions, to a DataFrame.
1546
///
1547
/// # Example
1548
///
1549
/// ```rust
1550
/// use polars_core::prelude::*;
1551
/// use polars_lazy::prelude::*;
1552
/// fn add_columns(df: DataFrame) -> LazyFrame {
1553
/// df.lazy()
1554
/// .with_columns(
1555
/// vec![lit(10).alias("foo"), lit(100).alias("bar")]
1556
/// )
1557
/// }
1558
/// ```
1559
pub fn with_columns<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1560
let exprs = exprs.as_ref().to_vec();
1561
self.with_columns_impl(
1562
exprs,
1563
ProjectionOptions {
1564
run_parallel: true,
1565
duplicate_check: true,
1566
should_broadcast: true,
1567
},
1568
)
1569
}
1570
1571
/// Add or replace multiple columns to a DataFrame, but evaluate them sequentially.
1572
pub fn with_columns_seq<E: AsRef<[Expr]>>(self, exprs: E) -> LazyFrame {
1573
let exprs = exprs.as_ref().to_vec();
1574
self.with_columns_impl(
1575
exprs,
1576
ProjectionOptions {
1577
run_parallel: false,
1578
duplicate_check: true,
1579
should_broadcast: true,
1580
},
1581
)
1582
}
1583
1584
/// Match or evolve to a certain schema.
1585
pub fn match_to_schema(
1586
self,
1587
schema: SchemaRef,
1588
per_column: Arc<[MatchToSchemaPerColumn]>,
1589
extra_columns: ExtraColumnsPolicy,
1590
) -> LazyFrame {
1591
let opt_state = self.get_opt_state();
1592
let lp = self
1593
.get_plan_builder()
1594
.match_to_schema(schema, per_column, extra_columns)
1595
.build();
1596
Self::from_logical_plan(lp, opt_state)
1597
}
1598
1599
pub fn pipe_with_schema(
1600
self,
1601
callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1602
) -> Self {
1603
let opt_state = self.get_opt_state();
1604
let lp = self
1605
.get_plan_builder()
1606
.pipe_with_schema(vec![], callback)
1607
.build();
1608
Self::from_logical_plan(lp, opt_state)
1609
}
1610
1611
pub fn pipe_with_schemas(
1612
self,
1613
others: Vec<LazyFrame>,
1614
callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
1615
) -> Self {
1616
let opt_state = self.get_opt_state();
1617
let lp = self
1618
.get_plan_builder()
1619
.pipe_with_schema(
1620
others.into_iter().map(|lf| lf.logical_plan).collect(),
1621
callback,
1622
)
1623
.build();
1624
Self::from_logical_plan(lp, opt_state)
1625
}
1626
1627
fn with_columns_impl(self, exprs: Vec<Expr>, options: ProjectionOptions) -> LazyFrame {
1628
let opt_state = self.get_opt_state();
1629
let lp = self.get_plan_builder().with_columns(exprs, options).build();
1630
Self::from_logical_plan(lp, opt_state)
1631
}
1632
1633
pub fn with_context<C: AsRef<[LazyFrame]>>(self, contexts: C) -> LazyFrame {
1634
let contexts = contexts
1635
.as_ref()
1636
.iter()
1637
.map(|lf| lf.logical_plan.clone())
1638
.collect();
1639
let opt_state = self.get_opt_state();
1640
let lp = self.get_plan_builder().with_context(contexts).build();
1641
Self::from_logical_plan(lp, opt_state)
1642
}
1643
1644
/// Aggregate all the columns as their maximum values.
1645
///
1646
/// Aggregated columns will have the same names as the original columns.
1647
pub fn max(self) -> Self {
    // Delegates to the DSL-level stats function.
    self.map_private(DslFunction::Stats(StatsFunction::Max))
}
1650
1651
/// Aggregate all the columns as their minimum values.
1652
///
1653
/// Aggregated columns will have the same names as the original columns.
1654
pub fn min(self) -> Self {
    // Delegates to the DSL-level stats function.
    self.map_private(DslFunction::Stats(StatsFunction::Min))
}
1657
1658
/// Aggregate all the columns as their sum values.
1659
///
1660
/// Aggregated columns will have the same names as the original columns.
1661
///
1662
/// - Boolean columns will sum to a `u32` containing the number of `true`s.
1663
/// - For integer columns, the ordinary checks for overflow are performed:
1664
/// if running in `debug` mode, overflows will panic, whereas in `release` mode overflows will
1665
/// silently wrap.
1666
/// - String columns will sum to None.
1667
pub fn sum(self) -> Self {
    // Delegates to the DSL-level stats function; see the doc above for the
    // per-dtype overflow/boolean semantics.
    self.map_private(DslFunction::Stats(StatsFunction::Sum))
}
1670
1671
/// Aggregate all the columns as their mean values.
1672
///
1673
/// - Boolean and integer columns are converted to `f64` before computing the mean.
1674
/// - String columns will have a mean of None.
1675
pub fn mean(self) -> Self {
    // Delegates to the DSL-level stats function.
    self.map_private(DslFunction::Stats(StatsFunction::Mean))
}
1678
1679
/// Aggregate all the columns as their median values.
1680
///
1681
/// - Boolean and integer results are converted to `f64`. However, they are still
1682
/// susceptible to overflow before this conversion occurs.
1683
/// - String columns will sum to None.
1684
pub fn median(self) -> Self {
    // Delegates to the DSL-level stats function.
    self.map_private(DslFunction::Stats(StatsFunction::Median))
}
1687
1688
/// Aggregate all the columns as their quantile values.
1689
pub fn quantile(self, quantile: Expr, method: QuantileMethod) -> Self {
    // `quantile` is an expression so it may be computed lazily; `method`
    // selects the interpolation strategy.
    self.map_private(DslFunction::Stats(StatsFunction::Quantile {
        quantile,
        method,
    }))
}
1695
1696
/// Aggregate all the columns as their standard deviation values.
1697
///
1698
/// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1699
/// computing the variance, where `N` is the number of rows.
1700
/// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1701
/// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1702
/// > likelihood estimate of the variance for normally distributed variables. The
1703
/// > standard deviation computed in this function is the square root of the estimated
1704
/// > variance, so even with `ddof=1`, it will not be an unbiased estimate of the
1705
/// > standard deviation per se.
1706
///
1707
/// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.std.html#)
1708
pub fn std(self, ddof: u8) -> Self {
    // Delegates to the DSL-level stats function; `ddof` semantics documented above.
    self.map_private(DslFunction::Stats(StatsFunction::Std { ddof }))
}
1711
1712
/// Aggregate all the columns as their variance values.
1713
///
1714
/// `ddof` is the "Delta Degrees of Freedom"; `N - ddof` will be the denominator when
1715
/// computing the variance, where `N` is the number of rows.
1716
/// > In standard statistical practice, `ddof=1` provides an unbiased estimator of the
1717
/// > variance of a hypothetical infinite population. `ddof=0` provides a maximum
1718
/// > likelihood estimate of the variance for normally distributed variables.
1719
///
1720
/// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.var.html#)
1721
pub fn var(self, ddof: u8) -> Self {
    // Delegates to the DSL-level stats function; `ddof` semantics documented above.
    self.map_private(DslFunction::Stats(StatsFunction::Var { ddof }))
}
1724
1725
/// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1726
pub fn explode(self, columns: Selector, options: ExplodeOptions) -> LazyFrame {
    // Public entry point; forwards with `allow_empty = false`.
    self.explode_impl(columns, options, false)
}
1729
1730
/// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode).
1731
fn explode_impl(
1732
self,
1733
columns: Selector,
1734
options: ExplodeOptions,
1735
allow_empty: bool,
1736
) -> LazyFrame {
1737
let opt_state = self.get_opt_state();
1738
let lp = self
1739
.get_plan_builder()
1740
.explode(columns, options, allow_empty)
1741
.build();
1742
Self::from_logical_plan(lp, opt_state)
1743
}
1744
1745
/// Aggregate all the columns as the sum of their null value count.
1746
pub fn null_count(self) -> LazyFrame {
    // Wildcard projection: one null-count value per column.
    self.select(vec![col(PlSmallStr::from_static("*")).null_count()])
}
1749
1750
/// Drop non-unique rows and maintain the order of kept rows.
1751
///
1752
/// `subset` is an optional `Vec` of column names to consider for uniqueness; if
1753
/// `None`, all columns are considered.
1754
pub fn unique_stable(
1755
self,
1756
subset: Option<Selector>,
1757
keep_strategy: UniqueKeepStrategy,
1758
) -> LazyFrame {
1759
let subset = subset.map(|s| vec![Expr::Selector(s)]);
1760
self.unique_stable_generic(subset, keep_strategy)
1761
}
1762
1763
pub fn unique_stable_generic(
1764
self,
1765
subset: Option<Vec<Expr>>,
1766
keep_strategy: UniqueKeepStrategy,
1767
) -> LazyFrame {
1768
let opt_state = self.get_opt_state();
1769
let options = DistinctOptionsDSL {
1770
subset,
1771
maintain_order: true,
1772
keep_strategy,
1773
};
1774
let lp = self.get_plan_builder().distinct(options).build();
1775
Self::from_logical_plan(lp, opt_state)
1776
}
1777
1778
/// Drop non-unique rows without maintaining the order of kept rows.
1779
///
1780
/// The order of the kept rows may change; to maintain the original row order, use
1781
/// [`unique_stable`](LazyFrame::unique_stable).
1782
///
1783
/// `subset` is an optional `Vec` of column names to consider for uniqueness; if None,
1784
/// all columns are considered.
1785
pub fn unique(self, subset: Option<Selector>, keep_strategy: UniqueKeepStrategy) -> LazyFrame {
    // Lift the selector into expression form for the generic implementation.
    let subset_exprs = subset.map(|s| vec![Expr::Selector(s)]);
    self.unique_generic(subset_exprs, keep_strategy)
}
1789
1790
pub fn unique_generic(
1791
self,
1792
subset: Option<Vec<Expr>>,
1793
keep_strategy: UniqueKeepStrategy,
1794
) -> LazyFrame {
1795
let opt_state = self.get_opt_state();
1796
let options = DistinctOptionsDSL {
1797
subset,
1798
maintain_order: false,
1799
keep_strategy,
1800
};
1801
let lp = self.get_plan_builder().distinct(options).build();
1802
Self::from_logical_plan(lp, opt_state)
1803
}
1804
1805
/// Drop rows containing one or more NaN values.
1806
///
1807
/// `subset` is an optional `Vec` of column names to consider for NaNs; if None, all
1808
/// floating point columns are considered.
1809
pub fn drop_nans(self, subset: Option<Selector>) -> LazyFrame {
1810
let opt_state = self.get_opt_state();
1811
let lp = self.get_plan_builder().drop_nans(subset).build();
1812
Self::from_logical_plan(lp, opt_state)
1813
}
1814
1815
/// Drop rows containing one or more None values.
1816
///
1817
/// `subset` is an optional `Vec` of column names to consider for nulls; if None, all
1818
/// columns are considered.
1819
pub fn drop_nulls(self, subset: Option<Selector>) -> LazyFrame {
1820
let opt_state = self.get_opt_state();
1821
let lp = self.get_plan_builder().drop_nulls(subset).build();
1822
Self::from_logical_plan(lp, opt_state)
1823
}
1824
1825
/// Slice the DataFrame using an offset (starting row) and a length.
1826
///
1827
/// If `offset` is negative, it is counted from the end of the DataFrame. For
1828
/// instance, `lf.slice(-5, 3)` gets three rows, starting at the row fifth from the
1829
/// end.
1830
///
1831
/// If `offset` and `len` are such that the slice extends beyond the end of the
1832
/// DataFrame, the portion between `offset` and the end will be returned. In this
1833
/// case, the number of rows in the returned DataFrame will be less than `len`.
1834
pub fn slice(self, offset: i64, len: IdxSize) -> LazyFrame {
1835
let opt_state = self.get_opt_state();
1836
let lp = self.get_plan_builder().slice(offset, len).build();
1837
Self::from_logical_plan(lp, opt_state)
1838
}
1839
1840
/// Remove all the rows of the LazyFrame.
1841
pub fn clear(self) -> LazyFrame {
    // Zero-length slice: keeps the schema, drops all rows.
    self.slice(0, 0)
}
1844
1845
/// Get the first row.
1846
///
1847
/// Equivalent to `self.slice(0, 1)`.
1848
pub fn first(self) -> LazyFrame {
    self.slice(0, 1)
}
1851
1852
/// Get the last row.
1853
///
1854
/// Equivalent to `self.slice(-1, 1)`.
1855
pub fn last(self) -> LazyFrame {
    // Negative offset counts from the end of the frame.
    self.slice(-1, 1)
}
1858
1859
/// Get the last `n` rows.
1860
///
1861
/// Equivalent to `self.slice(-(n as i64), n)`.
1862
pub fn tail(self, n: IdxSize) -> LazyFrame {
    // A negative offset counts from the end of the frame.
    self.slice(-(n as i64), n)
}
1866
1867
#[cfg(feature = "pivot")]
#[expect(clippy::too_many_arguments)]
pub fn pivot(
    self,
    on: Selector,
    // Pre-computed values of the `on` columns; determines the output columns.
    // NOTE(review): exact contract defined by the plan builder — confirm there.
    on_columns: Arc<DataFrame>,
    index: Selector,
    values: Selector,
    agg: Expr,
    maintain_order: bool,
    separator: PlSmallStr,
) -> LazyFrame {
    let opt_state = self.get_opt_state();
    let lp = self
        .get_plan_builder()
        .pivot(
            on,
            on_columns,
            index,
            values,
            agg,
            maintain_order,
            separator,
        )
        .build();
    Self::from_logical_plan(lp, opt_state)
}
1894
1895
/// Unpivot the DataFrame from wide to long format.
1896
///
1897
/// See [`UnpivotArgsIR`] for information on how to unpivot a DataFrame.
1898
#[cfg(feature = "pivot")]
pub fn unpivot(self, args: UnpivotArgsDSL) -> LazyFrame {
    let flags = self.get_opt_state();
    let plan = self.get_plan_builder().unpivot(args).build();
    Self::from_logical_plan(plan, flags)
}
1904
1905
/// Limit the DataFrame to the first `n` rows.
1906
pub fn limit(self, n: IdxSize) -> LazyFrame {
    // Equivalent to `head`: a slice starting at row 0.
    self.slice(0, n)
}
1909
1910
/// Apply a function/closure once the logical plan get executed.
1911
///
1912
/// The function has access to the whole materialized DataFrame at the time it is
1913
/// called.
1914
///
1915
/// To apply specific functions to specific columns, use [`Expr::map`] in conjunction
1916
/// with `LazyFrame::with_column` or `with_columns`.
1917
///
1918
/// ## Warning
1919
/// This can blow up in your face if the schema is changed due to the operation. The
1920
/// optimizer relies on a correct schema.
1921
///
1922
/// You can toggle certain optimizations off.
1923
pub fn map<F>(
    self,
    function: F,
    optimizations: AllowedOptimizations,
    schema: Option<Arc<dyn UdfSchema>>,
    name: Option<&'static str>,
) -> LazyFrame
where
    F: 'static + Fn(DataFrame) -> PolarsResult<DataFrame> + Send + Sync,
{
    let opt_state = self.get_opt_state();
    let lp = self
        .get_plan_builder()
        .map(
            function,
            optimizations,
            schema,
            // Fallback display name for the UDF node.
            PlSmallStr::from_static(name.unwrap_or("ANONYMOUS UDF")),
        )
        .build();
    Self::from_logical_plan(lp, opt_state)
}
1945
1946
#[cfg(feature = "python")]
// Python-callback variant of `map`; `validate_output` controls whether the
// returned frame is checked against `schema`.
// NOTE(review): validation semantics live in the plan builder — confirm there.
pub fn map_python(
    self,
    function: polars_utils::python_function::PythonFunction,
    optimizations: AllowedOptimizations,
    schema: Option<SchemaRef>,
    validate_output: bool,
) -> LazyFrame {
    let opt_state = self.get_opt_state();
    let lp = self
        .get_plan_builder()
        .map_python(function, optimizations, schema, validate_output)
        .build();
    Self::from_logical_plan(lp, opt_state)
}
1961
1962
pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame {
1963
let opt_state = self.get_opt_state();
1964
let lp = self.get_plan_builder().map_private(function).build();
1965
Self::from_logical_plan(lp, opt_state)
1966
}
1967
1968
/// Add a new column at index 0 that counts the rows.
1969
///
1970
/// `name` is the name of the new column. `offset` is where to start counting from; if
1971
/// `None`, it is set to `0`.
1972
///
1973
/// # Warning
1974
/// This can have a negative effect on query performance. This may for instance block
1975
/// predicate pushdown optimization.
1976
pub fn with_row_index<S>(self, name: S, offset: Option<IdxSize>) -> LazyFrame
where
    S: Into<PlSmallStr>,
{
    let name = name.into();

    match &self.logical_plan {
        // Fast path: push the row index directly into a file scan so the
        // reader attaches it, instead of adding a separate plan node. Only
        // applies when the scan has no row index yet; anonymous scans are
        // excluded.
        v @ DslPlan::Scan {
            scan_type,
            unified_scan_args,
            ..
        } if unified_scan_args.row_index.is_none()
            && !matches!(&**scan_type, FileScanDsl::Anonymous { .. }) =>
        {
            let DslPlan::Scan {
                sources,
                mut unified_scan_args,
                scan_type,
                cached_ir: _,
            } = v.clone()
            else {
                // The match arm above already proved this is a `Scan`.
                unreachable!()
            };

            unified_scan_args.row_index = Some(RowIndex {
                name,
                offset: offset.unwrap_or(0),
            });

            // Rebuild the scan with a fresh IR cache, since the plan changed.
            DslPlan::Scan {
                sources,
                unified_scan_args,
                scan_type,
                cached_ir: Default::default(),
            }
            .into()
        },
        // General path: a dedicated RowIndex map function node.
        _ => self.map_private(DslFunction::RowIndex { name, offset }),
    }
}
2016
2017
/// Return the number of non-null elements for each column.
2018
pub fn count(self) -> LazyFrame {
    // Wildcard projection: one non-null count per column.
    self.select(vec![col(PlSmallStr::from_static("*")).count()])
}
2021
2022
/// Unnest the given `Struct` columns: the fields of the `Struct` type will be
2023
/// inserted as columns.
2024
#[cfg(feature = "dtype-struct")]
// `separator`, when set, is presumably used to join the struct name with its
// field names — confirm against the DSL implementation.
pub fn unnest(self, cols: Selector, separator: Option<PlSmallStr>) -> Self {
    self.map_private(DslFunction::Unnest {
        columns: cols,
        separator,
    })
}
2031
2032
#[cfg(feature = "merge_sorted")]
// NOTE(review): both inputs are presumably expected to be sorted by `key`
// (the node is named MergeSorted) — confirm against the executor.
pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
where
    S: Into<PlSmallStr>,
{
    let key = key.into();

    let lp = DslPlan::MergeSorted {
        input_left: Arc::new(self.logical_plan),
        input_right: Arc::new(other.logical_plan),
        key,
    };
    // Currently infallible; the Result return leaves room for future validation.
    Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
}
2046
2047
pub fn hint(self, hint: HintIR) -> PolarsResult<LazyFrame> {
2048
let lp = DslPlan::MapFunction {
2049
input: Arc::new(self.logical_plan),
2050
function: DslFunction::Hint(hint),
2051
};
2052
Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
2053
}
2054
}
2055
2056
/// Utility struct for lazy group_by operation.
#[derive(Clone)]
pub struct LazyGroupBy {
    pub logical_plan: DslPlan,
    // Optimizer flags carried over from the originating `LazyFrame`.
    opt_state: OptFlags,
    // Group-key expressions.
    keys: Vec<Expr>,
    // Post-aggregation filters accumulated via `having`.
    predicates: Vec<Expr>,
    // Whether the aggregation preserves the input row order.
    maintain_order: bool,
    #[cfg(feature = "dynamic_group_by")]
    dynamic_options: Option<DynamicGroupOptions>,
    #[cfg(feature = "dynamic_group_by")]
    rolling_options: Option<RollingGroupOptions>,
}
2069
2070
impl From<LazyGroupBy> for LazyFrame {
2071
fn from(lgb: LazyGroupBy) -> Self {
2072
Self {
2073
logical_plan: lgb.logical_plan,
2074
opt_state: lgb.opt_state,
2075
cached_arena: Default::default(),
2076
}
2077
}
2078
}
2079
2080
impl LazyGroupBy {
    /// Filter groups with a predicate after aggregation.
    ///
    /// Similarly to the [LazyGroupBy::agg] method, the predicate must run an aggregation as it
    /// is evaluated on the groups.
    /// This method can be chained in which case all predicates must evaluate to `true` for a
    /// group to be kept.
    ///
    /// # Example
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    ///
    /// fn example(df: DataFrame) -> LazyFrame {
    ///     df.lazy()
    ///         .group_by_stable([col("date")])
    ///         .having(col("rain").sum().gt(lit(10)))
    ///         .agg([col("rain").min().alias("min_rain")])
    /// }
    /// ```
    pub fn having(mut self, predicate: Expr) -> Self {
        // Predicates are accumulated and applied when the group-by is built (see `agg`).
        self.predicates.push(predicate);
        self
    }

    /// Group by and aggregate.
    ///
    /// Select a column with [col] and choose an aggregation.
    /// If you want to aggregate all columns use `col(PlSmallStr::from_static("*"))`.
    ///
    /// # Example
    ///
    /// ```rust
    /// use polars_core::prelude::*;
    /// use polars_lazy::prelude::*;
    ///
    /// fn example(df: DataFrame) -> LazyFrame {
    ///     df.lazy()
    ///         .group_by_stable([col("date")])
    ///         .agg([
    ///             col("rain").min().alias("min_rain"),
    ///             col("rain").sum().alias("sum_rain"),
    ///             col("rain").quantile(lit(0.5), QuantileMethod::Nearest).alias("median_rain"),
    ///         ])
    /// }
    /// ```
    pub fn agg<E: AsRef<[Expr]>>(self, aggs: E) -> LazyFrame {
        // With `dynamic_group_by` enabled, the builder's `group_by` takes two
        // extra parameters for dynamic/rolling grouping; the two cfg branches
        // must stay in sync.
        #[cfg(feature = "dynamic_group_by")]
        let lp = DslBuilder::from(self.logical_plan)
            .group_by(
                self.keys,
                self.predicates,
                aggs,
                None,
                self.maintain_order,
                self.dynamic_options,
                self.rolling_options,
            )
            .build();

        #[cfg(not(feature = "dynamic_group_by"))]
        let lp = DslBuilder::from(self.logical_plan)
            .group_by(self.keys, self.predicates, aggs, None, self.maintain_order)
            .build();
        LazyFrame::from_logical_plan(lp, self.opt_state)
    }

    /// Return first n rows of each group
    pub fn head(self, n: Option<usize>) -> LazyFrame {
        // Output names of the group keys: these columns are excluded from the
        // explode below, since they are scalar per group.
        let keys = self
            .keys
            .iter()
            .filter_map(|expr| expr_output_name(expr).ok())
            .collect::<Vec<_>>();

        // Aggregate head(n) into lists, then explode all non-key columns back into rows.
        self.agg([all().as_expr().head(n)]).explode_impl(
            all() - by_name(keys.iter().cloned(), false, false),
            ExplodeOptions {
                empty_as_null: true,
                keep_nulls: true,
            },
            true,
        )
    }

    /// Return last n rows of each group
    pub fn tail(self, n: Option<usize>) -> LazyFrame {
        // Output names of the group keys: these columns are excluded from the
        // explode below, since they are scalar per group.
        let keys = self
            .keys
            .iter()
            .filter_map(|expr| expr_output_name(expr).ok())
            .collect::<Vec<_>>();

        // Aggregate tail(n) into lists, then explode all non-key columns back into rows.
        self.agg([all().as_expr().tail(n)]).explode_impl(
            all() - by_name(keys.iter().cloned(), false, false),
            ExplodeOptions {
                empty_as_null: true,
                keep_nulls: true,
            },
            true,
        )
    }

    /// Apply a function over the groups as a new DataFrame.
    ///
    /// **It is not recommended that you use this as materializing the DataFrame is very
    /// expensive.**
    ///
    /// # Panics
    ///
    /// Panics if `having` predicates were set, as that combination is not yet implemented.
    pub fn apply(self, f: PlanCallback<DataFrame, DataFrame>, schema: SchemaRef) -> LazyFrame {
        if !self.predicates.is_empty() {
            panic!("not yet implemented: `apply` cannot be used with `having` predicates");
        }

        #[cfg(feature = "dynamic_group_by")]
        let options = GroupbyOptions {
            dynamic: self.dynamic_options,
            rolling: self.rolling_options,
            slice: None,
        };

        #[cfg(not(feature = "dynamic_group_by"))]
        let options = GroupbyOptions { slice: None };

        // Regular aggregations are bypassed: the callback (plus its declared
        // output schema) replaces them in the GroupBy plan node.
        let lp = DslPlan::GroupBy {
            input: Arc::new(self.logical_plan),
            keys: self.keys,
            predicates: vec![],
            aggs: vec![],
            apply: Some((f, schema)),
            maintain_order: self.maintain_order,
            options: Arc::new(options),
        };
        LazyFrame::from_logical_plan(lp, self.opt_state)
    }
}
/// Builder for join operations between two `LazyFrame`s.
///
/// Construct with [`JoinBuilder::new`], configure via the chained setter
/// methods, and materialize with `finish` or `join_where`.
#[must_use]
pub struct JoinBuilder {
    // Left table of the join.
    lf: LazyFrame,
    // Join type; defaults to `JoinType::Inner`.
    how: JoinType,
    // Right table; must be set via `with` before finishing (panics otherwise).
    other: Option<LazyFrame>,
    // Key expressions evaluated against the left table.
    left_on: Vec<Expr>,
    // Key expressions evaluated against the right table.
    right_on: Vec<Expr>,
    // Allow evaluating both tables in parallel (default true).
    allow_parallel: bool,
    // Force evaluating both tables in parallel (default false).
    force_parallel: bool,
    // Suffix for duplicate column names; `None` means the default ("_right").
    suffix: Option<PlSmallStr>,
    // Key validation mode (e.g. one-to-one).
    validation: JoinValidation,
    // Whether null values should compare equal as join keys (default false).
    nulls_equal: bool,
    // Whether to coalesce the join key columns.
    coalesce: JoinCoalesce,
    // Whether/how the row order should be preserved.
    maintain_order: MaintainOrderJoin,
    // Optional preferred build side for the join implementation.
    build_side: Option<JoinBuildSide>,
}
impl JoinBuilder {
2233
/// Create the `JoinBuilder` with the provided `LazyFrame` as the left table.
2234
pub fn new(lf: LazyFrame) -> Self {
2235
Self {
2236
lf,
2237
other: None,
2238
how: JoinType::Inner,
2239
left_on: vec![],
2240
right_on: vec![],
2241
allow_parallel: true,
2242
force_parallel: false,
2243
suffix: None,
2244
validation: Default::default(),
2245
nulls_equal: false,
2246
coalesce: Default::default(),
2247
maintain_order: Default::default(),
2248
build_side: None,
2249
}
2250
}
2251
2252
/// The right table in the join.
2253
pub fn with(mut self, other: LazyFrame) -> Self {
2254
self.other = Some(other);
2255
self
2256
}
2257
2258
/// Select the join type.
2259
pub fn how(mut self, how: JoinType) -> Self {
2260
self.how = how;
2261
self
2262
}
2263
2264
pub fn validate(mut self, validation: JoinValidation) -> Self {
2265
self.validation = validation;
2266
self
2267
}
2268
2269
/// The expressions you want to join both tables on.
2270
///
2271
/// The passed expressions must be valid in both `LazyFrame`s in the join.
2272
pub fn on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2273
let on = on.as_ref().to_vec();
2274
self.left_on.clone_from(&on);
2275
self.right_on = on;
2276
self
2277
}
2278
2279
/// The expressions you want to join the left table on.
2280
///
2281
/// The passed expressions must be valid in the left table.
2282
pub fn left_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2283
self.left_on = on.as_ref().to_vec();
2284
self
2285
}
2286
2287
/// The expressions you want to join the right table on.
2288
///
2289
/// The passed expressions must be valid in the right table.
2290
pub fn right_on<E: AsRef<[Expr]>>(mut self, on: E) -> Self {
2291
self.right_on = on.as_ref().to_vec();
2292
self
2293
}
2294
2295
/// Allow parallel table evaluation.
2296
pub fn allow_parallel(mut self, allow: bool) -> Self {
2297
self.allow_parallel = allow;
2298
self
2299
}
2300
2301
/// Force parallel table evaluation.
2302
pub fn force_parallel(mut self, force: bool) -> Self {
2303
self.force_parallel = force;
2304
self
2305
}
2306
2307
/// Join on null values. By default null values will never produce matches.
2308
pub fn join_nulls(mut self, nulls_equal: bool) -> Self {
2309
self.nulls_equal = nulls_equal;
2310
self
2311
}
2312
2313
/// Suffix to add duplicate column names in join.
2314
/// Defaults to `"_right"` if this method is never called.
2315
pub fn suffix<S>(mut self, suffix: S) -> Self
2316
where
2317
S: Into<PlSmallStr>,
2318
{
2319
self.suffix = Some(suffix.into());
2320
self
2321
}
2322
2323
/// Whether to coalesce join columns.
2324
pub fn coalesce(mut self, coalesce: JoinCoalesce) -> Self {
2325
self.coalesce = coalesce;
2326
self
2327
}
2328
2329
/// Whether to preserve the row order.
2330
pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
2331
self.maintain_order = maintain_order;
2332
self
2333
}
2334
2335
/// Whether to prefer a specific build side.
2336
pub fn build_side(mut self, build_side: Option<JoinBuildSide>) -> Self {
2337
self.build_side = build_side;
2338
self
2339
}
2340
2341
/// Finish builder
2342
pub fn finish(self) -> LazyFrame {
2343
let opt_state = self.lf.opt_state;
2344
let other = self.other.expect("'with' not set in join builder");
2345
2346
let args = JoinArgs {
2347
how: self.how,
2348
validation: self.validation,
2349
suffix: self.suffix,
2350
slice: None,
2351
nulls_equal: self.nulls_equal,
2352
coalesce: self.coalesce,
2353
maintain_order: self.maintain_order,
2354
build_side: self.build_side,
2355
};
2356
2357
let lp = self
2358
.lf
2359
.get_plan_builder()
2360
.join(
2361
other.logical_plan,
2362
self.left_on,
2363
self.right_on,
2364
JoinOptions {
2365
allow_parallel: self.allow_parallel,
2366
force_parallel: self.force_parallel,
2367
args,
2368
}
2369
.into(),
2370
)
2371
.build();
2372
LazyFrame::from_logical_plan(lp, opt_state)
2373
}
2374
2375
// Finish with join predicates
2376
pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
2377
let opt_state = self.lf.opt_state;
2378
let other = self.other.expect("with not set");
2379
2380
// Decompose `And` conjunctions into their component expressions
2381
fn decompose_and(predicate: Expr, expanded_predicates: &mut Vec<Expr>) {
2382
if let Expr::BinaryExpr {
2383
op: Operator::And,
2384
left,
2385
right,
2386
} = predicate
2387
{
2388
decompose_and((*left).clone(), expanded_predicates);
2389
decompose_and((*right).clone(), expanded_predicates);
2390
} else {
2391
expanded_predicates.push(predicate);
2392
}
2393
}
2394
let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2395
for predicate in predicates {
2396
decompose_and(predicate, &mut expanded_predicates);
2397
}
2398
let predicates: Vec<Expr> = expanded_predicates;
2399
2400
// Decompose `is_between` predicates to allow for cleaner expression of range joins
2401
#[cfg(feature = "is_between")]
2402
let predicates: Vec<Expr> = {
2403
let mut expanded_predicates = Vec::with_capacity(predicates.len() * 2);
2404
for predicate in predicates {
2405
if let Expr::Function {
2406
function: FunctionExpr::Boolean(BooleanFunction::IsBetween { closed }),
2407
input,
2408
..
2409
} = &predicate
2410
{
2411
if let [expr, lower, upper] = input.as_slice() {
2412
match closed {
2413
ClosedInterval::Both => {
2414
expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2415
expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2416
},
2417
ClosedInterval::Right => {
2418
expanded_predicates.push(expr.clone().gt(lower.clone()));
2419
expanded_predicates.push(expr.clone().lt_eq(upper.clone()));
2420
},
2421
ClosedInterval::Left => {
2422
expanded_predicates.push(expr.clone().gt_eq(lower.clone()));
2423
expanded_predicates.push(expr.clone().lt(upper.clone()));
2424
},
2425
ClosedInterval::None => {
2426
expanded_predicates.push(expr.clone().gt(lower.clone()));
2427
expanded_predicates.push(expr.clone().lt(upper.clone()));
2428
},
2429
}
2430
continue;
2431
}
2432
}
2433
expanded_predicates.push(predicate);
2434
}
2435
expanded_predicates
2436
};
2437
2438
let args = JoinArgs {
2439
how: self.how,
2440
validation: self.validation,
2441
suffix: self.suffix,
2442
slice: None,
2443
nulls_equal: self.nulls_equal,
2444
coalesce: self.coalesce,
2445
maintain_order: self.maintain_order,
2446
build_side: self.build_side,
2447
};
2448
let options = JoinOptions {
2449
allow_parallel: self.allow_parallel,
2450
force_parallel: self.force_parallel,
2451
args,
2452
};
2453
2454
let lp = DslPlan::Join {
2455
input_left: Arc::new(self.lf.logical_plan),
2456
input_right: Arc::new(other.logical_plan),
2457
left_on: Default::default(),
2458
right_on: Default::default(),
2459
predicates,
2460
options: Arc::from(options),
2461
};
2462
2463
LazyFrame::from_logical_plan(lp, opt_state)
2464
}
2465
}
2466
2467
/// Builder function for the streaming query executor.
///
/// `None` when the `new_streaming` feature is disabled; otherwise the
/// `polars_stream` entry point. Evaluated at compile time via cfg blocks.
pub const BUILD_STREAMING_EXECUTOR: Option<polars_mem_engine::StreamingExecutorBuilder> = {
    #[cfg(not(feature = "new_streaming"))]
    {
        None
    }
    #[cfg(feature = "new_streaming")]
    {
        Some(polars_stream::build_streaming_query_executor)
    }
};
/// Iterator over the batches produced by a lazily started query.
pub struct CollectBatches {
    // Receiving end of the channel the running query sends its batches on.
    recv: Receiver<PolarsResult<DataFrame>>,
    // Deferred query runner; taken (set to `None`) the first time the
    // iterator is started, so the query runs at most once.
    runner: Option<Box<dyn FnOnce() + Send + 'static>>,
}
impl CollectBatches {
2484
/// Start running the query, if not already.
2485
pub fn start(&mut self) {
2486
if let Some(runner) = self.runner.take() {
2487
runner()
2488
}
2489
}
2490
}
2491
2492
impl Iterator for CollectBatches {
2493
type Item = PolarsResult<DataFrame>;
2494
2495
fn next(&mut self) -> Option<Self::Item> {
2496
self.start();
2497
self.recv.recv().ok()
2498
}
2499
}
2500
2501