// Source: crates/polars-python/src/functions/lazy.rs (branch: main)
use polars::lazy::dsl;1use polars::prelude::*;2use polars_plan::plans::DynLiteralValue;3use polars_plan::prelude::UnionArgs;4use polars_utils::python_function::PythonObject;5use pyo3::exceptions::{PyTypeError, PyValueError};6use pyo3::prelude::*;7use pyo3::types::{PyBool, PyBytes, PyFloat, PyInt, PyString};89use crate::conversion::any_value::py_object_to_any_value;10use crate::conversion::{Wrap, get_lf};11use crate::error::PyPolarsErr;12use crate::expr::ToExprs;13use crate::expr::datatype::PyDataTypeExpr;14use crate::lazyframe::PyOptFlags;15use crate::utils::EnterPolarsExt;16use crate::{PyDataFrame, PyExpr, PyLazyFrame, PySeries, map};1718macro_rules! set_unwrapped_or_0 {19($($var:ident),+ $(,)?) => {20$(let $var = $var.map(|e| e.inner).unwrap_or(dsl::lit(0));)+21};22}2324#[pyfunction]25pub fn rolling_corr(26x: PyExpr,27y: PyExpr,28window_size: IdxSize,29min_periods: IdxSize,30ddof: u8,31) -> PyExpr {32dsl::rolling_corr(33x.inner,34y.inner,35RollingCovOptions {36min_periods,37window_size,38ddof,39},40)41.into()42}4344#[pyfunction]45pub fn rolling_cov(46x: PyExpr,47y: PyExpr,48window_size: IdxSize,49min_periods: IdxSize,50ddof: u8,51) -> PyExpr {52dsl::rolling_cov(53x.inner,54y.inner,55RollingCovOptions {56min_periods,57window_size,58ddof,59},60)61.into()62}6364#[pyfunction]65pub fn arg_sort_by(66by: Vec<PyExpr>,67descending: Vec<bool>,68nulls_last: Vec<bool>,69multithreaded: bool,70maintain_order: bool,71) -> PyExpr {72let by = by.into_iter().map(|e| e.inner).collect::<Vec<Expr>>();73dsl::arg_sort_by(74by,75SortMultipleOptions {76descending,77nulls_last,78multithreaded,79maintain_order,80limit: None,81},82)83.into()84}85#[pyfunction]86pub fn arg_where(condition: PyExpr) -> PyExpr {87dsl::arg_where(condition.inner).into()88}8990#[pyfunction]91pub fn as_struct(exprs: Vec<PyExpr>) -> PyResult<PyExpr> {92let exprs = exprs.to_exprs();93if exprs.is_empty() {94return Err(PyValueError::new_err(95"expected at least 1 expression in 
'as_struct'",96));97}98Ok(dsl::as_struct(exprs).into())99}100101#[pyfunction]102pub fn field(names: Vec<String>) -> PyExpr {103dsl::Expr::Field(names.into_iter().map(|x| x.into()).collect()).into()104}105106#[pyfunction]107pub fn coalesce(exprs: Vec<PyExpr>) -> PyExpr {108let exprs = exprs.to_exprs();109dsl::coalesce(&exprs).into()110}111112#[pyfunction]113pub fn col(name: &str) -> PyExpr {114dsl::col(name).into()115}116117#[pyfunction]118pub fn element() -> PyExpr {119dsl::element().into()120}121122fn lfs_to_plans(lfs: Vec<PyLazyFrame>) -> Vec<DslPlan> {123lfs.into_iter()124.map(|lf| lf.ldf.into_inner().logical_plan)125.collect()126}127128#[pyfunction]129pub fn collect_all(130lfs: Vec<PyLazyFrame>,131engine: Wrap<Engine>,132optflags: PyOptFlags,133py: Python<'_>,134) -> PyResult<Vec<PyDataFrame>> {135let plans = lfs_to_plans(lfs);136let dfs = py.enter_polars(|| {137LazyFrame::collect_all_with_engine(plans, engine.0, optflags.inner.into_inner())138})?;139Ok(dfs.into_iter().map(Into::into).collect())140}141142#[pyfunction]143pub fn explain_all(lfs: Vec<PyLazyFrame>, optflags: PyOptFlags, py: Python) -> PyResult<String> {144let plans = lfs_to_plans(lfs);145let explained =146py.enter_polars(|| LazyFrame::explain_all(plans, optflags.inner.into_inner()))?;147Ok(explained)148}149150#[pyfunction]151pub fn collect_all_with_callback(152lfs: Vec<PyLazyFrame>,153engine: Wrap<Engine>,154optflags: PyOptFlags,155lambda: Py<PyAny>,156py: Python<'_>,157) {158let plans = lfs159.into_iter()160.map(|lf| lf.ldf.into_inner().logical_plan)161.collect();162let result = py163.enter_polars(|| {164LazyFrame::collect_all_with_engine(plans, engine.0, optflags.inner.into_inner())165})166.map(|dfs| {167dfs.into_iter()168.map(Into::into)169.collect::<Vec<PyDataFrame>>()170});171172Python::attach(|py| match result {173Ok(dfs) => {174lambda.call1(py, (dfs,)).map_err(|err| err.restore(py)).ok();175},176Err(err) => {177lambda178.call1(py, (PyErr::from(err),))179.map_err(|err| 
err.restore(py))180.ok();181},182})183}184185#[pyfunction]186pub fn concat_lf(187seq: &Bound<'_, PyAny>,188rechunk: bool,189parallel: bool,190to_supertypes: bool,191maintain_order: bool,192) -> PyResult<PyLazyFrame> {193let len = seq.len()?;194let mut lfs = Vec::with_capacity(len);195196for res in seq.try_iter()? {197let item = res?;198let lf = get_lf(&item)?;199lfs.push(lf);200}201202let lf = dsl::concat(203lfs,204UnionArgs {205rechunk,206parallel,207to_supertypes,208maintain_order,209..Default::default()210},211)212.map_err(PyPolarsErr::from)?;213Ok(lf.into())214}215216#[pyfunction]217pub fn concat_list(s: Vec<PyExpr>) -> PyResult<PyExpr> {218let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();219let expr = dsl::concat_list(s).map_err(PyPolarsErr::from)?;220Ok(expr.into())221}222223#[pyfunction]224pub fn concat_arr(s: Vec<PyExpr>) -> PyResult<PyExpr> {225let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();226let expr = dsl::concat_arr(s).map_err(PyPolarsErr::from)?;227Ok(expr.into())228}229230#[pyfunction]231pub fn concat_str(s: Vec<PyExpr>, separator: &str, ignore_nulls: bool) -> PyExpr {232let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();233dsl::concat_str(s, separator, ignore_nulls).into()234}235236#[pyfunction]237pub fn len() -> PyExpr {238dsl::len().into()239}240241#[pyfunction]242pub fn cov(a: PyExpr, b: PyExpr, ddof: u8) -> PyExpr {243dsl::cov(a.inner, b.inner, ddof).into()244}245246#[pyfunction]247#[cfg(feature = "trigonometry")]248pub fn arctan2(y: PyExpr, x: PyExpr) -> PyExpr {249y.inner.arctan2(x.inner).into()250}251252#[pyfunction]253pub fn cum_fold(254acc: PyExpr,255lambda: Py<PyAny>,256exprs: Vec<PyExpr>,257returns_scalar: bool,258return_dtype: Option<PyDataTypeExpr>,259include_init: bool,260) -> PyExpr {261let exprs = exprs.to_exprs();262let func = PlanCallback::new_python(PythonObject(lambda));263dsl::cum_fold_exprs(264acc.inner,265func,266exprs,267returns_scalar,268return_dtype.map(|v| 
v.inner),269include_init,270)271.into()272}273274#[pyfunction]275pub fn cum_reduce(276lambda: Py<PyAny>,277exprs: Vec<PyExpr>,278returns_scalar: bool,279return_dtype: Option<PyDataTypeExpr>,280) -> PyExpr {281let exprs = exprs.to_exprs();282283let func = PlanCallback::new_python(PythonObject(lambda));284dsl::cum_reduce_exprs(func, exprs, returns_scalar, return_dtype.map(|v| v.inner)).into()285}286287#[pyfunction]288#[pyo3(signature = (year, month, day, hour=None, minute=None, second=None, microsecond=None, time_unit=Wrap(TimeUnit::Microseconds), time_zone=Wrap(None), ambiguous=PyExpr::from(dsl::lit(String::from("raise")))))]289pub fn datetime(290year: PyExpr,291month: PyExpr,292day: PyExpr,293hour: Option<PyExpr>,294minute: Option<PyExpr>,295second: Option<PyExpr>,296microsecond: Option<PyExpr>,297time_unit: Wrap<TimeUnit>,298time_zone: Wrap<Option<TimeZone>>,299ambiguous: PyExpr,300) -> PyExpr {301let year = year.inner;302let month = month.inner;303let day = day.inner;304set_unwrapped_or_0!(hour, minute, second, microsecond);305let ambiguous = ambiguous.inner;306let time_unit = time_unit.0;307let time_zone = time_zone.0;308let args = DatetimeArgs {309year,310month,311day,312hour,313minute,314second,315microsecond,316time_unit,317time_zone,318ambiguous,319};320dsl::datetime(args).into()321}322323#[pyfunction]324pub fn concat_lf_diagonal(325lfs: &Bound<'_, PyAny>,326rechunk: bool,327parallel: bool,328to_supertypes: bool,329maintain_order: bool,330) -> PyResult<PyLazyFrame> {331let iter = lfs.try_iter()?;332333let lfs = iter334.map(|item| {335let item = item?;336get_lf(&item)337})338.collect::<PyResult<Vec<_>>>()?;339340let lf = dsl::functions::concat_lf_diagonal(341lfs,342UnionArgs {343rechunk,344parallel,345to_supertypes,346maintain_order,347..Default::default()348},349)350.map_err(PyPolarsErr::from)?;351Ok(lf.into())352}353354#[pyfunction]355pub fn concat_lf_horizontal(356lfs: &Bound<'_, PyAny>,357parallel: bool,358strict: bool,359) -> PyResult<PyLazyFrame> 
{360let iter = lfs.try_iter()?;361362let lfs = iter363.map(|item| {364let item = item?;365get_lf(&item)366})367.collect::<PyResult<Vec<_>>>()?;368369let args = UnionArgs {370rechunk: false, // No need to rechunk with horizontal concatenation371parallel,372to_supertypes: false,373strict,374..Default::default()375};376let lf = dsl::functions::concat_lf_horizontal(lfs, args).map_err(PyPolarsErr::from)?;377Ok(lf.into())378}379380#[pyfunction]381pub fn concat_expr(e: Vec<PyExpr>, rechunk: bool) -> PyResult<PyExpr> {382let e = e.to_exprs();383let e = dsl::functions::concat_expr(e, rechunk).map_err(PyPolarsErr::from)?;384Ok(e.into())385}386387#[pyfunction]388#[pyo3(signature = (weeks, days, hours, minutes, seconds, milliseconds, microseconds, nanoseconds, time_unit))]389pub fn duration(390weeks: Option<PyExpr>,391days: Option<PyExpr>,392hours: Option<PyExpr>,393minutes: Option<PyExpr>,394seconds: Option<PyExpr>,395milliseconds: Option<PyExpr>,396microseconds: Option<PyExpr>,397nanoseconds: Option<PyExpr>,398time_unit: Wrap<TimeUnit>,399) -> PyExpr {400set_unwrapped_or_0!(401weeks,402days,403hours,404minutes,405seconds,406milliseconds,407microseconds,408nanoseconds,409);410let args = DurationArgs {411weeks,412days,413hours,414minutes,415seconds,416milliseconds,417microseconds,418nanoseconds,419time_unit: time_unit.0,420};421dsl::duration(args).into()422}423424#[pyfunction]425pub fn fold(426acc: PyExpr,427lambda: Py<PyAny>,428exprs: Vec<PyExpr>,429returns_scalar: bool,430return_dtype: Option<PyDataTypeExpr>,431) -> PyExpr {432let exprs = exprs.to_exprs();433let func = PlanCallback::new_python(PythonObject(lambda));434dsl::fold_exprs(435acc.inner,436func,437exprs,438returns_scalar,439return_dtype.map(|w| w.inner),440)441.into()442}443444#[pyfunction]445pub fn lit(value: &Bound<'_, PyAny>, allow_object: bool, is_scalar: bool) -> PyResult<PyExpr> {446let py = value.py();447if value.is_instance_of::<PyBool>() {448let val = 
value.extract::<bool>()?;449Ok(dsl::lit(val).into())450} else if let Ok(int) = value.downcast::<PyInt>() {451let v = int452.extract::<i128>()453.map_err(|e| polars_err!(InvalidOperation: "integer too large for Polars: {e}"))454.map_err(PyPolarsErr::from)?;455Ok(Expr::Literal(LiteralValue::Dyn(DynLiteralValue::Int(v))).into())456} else if let Ok(float) = value.downcast::<PyFloat>() {457let val = float.extract::<f64>()?;458Ok(Expr::Literal(LiteralValue::Dyn(DynLiteralValue::Float(val))).into())459} else if let Ok(pystr) = value.downcast::<PyString>() {460Ok(dsl::lit(pystr.to_string()).into())461} else if let Ok(series) = value.extract::<PySeries>() {462let s = series.series.into_inner();463if is_scalar {464let av = s465.get(0)466.map_err(|_| PyValueError::new_err("expected at least 1 value"))?;467let av = av.into_static();468Ok(dsl::lit(Scalar::new(s.dtype().clone(), av)).into())469} else {470Ok(dsl::lit(s).into())471}472} else if value.is_none() {473Ok(dsl::lit(Null {}).into())474} else if let Ok(value) = value.downcast::<PyBytes>() {475Ok(dsl::lit(value.as_bytes()).into())476} else {477let av = py_object_to_any_value(value, true, allow_object).map_err(|_| {478PyTypeError::new_err(479format!(480"cannot create expression literal for value of type {}.\481\n\nHint: Pass `allow_object=True` to accept any value and create a literal of type Object.",482value.get_type().qualname().map(|s|s.to_string()).unwrap_or("unknown".to_owned()),483)484)485})?;486match av {487#[cfg(feature = "object")]488AnyValue::ObjectOwned(_) => {489let s = PySeries::new_object(py, "", vec![value.extract()?], false)490.series491.into_inner();492Ok(dsl::lit(s).into())493},494_ => Ok(Expr::Literal(LiteralValue::from(av)).into()),495}496}497}498499#[pyfunction]500#[pyo3(signature = (pyexpr, lambda, output_type, is_elementwise, returns_scalar))]501pub fn map_expr(502pyexpr: Vec<PyExpr>,503lambda: Py<PyAny>,504output_type: Option<PyDataTypeExpr>,505is_elementwise: bool,506returns_scalar: bool,507) -> 
PyExpr {508map::lazy::map_expr(&pyexpr, lambda, output_type, is_elementwise, returns_scalar)509}510511#[pyfunction]512pub fn pearson_corr(a: PyExpr, b: PyExpr) -> PyExpr {513dsl::pearson_corr(a.inner, b.inner).into()514}515516#[pyfunction]517pub fn reduce(518lambda: Py<PyAny>,519exprs: Vec<PyExpr>,520returns_scalar: bool,521return_dtype: Option<PyDataTypeExpr>,522) -> PyExpr {523let exprs = exprs.to_exprs();524let func = PlanCallback::new_python(PythonObject(lambda));525dsl::reduce_exprs(func, exprs, returns_scalar, return_dtype.map(|v| v.inner)).into()526}527528#[pyfunction]529#[pyo3(signature = (value, n, dtype=None))]530pub fn repeat(value: PyExpr, n: PyExpr, dtype: Option<Wrap<DataType>>) -> PyExpr {531let mut value = value.inner;532let n = n.inner;533534if let Some(dtype) = dtype {535value = value.cast(dtype.0);536}537538dsl::repeat(value, n).into()539}540541#[pyfunction]542pub fn spearman_rank_corr(a: PyExpr, b: PyExpr, propagate_nans: bool) -> PyExpr {543#[cfg(feature = "propagate_nans")]544{545dsl::spearman_rank_corr(a.inner, b.inner, propagate_nans).into()546}547#[cfg(not(feature = "propagate_nans"))]548{549panic!("activate 'propagate_nans'")550}551}552553#[pyfunction]554#[cfg(feature = "sql")]555pub fn sql_expr(sql: &str) -> PyResult<PyExpr> {556let expr = polars::sql::sql_expr(sql).map_err(PyPolarsErr::from)?;557Ok(expr.into())558}559560561