Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-python/src/functions/lazy.rs
8368 views
1
use polars::lazy::dsl;
2
use polars::prelude::*;
3
use polars_plan::plans::DynLiteralValue;
4
use polars_plan::prelude::UnionArgs;
5
use polars_utils::python_function::PythonObject;
6
use pyo3::exceptions::{PyTypeError, PyValueError};
7
use pyo3::prelude::*;
8
use pyo3::types::{PyBool, PyBytes, PyFloat, PyInt, PyString};
9
10
use crate::conversion::any_value::py_object_to_any_value;
11
use crate::conversion::{Wrap, get_lf};
12
use crate::error::PyPolarsErr;
13
use crate::expr::ToExprs;
14
use crate::expr::datatype::PyDataTypeExpr;
15
use crate::lazyframe::PyOptFlags;
16
use crate::utils::EnterPolarsExt;
17
use crate::{PyDataFrame, PyExpr, PyLazyFrame, PySeries, map};
18
19
// Rebinds every listed optional variable to the `.inner` value it wraps,
// falling back to the literal expression `dsl::lit(0)` when it is `None`.
macro_rules! set_unwrapped_or_0 {
    ($($var:ident),+ $(,)?) => {
        $(
            let $var = match $var {
                Some(wrapped) => wrapped.inner,
                None => dsl::lit(0),
            };
        )+
    };
}
24
25
#[pyfunction]
26
pub fn rolling_corr(
27
x: PyExpr,
28
y: PyExpr,
29
window_size: IdxSize,
30
min_periods: IdxSize,
31
ddof: u8,
32
) -> PyExpr {
33
dsl::rolling_corr(
34
x.inner,
35
y.inner,
36
RollingCovOptions {
37
min_periods,
38
window_size,
39
ddof,
40
},
41
)
42
.into()
43
}
44
45
#[pyfunction]
46
pub fn rolling_cov(
47
x: PyExpr,
48
y: PyExpr,
49
window_size: IdxSize,
50
min_periods: IdxSize,
51
ddof: u8,
52
) -> PyExpr {
53
dsl::rolling_cov(
54
x.inner,
55
y.inner,
56
RollingCovOptions {
57
min_periods,
58
window_size,
59
ddof,
60
},
61
)
62
.into()
63
}
64
65
#[pyfunction]
66
pub fn arg_sort_by(
67
by: Vec<PyExpr>,
68
descending: Vec<bool>,
69
nulls_last: Vec<bool>,
70
multithreaded: bool,
71
maintain_order: bool,
72
) -> PyExpr {
73
let by = by.into_iter().map(|e| e.inner).collect::<Vec<Expr>>();
74
dsl::arg_sort_by(
75
by,
76
SortMultipleOptions {
77
descending,
78
nulls_last,
79
multithreaded,
80
maintain_order,
81
limit: None,
82
},
83
)
84
.into()
85
}
86
#[pyfunction]
87
pub fn arg_where(condition: PyExpr) -> PyExpr {
88
dsl::arg_where(condition.inner).into()
89
}
90
91
#[pyfunction]
92
pub fn as_struct(exprs: Vec<PyExpr>) -> PyResult<PyExpr> {
93
let exprs = exprs.to_exprs();
94
if exprs.is_empty() {
95
return Err(PyValueError::new_err(
96
"expected at least 1 expression in 'as_struct'",
97
));
98
}
99
Ok(dsl::as_struct(exprs).into())
100
}
101
102
#[pyfunction]
103
pub fn field(names: Vec<String>) -> PyExpr {
104
dsl::Expr::Field(names.into_iter().map(|x| x.into()).collect()).into()
105
}
106
107
#[pyfunction]
108
pub fn coalesce(exprs: Vec<PyExpr>) -> PyExpr {
109
let exprs = exprs.to_exprs();
110
dsl::coalesce(&exprs).into()
111
}
112
113
#[pyfunction]
114
pub fn col(name: &str) -> PyExpr {
115
dsl::col(name).into()
116
}
117
118
#[pyfunction]
119
pub fn element() -> PyExpr {
120
dsl::element().into()
121
}
122
123
// Extracts the `logical_plan` of every lazy frame, preserving input order.
fn lfs_to_plans(lfs: Vec<PyLazyFrame>) -> Vec<DslPlan> {
    let mut plans = Vec::with_capacity(lfs.len());
    for frame in lfs {
        plans.push(frame.ldf.into_inner().logical_plan);
    }
    plans
}
128
129
#[pyfunction]
130
pub fn collect_all(
131
lfs: Vec<PyLazyFrame>,
132
engine: Wrap<Engine>,
133
optflags: PyOptFlags,
134
py: Python<'_>,
135
) -> PyResult<Vec<PyDataFrame>> {
136
let plans = lfs_to_plans(lfs);
137
let dfs = py.enter_polars(|| {
138
LazyFrame::collect_all_with_engine(plans, engine.0, optflags.inner.into_inner())
139
})?;
140
Ok(dfs.into_iter().map(Into::into).collect())
141
}
142
143
#[pyfunction]
144
pub fn collect_all_lazy(lfs: Vec<PyLazyFrame>, optflags: PyOptFlags) -> PyResult<PyLazyFrame> {
145
let plans = lfs_to_plans(lfs);
146
147
for plan in &plans {
148
if !matches!(plan, DslPlan::Sink { .. }) {
149
return Err(PyValueError::new_err(
150
"all LazyFrames must end with a sink to use 'collect_all(lazy=True)'",
151
));
152
}
153
}
154
155
Ok(LazyFrame::from_logical_plan(
156
DslPlan::SinkMultiple { inputs: plans },
157
optflags.inner.into_inner(),
158
)
159
.into())
160
}
161
162
#[pyfunction]
163
pub fn explain_all(lfs: Vec<PyLazyFrame>, optflags: PyOptFlags, py: Python) -> PyResult<String> {
164
let plans = lfs_to_plans(lfs);
165
let explained =
166
py.enter_polars(|| LazyFrame::explain_all(plans, optflags.inner.into_inner()))?;
167
Ok(explained)
168
}
169
170
#[pyfunction]
171
pub fn collect_all_with_callback(
172
lfs: Vec<PyLazyFrame>,
173
engine: Wrap<Engine>,
174
optflags: PyOptFlags,
175
lambda: Py<PyAny>,
176
py: Python<'_>,
177
) {
178
let plans = lfs
179
.into_iter()
180
.map(|lf| lf.ldf.into_inner().logical_plan)
181
.collect();
182
let result = py
183
.enter_polars(|| {
184
LazyFrame::collect_all_with_engine(plans, engine.0, optflags.inner.into_inner())
185
})
186
.map(|dfs| {
187
dfs.into_iter()
188
.map(Into::into)
189
.collect::<Vec<PyDataFrame>>()
190
});
191
192
Python::attach(|py| match result {
193
Ok(dfs) => {
194
lambda.call1(py, (dfs,)).map_err(|err| err.restore(py)).ok();
195
},
196
Err(err) => {
197
lambda
198
.call1(py, (PyErr::from(err),))
199
.map_err(|err| err.restore(py))
200
.ok();
201
},
202
})
203
}
204
205
#[pyfunction]
206
pub fn concat_lf(
207
seq: &Bound<'_, PyAny>,
208
rechunk: bool,
209
parallel: bool,
210
to_supertypes: bool,
211
maintain_order: bool,
212
) -> PyResult<PyLazyFrame> {
213
let len = seq.len()?;
214
let mut lfs = Vec::with_capacity(len);
215
216
for res in seq.try_iter()? {
217
let item = res?;
218
let lf = get_lf(&item)?;
219
lfs.push(lf);
220
}
221
222
let lf = dsl::concat(
223
lfs,
224
UnionArgs {
225
rechunk,
226
parallel,
227
to_supertypes,
228
maintain_order,
229
..Default::default()
230
},
231
)
232
.map_err(PyPolarsErr::from)?;
233
Ok(lf.into())
234
}
235
236
#[pyfunction]
237
pub fn concat_list(s: Vec<PyExpr>) -> PyResult<PyExpr> {
238
let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
239
let expr = dsl::concat_list(s).map_err(PyPolarsErr::from)?;
240
Ok(expr.into())
241
}
242
243
#[pyfunction]
244
pub fn concat_arr(s: Vec<PyExpr>) -> PyResult<PyExpr> {
245
let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
246
let expr = dsl::concat_arr(s).map_err(PyPolarsErr::from)?;
247
Ok(expr.into())
248
}
249
250
#[pyfunction]
251
pub fn concat_str(s: Vec<PyExpr>, separator: &str, ignore_nulls: bool) -> PyExpr {
252
let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
253
dsl::concat_str(s, separator, ignore_nulls).into()
254
}
255
256
#[pyfunction]
257
pub fn len() -> PyExpr {
258
dsl::len().into()
259
}
260
261
#[pyfunction]
262
pub fn cov(a: PyExpr, b: PyExpr, ddof: u8) -> PyExpr {
263
dsl::cov(a.inner, b.inner, ddof).into()
264
}
265
266
// Calls `arctan2` on the `y` expression with `x` as its argument.
// Only compiled when the `trigonometry` feature is enabled.
#[pyfunction]
#[cfg(feature = "trigonometry")]
pub fn arctan2(y: PyExpr, x: PyExpr) -> PyExpr {
    let (ordinate, abscissa) = (y.inner, x.inner);
    PyExpr::from(ordinate.arctan2(abscissa))
}
271
272
#[pyfunction]
273
pub fn cum_fold(
274
acc: PyExpr,
275
lambda: Py<PyAny>,
276
exprs: Vec<PyExpr>,
277
returns_scalar: bool,
278
return_dtype: Option<PyDataTypeExpr>,
279
include_init: bool,
280
) -> PyExpr {
281
let exprs = exprs.to_exprs();
282
let func = PlanCallback::new_python(PythonObject(lambda));
283
dsl::cum_fold_exprs(
284
acc.inner,
285
func,
286
exprs,
287
returns_scalar,
288
return_dtype.map(|v| v.inner),
289
include_init,
290
)
291
.into()
292
}
293
294
#[pyfunction]
295
pub fn cum_reduce(
296
lambda: Py<PyAny>,
297
exprs: Vec<PyExpr>,
298
returns_scalar: bool,
299
return_dtype: Option<PyDataTypeExpr>,
300
) -> PyExpr {
301
let exprs = exprs.to_exprs();
302
303
let func = PlanCallback::new_python(PythonObject(lambda));
304
dsl::cum_reduce_exprs(func, exprs, returns_scalar, return_dtype.map(|v| v.inner)).into()
305
}
306
307
#[pyfunction]
308
#[pyo3(signature = (year, month, day, hour=None, minute=None, second=None, microsecond=None, time_unit=Wrap(TimeUnit::Microseconds), time_zone=Wrap(None), ambiguous=PyExpr::from(dsl::lit(String::from("raise")))))]
309
pub fn datetime(
310
year: PyExpr,
311
month: PyExpr,
312
day: PyExpr,
313
hour: Option<PyExpr>,
314
minute: Option<PyExpr>,
315
second: Option<PyExpr>,
316
microsecond: Option<PyExpr>,
317
time_unit: Wrap<TimeUnit>,
318
time_zone: Wrap<Option<TimeZone>>,
319
ambiguous: PyExpr,
320
) -> PyExpr {
321
let year = year.inner;
322
let month = month.inner;
323
let day = day.inner;
324
set_unwrapped_or_0!(hour, minute, second, microsecond);
325
let ambiguous = ambiguous.inner;
326
let time_unit = time_unit.0;
327
let time_zone = time_zone.0;
328
let args = DatetimeArgs {
329
year,
330
month,
331
day,
332
hour,
333
minute,
334
second,
335
microsecond,
336
time_unit,
337
time_zone,
338
ambiguous,
339
};
340
dsl::datetime(args).into()
341
}
342
343
#[pyfunction]
344
pub fn concat_lf_diagonal(
345
lfs: &Bound<'_, PyAny>,
346
rechunk: bool,
347
parallel: bool,
348
to_supertypes: bool,
349
maintain_order: bool,
350
) -> PyResult<PyLazyFrame> {
351
let iter = lfs.try_iter()?;
352
353
let lfs = iter
354
.map(|item| {
355
let item = item?;
356
get_lf(&item)
357
})
358
.collect::<PyResult<Vec<_>>>()?;
359
360
let lf = dsl::functions::concat_lf_diagonal(
361
lfs,
362
UnionArgs {
363
rechunk,
364
parallel,
365
to_supertypes,
366
maintain_order,
367
..Default::default()
368
},
369
)
370
.map_err(PyPolarsErr::from)?;
371
Ok(lf.into())
372
}
373
374
#[pyfunction]
375
pub fn concat_lf_horizontal(
376
lfs: &Bound<'_, PyAny>,
377
parallel: bool,
378
strict: bool,
379
) -> PyResult<PyLazyFrame> {
380
let iter = lfs.try_iter()?;
381
382
let lfs = iter
383
.map(|item| {
384
let item = item?;
385
get_lf(&item)
386
})
387
.collect::<PyResult<Vec<_>>>()?;
388
389
let lf = dsl::functions::concat_lf_horizontal(
390
lfs,
391
HConcatOptions {
392
parallel,
393
strict,
394
broadcast_unit_length: Default::default(),
395
},
396
)
397
.map_err(PyPolarsErr::from)?;
398
Ok(lf.into())
399
}
400
401
#[pyfunction]
402
pub fn concat_expr(e: Vec<PyExpr>, rechunk: bool) -> PyResult<PyExpr> {
403
let e = e.to_exprs();
404
let e = dsl::functions::concat_expr(e, rechunk).map_err(PyPolarsErr::from)?;
405
Ok(e.into())
406
}
407
408
#[pyfunction]
409
#[pyo3(signature = (weeks, days, hours, minutes, seconds, milliseconds, microseconds, nanoseconds, time_unit))]
410
pub fn duration(
411
weeks: Option<PyExpr>,
412
days: Option<PyExpr>,
413
hours: Option<PyExpr>,
414
minutes: Option<PyExpr>,
415
seconds: Option<PyExpr>,
416
milliseconds: Option<PyExpr>,
417
microseconds: Option<PyExpr>,
418
nanoseconds: Option<PyExpr>,
419
time_unit: Wrap<TimeUnit>,
420
) -> PyExpr {
421
set_unwrapped_or_0!(
422
weeks,
423
days,
424
hours,
425
minutes,
426
seconds,
427
milliseconds,
428
microseconds,
429
nanoseconds,
430
);
431
let args = DurationArgs {
432
weeks,
433
days,
434
hours,
435
minutes,
436
seconds,
437
milliseconds,
438
microseconds,
439
nanoseconds,
440
time_unit: time_unit.0,
441
};
442
dsl::duration(args).into()
443
}
444
445
#[pyfunction]
446
pub fn fold(
447
acc: PyExpr,
448
lambda: Py<PyAny>,
449
exprs: Vec<PyExpr>,
450
returns_scalar: bool,
451
return_dtype: Option<PyDataTypeExpr>,
452
) -> PyExpr {
453
let exprs = exprs.to_exprs();
454
let func = PlanCallback::new_python(PythonObject(lambda));
455
dsl::fold_exprs(
456
acc.inner,
457
func,
458
exprs,
459
returns_scalar,
460
return_dtype.map(|w| w.inner),
461
)
462
.into()
463
}
464
465
#[pyfunction]
466
pub fn lit(value: &Bound<'_, PyAny>, allow_object: bool, is_scalar: bool) -> PyResult<PyExpr> {
467
let py = value.py();
468
if value.is_instance_of::<PyBool>() {
469
let val = value.extract::<bool>()?;
470
Ok(dsl::lit(val).into())
471
} else if let Ok(int) = value.cast::<PyInt>() {
472
let v = int
473
.extract::<i128>()
474
.map_err(|e| polars_err!(InvalidOperation: "integer too large for Polars: {e}"))
475
.map_err(PyPolarsErr::from)?;
476
Ok(Expr::Literal(LiteralValue::Dyn(DynLiteralValue::Int(v))).into())
477
} else if let Ok(float) = value.cast::<PyFloat>() {
478
let val = float.extract::<f64>()?;
479
Ok(Expr::Literal(LiteralValue::Dyn(DynLiteralValue::Float(val))).into())
480
} else if let Ok(pystr) = value.cast::<PyString>() {
481
Ok(dsl::lit(pystr.to_string()).into())
482
} else if let Ok(series) = value.extract::<PySeries>() {
483
let s = series.series.into_inner();
484
if is_scalar {
485
let av = s
486
.get(0)
487
.map_err(|_| PyValueError::new_err("expected at least 1 value"))?;
488
let av = av.into_static();
489
Ok(dsl::lit(Scalar::new(s.dtype().clone(), av)).into())
490
} else {
491
Ok(dsl::lit(s).into())
492
}
493
} else if value.is_none() {
494
Ok(dsl::lit(Null {}).into())
495
} else if let Ok(value) = value.cast::<PyBytes>() {
496
Ok(dsl::lit(value.as_bytes()).into())
497
} else {
498
let raise = || {
499
PyTypeError::new_err(format!(
500
"cannot create expression literal for value of type {}.\
501
\n\nHint: Pass `allow_object=True` to accept any value and create a literal of type Object.",
502
value
503
.get_type()
504
.qualname()
505
.map(|s| s.to_string())
506
.unwrap_or("unknown".to_owned()),
507
))
508
};
509
510
let av = py_object_to_any_value(value, true, allow_object).map_err(|_| raise())?;
511
match av {
512
#[cfg(feature = "object")]
513
AnyValue::ObjectOwned(_) => {
514
// Check again for object allowance as for cached addresses this is not checked.
515
if allow_object {
516
let s = PySeries::new_object(py, "", vec![value.extract()?], false)
517
.series
518
.into_inner();
519
Ok(dsl::lit(s).into())
520
} else {
521
Err(raise())
522
}
523
},
524
_ => Ok(Expr::Literal(LiteralValue::from(av)).into()),
525
}
526
}
527
}
528
529
#[pyfunction]
530
#[pyo3(signature = (pyexpr, lambda, output_type, is_elementwise, returns_scalar))]
531
pub fn map_expr(
532
pyexpr: Vec<PyExpr>,
533
lambda: Py<PyAny>,
534
output_type: Option<PyDataTypeExpr>,
535
is_elementwise: bool,
536
returns_scalar: bool,
537
) -> PyExpr {
538
map::lazy::map_expr(&pyexpr, lambda, output_type, is_elementwise, returns_scalar)
539
}
540
541
#[pyfunction]
542
pub fn pearson_corr(a: PyExpr, b: PyExpr) -> PyExpr {
543
dsl::pearson_corr(a.inner, b.inner).into()
544
}
545
546
#[pyfunction]
547
pub fn reduce(
548
lambda: Py<PyAny>,
549
exprs: Vec<PyExpr>,
550
returns_scalar: bool,
551
return_dtype: Option<PyDataTypeExpr>,
552
) -> PyExpr {
553
let exprs = exprs.to_exprs();
554
let func = PlanCallback::new_python(PythonObject(lambda));
555
dsl::reduce_exprs(func, exprs, returns_scalar, return_dtype.map(|v| v.inner)).into()
556
}
557
558
#[pyfunction]
559
#[pyo3(signature = (value, n, dtype=None))]
560
pub fn repeat(value: PyExpr, n: PyExpr, dtype: Option<Wrap<DataType>>) -> PyExpr {
561
let mut value = value.inner;
562
let n = n.inner;
563
564
if let Some(dtype) = dtype {
565
value = value.cast(dtype.0);
566
}
567
568
dsl::repeat(value, n).into()
569
}
570
571
#[pyfunction]
572
pub fn spearman_rank_corr(a: PyExpr, b: PyExpr, propagate_nans: bool) -> PyExpr {
573
#[cfg(feature = "propagate_nans")]
574
{
575
dsl::spearman_rank_corr(a.inner, b.inner, propagate_nans).into()
576
}
577
#[cfg(not(feature = "propagate_nans"))]
578
{
579
panic!("activate 'propagate_nans'")
580
}
581
}
582
583
// Parses `sql` with `polars::sql::sql_expr` and wraps the resulting
// expression; a Polars error is converted into a Python exception.
// Only compiled when the `sql` feature is enabled.
#[pyfunction]
#[cfg(feature = "sql")]
pub fn sql_expr(sql: &str) -> PyResult<PyExpr> {
    match polars::sql::sql_expr(sql) {
        Ok(expr) => Ok(PyExpr::from(expr)),
        Err(err) => Err(PyErr::from(PyPolarsErr::from(err))),
    }
}
589
590