Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-python/src/functions/lazy.rs
7889 views
1
use polars::lazy::dsl;
2
use polars::prelude::*;
3
use polars_plan::plans::DynLiteralValue;
4
use polars_plan::prelude::UnionArgs;
5
use polars_utils::python_function::PythonObject;
6
use pyo3::exceptions::{PyTypeError, PyValueError};
7
use pyo3::prelude::*;
8
use pyo3::types::{PyBool, PyBytes, PyFloat, PyInt, PyString};
9
10
use crate::conversion::any_value::py_object_to_any_value;
11
use crate::conversion::{Wrap, get_lf};
12
use crate::error::PyPolarsErr;
13
use crate::expr::ToExprs;
14
use crate::expr::datatype::PyDataTypeExpr;
15
use crate::lazyframe::PyOptFlags;
16
use crate::utils::EnterPolarsExt;
17
use crate::{PyDataFrame, PyExpr, PyLazyFrame, PySeries, map};
18
19
/// Rebinds each named `Option<PyExpr>` local as a plain `Expr`.
///
/// A supplied argument contributes its inner expression; a missing one
/// (`None`) is replaced by the literal `0`.
macro_rules! set_unwrapped_or_0 {
    ($($name:ident),+ $(,)?) => {
        $(
            let $name = match $name {
                Some(py_expr) => py_expr.inner,
                None => dsl::lit(0),
            };
        )+
    };
}
24
25
#[pyfunction]
26
pub fn rolling_corr(
27
x: PyExpr,
28
y: PyExpr,
29
window_size: IdxSize,
30
min_periods: IdxSize,
31
ddof: u8,
32
) -> PyExpr {
33
dsl::rolling_corr(
34
x.inner,
35
y.inner,
36
RollingCovOptions {
37
min_periods,
38
window_size,
39
ddof,
40
},
41
)
42
.into()
43
}
44
45
#[pyfunction]
46
pub fn rolling_cov(
47
x: PyExpr,
48
y: PyExpr,
49
window_size: IdxSize,
50
min_periods: IdxSize,
51
ddof: u8,
52
) -> PyExpr {
53
dsl::rolling_cov(
54
x.inner,
55
y.inner,
56
RollingCovOptions {
57
min_periods,
58
window_size,
59
ddof,
60
},
61
)
62
.into()
63
}
64
65
#[pyfunction]
66
pub fn arg_sort_by(
67
by: Vec<PyExpr>,
68
descending: Vec<bool>,
69
nulls_last: Vec<bool>,
70
multithreaded: bool,
71
maintain_order: bool,
72
) -> PyExpr {
73
let by = by.into_iter().map(|e| e.inner).collect::<Vec<Expr>>();
74
dsl::arg_sort_by(
75
by,
76
SortMultipleOptions {
77
descending,
78
nulls_last,
79
multithreaded,
80
maintain_order,
81
limit: None,
82
},
83
)
84
.into()
85
}
86
#[pyfunction]
87
pub fn arg_where(condition: PyExpr) -> PyExpr {
88
dsl::arg_where(condition.inner).into()
89
}
90
91
#[pyfunction]
92
pub fn as_struct(exprs: Vec<PyExpr>) -> PyResult<PyExpr> {
93
let exprs = exprs.to_exprs();
94
if exprs.is_empty() {
95
return Err(PyValueError::new_err(
96
"expected at least 1 expression in 'as_struct'",
97
));
98
}
99
Ok(dsl::as_struct(exprs).into())
100
}
101
102
#[pyfunction]
103
pub fn field(names: Vec<String>) -> PyExpr {
104
dsl::Expr::Field(names.into_iter().map(|x| x.into()).collect()).into()
105
}
106
107
#[pyfunction]
108
pub fn coalesce(exprs: Vec<PyExpr>) -> PyExpr {
109
let exprs = exprs.to_exprs();
110
dsl::coalesce(&exprs).into()
111
}
112
113
#[pyfunction]
114
pub fn col(name: &str) -> PyExpr {
115
dsl::col(name).into()
116
}
117
118
#[pyfunction]
119
pub fn element() -> PyExpr {
120
dsl::element().into()
121
}
122
123
/// Consumes the given lazy frames and returns each one's `logical_plan`,
/// in input order.
fn lfs_to_plans(lfs: Vec<PyLazyFrame>) -> Vec<DslPlan> {
    let mut plans = Vec::with_capacity(lfs.len());
    for lf in lfs {
        plans.push(lf.ldf.into_inner().logical_plan);
    }
    plans
}
128
129
#[pyfunction]
130
pub fn collect_all(
131
lfs: Vec<PyLazyFrame>,
132
engine: Wrap<Engine>,
133
optflags: PyOptFlags,
134
py: Python<'_>,
135
) -> PyResult<Vec<PyDataFrame>> {
136
let plans = lfs_to_plans(lfs);
137
let dfs = py.enter_polars(|| {
138
LazyFrame::collect_all_with_engine(plans, engine.0, optflags.inner.into_inner())
139
})?;
140
Ok(dfs.into_iter().map(Into::into).collect())
141
}
142
143
#[pyfunction]
144
pub fn explain_all(lfs: Vec<PyLazyFrame>, optflags: PyOptFlags, py: Python) -> PyResult<String> {
145
let plans = lfs_to_plans(lfs);
146
let explained =
147
py.enter_polars(|| LazyFrame::explain_all(plans, optflags.inner.into_inner()))?;
148
Ok(explained)
149
}
150
151
#[pyfunction]
152
pub fn collect_all_with_callback(
153
lfs: Vec<PyLazyFrame>,
154
engine: Wrap<Engine>,
155
optflags: PyOptFlags,
156
lambda: Py<PyAny>,
157
py: Python<'_>,
158
) {
159
let plans = lfs
160
.into_iter()
161
.map(|lf| lf.ldf.into_inner().logical_plan)
162
.collect();
163
let result = py
164
.enter_polars(|| {
165
LazyFrame::collect_all_with_engine(plans, engine.0, optflags.inner.into_inner())
166
})
167
.map(|dfs| {
168
dfs.into_iter()
169
.map(Into::into)
170
.collect::<Vec<PyDataFrame>>()
171
});
172
173
Python::attach(|py| match result {
174
Ok(dfs) => {
175
lambda.call1(py, (dfs,)).map_err(|err| err.restore(py)).ok();
176
},
177
Err(err) => {
178
lambda
179
.call1(py, (PyErr::from(err),))
180
.map_err(|err| err.restore(py))
181
.ok();
182
},
183
})
184
}
185
186
#[pyfunction]
187
pub fn concat_lf(
188
seq: &Bound<'_, PyAny>,
189
rechunk: bool,
190
parallel: bool,
191
to_supertypes: bool,
192
maintain_order: bool,
193
) -> PyResult<PyLazyFrame> {
194
let len = seq.len()?;
195
let mut lfs = Vec::with_capacity(len);
196
197
for res in seq.try_iter()? {
198
let item = res?;
199
let lf = get_lf(&item)?;
200
lfs.push(lf);
201
}
202
203
let lf = dsl::concat(
204
lfs,
205
UnionArgs {
206
rechunk,
207
parallel,
208
to_supertypes,
209
maintain_order,
210
..Default::default()
211
},
212
)
213
.map_err(PyPolarsErr::from)?;
214
Ok(lf.into())
215
}
216
217
#[pyfunction]
218
pub fn concat_list(s: Vec<PyExpr>) -> PyResult<PyExpr> {
219
let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
220
let expr = dsl::concat_list(s).map_err(PyPolarsErr::from)?;
221
Ok(expr.into())
222
}
223
224
#[pyfunction]
225
pub fn concat_arr(s: Vec<PyExpr>) -> PyResult<PyExpr> {
226
let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
227
let expr = dsl::concat_arr(s).map_err(PyPolarsErr::from)?;
228
Ok(expr.into())
229
}
230
231
#[pyfunction]
232
pub fn concat_str(s: Vec<PyExpr>, separator: &str, ignore_nulls: bool) -> PyExpr {
233
let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();
234
dsl::concat_str(s, separator, ignore_nulls).into()
235
}
236
237
#[pyfunction]
238
pub fn len() -> PyExpr {
239
dsl::len().into()
240
}
241
242
#[pyfunction]
243
pub fn cov(a: PyExpr, b: PyExpr, ddof: u8) -> PyExpr {
244
dsl::cov(a.inner, b.inner, ddof).into()
245
}
246
247
// `y.arctan2(x)` on the inner expressions; only compiled with the
// `trigonometry` feature.
#[pyfunction]
#[cfg(feature = "trigonometry")]
pub fn arctan2(y: PyExpr, x: PyExpr) -> PyExpr {
    let lhs = y.inner;
    PyExpr::from(lhs.arctan2(x.inner))
}
252
253
#[pyfunction]
254
pub fn cum_fold(
255
acc: PyExpr,
256
lambda: Py<PyAny>,
257
exprs: Vec<PyExpr>,
258
returns_scalar: bool,
259
return_dtype: Option<PyDataTypeExpr>,
260
include_init: bool,
261
) -> PyExpr {
262
let exprs = exprs.to_exprs();
263
let func = PlanCallback::new_python(PythonObject(lambda));
264
dsl::cum_fold_exprs(
265
acc.inner,
266
func,
267
exprs,
268
returns_scalar,
269
return_dtype.map(|v| v.inner),
270
include_init,
271
)
272
.into()
273
}
274
275
#[pyfunction]
276
pub fn cum_reduce(
277
lambda: Py<PyAny>,
278
exprs: Vec<PyExpr>,
279
returns_scalar: bool,
280
return_dtype: Option<PyDataTypeExpr>,
281
) -> PyExpr {
282
let exprs = exprs.to_exprs();
283
284
let func = PlanCallback::new_python(PythonObject(lambda));
285
dsl::cum_reduce_exprs(func, exprs, returns_scalar, return_dtype.map(|v| v.inner)).into()
286
}
287
288
#[pyfunction]
289
#[pyo3(signature = (year, month, day, hour=None, minute=None, second=None, microsecond=None, time_unit=Wrap(TimeUnit::Microseconds), time_zone=Wrap(None), ambiguous=PyExpr::from(dsl::lit(String::from("raise")))))]
290
pub fn datetime(
291
year: PyExpr,
292
month: PyExpr,
293
day: PyExpr,
294
hour: Option<PyExpr>,
295
minute: Option<PyExpr>,
296
second: Option<PyExpr>,
297
microsecond: Option<PyExpr>,
298
time_unit: Wrap<TimeUnit>,
299
time_zone: Wrap<Option<TimeZone>>,
300
ambiguous: PyExpr,
301
) -> PyExpr {
302
let year = year.inner;
303
let month = month.inner;
304
let day = day.inner;
305
set_unwrapped_or_0!(hour, minute, second, microsecond);
306
let ambiguous = ambiguous.inner;
307
let time_unit = time_unit.0;
308
let time_zone = time_zone.0;
309
let args = DatetimeArgs {
310
year,
311
month,
312
day,
313
hour,
314
minute,
315
second,
316
microsecond,
317
time_unit,
318
time_zone,
319
ambiguous,
320
};
321
dsl::datetime(args).into()
322
}
323
324
#[pyfunction]
325
pub fn concat_lf_diagonal(
326
lfs: &Bound<'_, PyAny>,
327
rechunk: bool,
328
parallel: bool,
329
to_supertypes: bool,
330
maintain_order: bool,
331
) -> PyResult<PyLazyFrame> {
332
let iter = lfs.try_iter()?;
333
334
let lfs = iter
335
.map(|item| {
336
let item = item?;
337
get_lf(&item)
338
})
339
.collect::<PyResult<Vec<_>>>()?;
340
341
let lf = dsl::functions::concat_lf_diagonal(
342
lfs,
343
UnionArgs {
344
rechunk,
345
parallel,
346
to_supertypes,
347
maintain_order,
348
..Default::default()
349
},
350
)
351
.map_err(PyPolarsErr::from)?;
352
Ok(lf.into())
353
}
354
355
#[pyfunction]
356
pub fn concat_lf_horizontal(
357
lfs: &Bound<'_, PyAny>,
358
parallel: bool,
359
strict: bool,
360
) -> PyResult<PyLazyFrame> {
361
let iter = lfs.try_iter()?;
362
363
let lfs = iter
364
.map(|item| {
365
let item = item?;
366
get_lf(&item)
367
})
368
.collect::<PyResult<Vec<_>>>()?;
369
370
let args = UnionArgs {
371
rechunk: false, // No need to rechunk with horizontal concatenation
372
parallel,
373
to_supertypes: false,
374
strict,
375
..Default::default()
376
};
377
let lf = dsl::functions::concat_lf_horizontal(lfs, args).map_err(PyPolarsErr::from)?;
378
Ok(lf.into())
379
}
380
381
#[pyfunction]
382
pub fn concat_expr(e: Vec<PyExpr>, rechunk: bool) -> PyResult<PyExpr> {
383
let e = e.to_exprs();
384
let e = dsl::functions::concat_expr(e, rechunk).map_err(PyPolarsErr::from)?;
385
Ok(e.into())
386
}
387
388
#[pyfunction]
389
#[pyo3(signature = (weeks, days, hours, minutes, seconds, milliseconds, microseconds, nanoseconds, time_unit))]
390
pub fn duration(
391
weeks: Option<PyExpr>,
392
days: Option<PyExpr>,
393
hours: Option<PyExpr>,
394
minutes: Option<PyExpr>,
395
seconds: Option<PyExpr>,
396
milliseconds: Option<PyExpr>,
397
microseconds: Option<PyExpr>,
398
nanoseconds: Option<PyExpr>,
399
time_unit: Wrap<TimeUnit>,
400
) -> PyExpr {
401
set_unwrapped_or_0!(
402
weeks,
403
days,
404
hours,
405
minutes,
406
seconds,
407
milliseconds,
408
microseconds,
409
nanoseconds,
410
);
411
let args = DurationArgs {
412
weeks,
413
days,
414
hours,
415
minutes,
416
seconds,
417
milliseconds,
418
microseconds,
419
nanoseconds,
420
time_unit: time_unit.0,
421
};
422
dsl::duration(args).into()
423
}
424
425
#[pyfunction]
426
pub fn fold(
427
acc: PyExpr,
428
lambda: Py<PyAny>,
429
exprs: Vec<PyExpr>,
430
returns_scalar: bool,
431
return_dtype: Option<PyDataTypeExpr>,
432
) -> PyExpr {
433
let exprs = exprs.to_exprs();
434
let func = PlanCallback::new_python(PythonObject(lambda));
435
dsl::fold_exprs(
436
acc.inner,
437
func,
438
exprs,
439
returns_scalar,
440
return_dtype.map(|w| w.inner),
441
)
442
.into()
443
}
444
445
#[pyfunction]
446
pub fn lit(value: &Bound<'_, PyAny>, allow_object: bool, is_scalar: bool) -> PyResult<PyExpr> {
447
let py = value.py();
448
if value.is_instance_of::<PyBool>() {
449
let val = value.extract::<bool>()?;
450
Ok(dsl::lit(val).into())
451
} else if let Ok(int) = value.downcast::<PyInt>() {
452
let v = int
453
.extract::<i128>()
454
.map_err(|e| polars_err!(InvalidOperation: "integer too large for Polars: {e}"))
455
.map_err(PyPolarsErr::from)?;
456
Ok(Expr::Literal(LiteralValue::Dyn(DynLiteralValue::Int(v))).into())
457
} else if let Ok(float) = value.downcast::<PyFloat>() {
458
let val = float.extract::<f64>()?;
459
Ok(Expr::Literal(LiteralValue::Dyn(DynLiteralValue::Float(val))).into())
460
} else if let Ok(pystr) = value.downcast::<PyString>() {
461
Ok(dsl::lit(pystr.to_string()).into())
462
} else if let Ok(series) = value.extract::<PySeries>() {
463
let s = series.series.into_inner();
464
if is_scalar {
465
let av = s
466
.get(0)
467
.map_err(|_| PyValueError::new_err("expected at least 1 value"))?;
468
let av = av.into_static();
469
Ok(dsl::lit(Scalar::new(s.dtype().clone(), av)).into())
470
} else {
471
Ok(dsl::lit(s).into())
472
}
473
} else if value.is_none() {
474
Ok(dsl::lit(Null {}).into())
475
} else if let Ok(value) = value.downcast::<PyBytes>() {
476
Ok(dsl::lit(value.as_bytes()).into())
477
} else {
478
let av = py_object_to_any_value(value, true, allow_object).map_err(|_| {
479
PyTypeError::new_err(
480
format!(
481
"cannot create expression literal for value of type {}.\
482
\n\nHint: Pass `allow_object=True` to accept any value and create a literal of type Object.",
483
value.get_type().qualname().map(|s|s.to_string()).unwrap_or("unknown".to_owned()),
484
)
485
)
486
})?;
487
match av {
488
#[cfg(feature = "object")]
489
AnyValue::ObjectOwned(_) => {
490
let s = PySeries::new_object(py, "", vec![value.extract()?], false)
491
.series
492
.into_inner();
493
Ok(dsl::lit(s).into())
494
},
495
_ => Ok(Expr::Literal(LiteralValue::from(av)).into()),
496
}
497
}
498
}
499
500
#[pyfunction]
501
#[pyo3(signature = (pyexpr, lambda, output_type, is_elementwise, returns_scalar))]
502
pub fn map_expr(
503
pyexpr: Vec<PyExpr>,
504
lambda: Py<PyAny>,
505
output_type: Option<PyDataTypeExpr>,
506
is_elementwise: bool,
507
returns_scalar: bool,
508
) -> PyExpr {
509
map::lazy::map_expr(&pyexpr, lambda, output_type, is_elementwise, returns_scalar)
510
}
511
512
#[pyfunction]
513
pub fn pearson_corr(a: PyExpr, b: PyExpr) -> PyExpr {
514
dsl::pearson_corr(a.inner, b.inner).into()
515
}
516
517
#[pyfunction]
518
pub fn reduce(
519
lambda: Py<PyAny>,
520
exprs: Vec<PyExpr>,
521
returns_scalar: bool,
522
return_dtype: Option<PyDataTypeExpr>,
523
) -> PyExpr {
524
let exprs = exprs.to_exprs();
525
let func = PlanCallback::new_python(PythonObject(lambda));
526
dsl::reduce_exprs(func, exprs, returns_scalar, return_dtype.map(|v| v.inner)).into()
527
}
528
529
#[pyfunction]
530
#[pyo3(signature = (value, n, dtype=None))]
531
pub fn repeat(value: PyExpr, n: PyExpr, dtype: Option<Wrap<DataType>>) -> PyExpr {
532
let mut value = value.inner;
533
let n = n.inner;
534
535
if let Some(dtype) = dtype {
536
value = value.cast(dtype.0);
537
}
538
539
dsl::repeat(value, n).into()
540
}
541
542
#[pyfunction]
543
pub fn spearman_rank_corr(a: PyExpr, b: PyExpr, propagate_nans: bool) -> PyExpr {
544
#[cfg(feature = "propagate_nans")]
545
{
546
dsl::spearman_rank_corr(a.inner, b.inner, propagate_nans).into()
547
}
548
#[cfg(not(feature = "propagate_nans"))]
549
{
550
panic!("activate 'propagate_nans'")
551
}
552
}
553
554
// Parse `sql` into an expression via `polars::sql::sql_expr`; only compiled
// with the `sql` feature. Parse errors are surfaced through `PyPolarsErr`.
#[pyfunction]
#[cfg(feature = "sql")]
pub fn sql_expr(sql: &str) -> PyResult<PyExpr> {
    match polars::sql::sql_expr(sql) {
        Ok(expr) => Ok(expr.into()),
        Err(err) => Err(PyPolarsErr::from(err).into()),
    }
}
560
561