Path: blob/main/crates/polars-python/src/functions/lazy.rs
8368 views
use polars::lazy::dsl;1use polars::prelude::*;2use polars_plan::plans::DynLiteralValue;3use polars_plan::prelude::UnionArgs;4use polars_utils::python_function::PythonObject;5use pyo3::exceptions::{PyTypeError, PyValueError};6use pyo3::prelude::*;7use pyo3::types::{PyBool, PyBytes, PyFloat, PyInt, PyString};89use crate::conversion::any_value::py_object_to_any_value;10use crate::conversion::{Wrap, get_lf};11use crate::error::PyPolarsErr;12use crate::expr::ToExprs;13use crate::expr::datatype::PyDataTypeExpr;14use crate::lazyframe::PyOptFlags;15use crate::utils::EnterPolarsExt;16use crate::{PyDataFrame, PyExpr, PyLazyFrame, PySeries, map};1718macro_rules! set_unwrapped_or_0 {19($($var:ident),+ $(,)?) => {20$(let $var = $var.map(|e| e.inner).unwrap_or(dsl::lit(0));)+21};22}2324#[pyfunction]25pub fn rolling_corr(26x: PyExpr,27y: PyExpr,28window_size: IdxSize,29min_periods: IdxSize,30ddof: u8,31) -> PyExpr {32dsl::rolling_corr(33x.inner,34y.inner,35RollingCovOptions {36min_periods,37window_size,38ddof,39},40)41.into()42}4344#[pyfunction]45pub fn rolling_cov(46x: PyExpr,47y: PyExpr,48window_size: IdxSize,49min_periods: IdxSize,50ddof: u8,51) -> PyExpr {52dsl::rolling_cov(53x.inner,54y.inner,55RollingCovOptions {56min_periods,57window_size,58ddof,59},60)61.into()62}6364#[pyfunction]65pub fn arg_sort_by(66by: Vec<PyExpr>,67descending: Vec<bool>,68nulls_last: Vec<bool>,69multithreaded: bool,70maintain_order: bool,71) -> PyExpr {72let by = by.into_iter().map(|e| e.inner).collect::<Vec<Expr>>();73dsl::arg_sort_by(74by,75SortMultipleOptions {76descending,77nulls_last,78multithreaded,79maintain_order,80limit: None,81},82)83.into()84}85#[pyfunction]86pub fn arg_where(condition: PyExpr) -> PyExpr {87dsl::arg_where(condition.inner).into()88}8990#[pyfunction]91pub fn as_struct(exprs: Vec<PyExpr>) -> PyResult<PyExpr> {92let exprs = exprs.to_exprs();93if exprs.is_empty() {94return Err(PyValueError::new_err(95"expected at least 1 expression in 'as_struct'",96));97}98Ok(dsl::as_struct(exprs).into())99}100101#[pyfunction]102pub fn field(names: Vec<String>) -> PyExpr {103dsl::Expr::Field(names.into_iter().map(|x| x.into()).collect()).into()104}105106#[pyfunction]107pub fn coalesce(exprs: Vec<PyExpr>) -> PyExpr {108let exprs = exprs.to_exprs();109dsl::coalesce(&exprs).into()110}111112#[pyfunction]113pub fn col(name: &str) -> PyExpr {114dsl::col(name).into()115}116117#[pyfunction]118pub fn element() -> PyExpr {119dsl::element().into()120}121122fn lfs_to_plans(lfs: Vec<PyLazyFrame>) -> Vec<DslPlan> {123lfs.into_iter()124.map(|lf| lf.ldf.into_inner().logical_plan)125.collect()126}127128#[pyfunction]129pub fn collect_all(130lfs: Vec<PyLazyFrame>,131engine: Wrap<Engine>,132optflags: PyOptFlags,133py: Python<'_>,134) -> PyResult<Vec<PyDataFrame>> {135let plans = lfs_to_plans(lfs);136let dfs = py.enter_polars(|| {137LazyFrame::collect_all_with_engine(plans, engine.0, optflags.inner.into_inner())138})?;139Ok(dfs.into_iter().map(Into::into).collect())140}141142#[pyfunction]143pub fn collect_all_lazy(lfs: Vec<PyLazyFrame>, optflags: PyOptFlags) -> PyResult<PyLazyFrame> {144let plans = lfs_to_plans(lfs);145146for plan in &plans {147if !matches!(plan, DslPlan::Sink { .. }) {148return Err(PyValueError::new_err(149"all LazyFrames must end with a sink to use 'collect_all(lazy=True)'",150));151}152}153154Ok(LazyFrame::from_logical_plan(155DslPlan::SinkMultiple { inputs: plans },156optflags.inner.into_inner(),157)158.into())159}160161#[pyfunction]162pub fn explain_all(lfs: Vec<PyLazyFrame>, optflags: PyOptFlags, py: Python) -> PyResult<String> {163let plans = lfs_to_plans(lfs);164let explained =165py.enter_polars(|| LazyFrame::explain_all(plans, optflags.inner.into_inner()))?;166Ok(explained)167}168169#[pyfunction]170pub fn collect_all_with_callback(171lfs: Vec<PyLazyFrame>,172engine: Wrap<Engine>,173optflags: PyOptFlags,174lambda: Py<PyAny>,175py: Python<'_>,176) {177let plans = lfs178.into_iter()179.map(|lf| lf.ldf.into_inner().logical_plan)180.collect();181let result = py182.enter_polars(|| {183LazyFrame::collect_all_with_engine(plans, engine.0, optflags.inner.into_inner())184})185.map(|dfs| {186dfs.into_iter()187.map(Into::into)188.collect::<Vec<PyDataFrame>>()189});190191Python::attach(|py| match result {192Ok(dfs) => {193lambda.call1(py, (dfs,)).map_err(|err| err.restore(py)).ok();194},195Err(err) => {196lambda197.call1(py, (PyErr::from(err),))198.map_err(|err| err.restore(py))199.ok();200},201})202}203204#[pyfunction]205pub fn concat_lf(206seq: &Bound<'_, PyAny>,207rechunk: bool,208parallel: bool,209to_supertypes: bool,210maintain_order: bool,211) -> PyResult<PyLazyFrame> {212let len = seq.len()?;213let mut lfs = Vec::with_capacity(len);214215for res in seq.try_iter()? {216let item = res?;217let lf = get_lf(&item)?;218lfs.push(lf);219}220221let lf = dsl::concat(222lfs,223UnionArgs {224rechunk,225parallel,226to_supertypes,227maintain_order,228..Default::default()229},230)231.map_err(PyPolarsErr::from)?;232Ok(lf.into())233}234235#[pyfunction]236pub fn concat_list(s: Vec<PyExpr>) -> PyResult<PyExpr> {237let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();238let expr = dsl::concat_list(s).map_err(PyPolarsErr::from)?;239Ok(expr.into())240}241242#[pyfunction]243pub fn concat_arr(s: Vec<PyExpr>) -> PyResult<PyExpr> {244let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();245let expr = dsl::concat_arr(s).map_err(PyPolarsErr::from)?;246Ok(expr.into())247}248249#[pyfunction]250pub fn concat_str(s: Vec<PyExpr>, separator: &str, ignore_nulls: bool) -> PyExpr {251let s = s.into_iter().map(|e| e.inner).collect::<Vec<_>>();252dsl::concat_str(s, separator, ignore_nulls).into()253}254255#[pyfunction]256pub fn len() -> PyExpr {257dsl::len().into()258}259260#[pyfunction]261pub fn cov(a: PyExpr, b: PyExpr, ddof: u8) -> PyExpr {262dsl::cov(a.inner, b.inner, ddof).into()263}264265#[pyfunction]266#[cfg(feature = "trigonometry")]267pub fn arctan2(y: PyExpr, x: PyExpr) -> PyExpr {268y.inner.arctan2(x.inner).into()269}270271#[pyfunction]272pub fn cum_fold(273acc: PyExpr,274lambda: Py<PyAny>,275exprs: Vec<PyExpr>,276returns_scalar: bool,277return_dtype: Option<PyDataTypeExpr>,278include_init: bool,279) -> PyExpr {280let exprs = exprs.to_exprs();281let func = PlanCallback::new_python(PythonObject(lambda));282dsl::cum_fold_exprs(283acc.inner,284func,285exprs,286returns_scalar,287return_dtype.map(|v| v.inner),288include_init,289)290.into()291}292293#[pyfunction]294pub fn cum_reduce(295lambda: Py<PyAny>,296exprs: Vec<PyExpr>,297returns_scalar: bool,298return_dtype: Option<PyDataTypeExpr>,299) -> PyExpr {300let exprs = exprs.to_exprs();301302let func = PlanCallback::new_python(PythonObject(lambda));303dsl::cum_reduce_exprs(func, exprs, returns_scalar, return_dtype.map(|v| v.inner)).into()304}305306#[pyfunction]307#[pyo3(signature = (year, month, day, hour=None, minute=None, second=None, microsecond=None, time_unit=Wrap(TimeUnit::Microseconds), time_zone=Wrap(None), ambiguous=PyExpr::from(dsl::lit(String::from("raise")))))]308pub fn datetime(309year: PyExpr,310month: PyExpr,311day: PyExpr,312hour: Option<PyExpr>,313minute: Option<PyExpr>,314second: Option<PyExpr>,315microsecond: Option<PyExpr>,316time_unit: Wrap<TimeUnit>,317time_zone: Wrap<Option<TimeZone>>,318ambiguous: PyExpr,319) -> PyExpr {320let year = year.inner;321let month = month.inner;322let day = day.inner;323set_unwrapped_or_0!(hour, minute, second, microsecond);324let ambiguous = ambiguous.inner;325let time_unit = time_unit.0;326let time_zone = time_zone.0;327let args = DatetimeArgs {328year,329month,330day,331hour,332minute,333second,334microsecond,335time_unit,336time_zone,337ambiguous,338};339dsl::datetime(args).into()340}341342#[pyfunction]343pub fn concat_lf_diagonal(344lfs: &Bound<'_, PyAny>,345rechunk: bool,346parallel: bool,347to_supertypes: bool,348maintain_order: bool,349) -> PyResult<PyLazyFrame> {350let iter = lfs.try_iter()?;351352let lfs = iter353.map(|item| {354let item = item?;355get_lf(&item)356})357.collect::<PyResult<Vec<_>>>()?;358359let lf = dsl::functions::concat_lf_diagonal(360lfs,361UnionArgs {362rechunk,363parallel,364to_supertypes,365maintain_order,366..Default::default()367},368)369.map_err(PyPolarsErr::from)?;370Ok(lf.into())371}372373#[pyfunction]374pub fn concat_lf_horizontal(375lfs: &Bound<'_, PyAny>,376parallel: bool,377strict: bool,378) -> PyResult<PyLazyFrame> {379let iter = lfs.try_iter()?;380381let lfs = iter382.map(|item| {383let item = item?;384get_lf(&item)385})386.collect::<PyResult<Vec<_>>>()?;387388let lf = dsl::functions::concat_lf_horizontal(389lfs,390HConcatOptions {391parallel,392strict,393broadcast_unit_length: Default::default(),394},395)396.map_err(PyPolarsErr::from)?;397Ok(lf.into())398}399400#[pyfunction]401pub fn concat_expr(e: Vec<PyExpr>, rechunk: bool) -> PyResult<PyExpr> {402let e = e.to_exprs();403let e = dsl::functions::concat_expr(e, rechunk).map_err(PyPolarsErr::from)?;404Ok(e.into())405}406407#[pyfunction]408#[pyo3(signature = (weeks, days, hours, minutes, seconds, milliseconds, microseconds, nanoseconds, time_unit))]409pub fn duration(410weeks: Option<PyExpr>,411days: Option<PyExpr>,412hours: Option<PyExpr>,413minutes: Option<PyExpr>,414seconds: Option<PyExpr>,415milliseconds: Option<PyExpr>,416microseconds: Option<PyExpr>,417nanoseconds: Option<PyExpr>,418time_unit: Wrap<TimeUnit>,419) -> PyExpr {420set_unwrapped_or_0!(421weeks,422days,423hours,424minutes,425seconds,426milliseconds,427microseconds,428nanoseconds,429);430let args = DurationArgs {431weeks,432days,433hours,434minutes,435seconds,436milliseconds,437microseconds,438nanoseconds,439time_unit: time_unit.0,440};441dsl::duration(args).into()442}443444#[pyfunction]445pub fn fold(446acc: PyExpr,447lambda: Py<PyAny>,448exprs: Vec<PyExpr>,449returns_scalar: bool,450return_dtype: Option<PyDataTypeExpr>,451) -> PyExpr {452let exprs = exprs.to_exprs();453let func = PlanCallback::new_python(PythonObject(lambda));454dsl::fold_exprs(455acc.inner,456func,457exprs,458returns_scalar,459return_dtype.map(|w| w.inner),460)461.into()462}463464#[pyfunction]465pub fn lit(value: &Bound<'_, PyAny>, allow_object: bool, is_scalar: bool) -> PyResult<PyExpr> {466let py = value.py();467if value.is_instance_of::<PyBool>() {468let val = value.extract::<bool>()?;469Ok(dsl::lit(val).into())470} else if let Ok(int) = value.cast::<PyInt>() {471let v = int472.extract::<i128>()473.map_err(|e| polars_err!(InvalidOperation: "integer too large for Polars: {e}"))474.map_err(PyPolarsErr::from)?;475Ok(Expr::Literal(LiteralValue::Dyn(DynLiteralValue::Int(v))).into())476} else if let Ok(float) = value.cast::<PyFloat>() {477let val = float.extract::<f64>()?;478Ok(Expr::Literal(LiteralValue::Dyn(DynLiteralValue::Float(val))).into())479} else if let Ok(pystr) = value.cast::<PyString>() {480Ok(dsl::lit(pystr.to_string()).into())481} else if let Ok(series) = value.extract::<PySeries>() {482let s = series.series.into_inner();483if is_scalar {484let av = s485.get(0)486.map_err(|_| PyValueError::new_err("expected at least 1 value"))?;487let av = av.into_static();488Ok(dsl::lit(Scalar::new(s.dtype().clone(), av)).into())489} else {490Ok(dsl::lit(s).into())491}492} else if value.is_none() {493Ok(dsl::lit(Null {}).into())494} else if let Ok(value) = value.cast::<PyBytes>() {495Ok(dsl::lit(value.as_bytes()).into())496} else {497let raise = || {498PyTypeError::new_err(format!(499"cannot create expression literal for value of type {}.\500\n\nHint: Pass `allow_object=True` to accept any value and create a literal of type Object.",501value502.get_type()503.qualname()504.map(|s| s.to_string())505.unwrap_or("unknown".to_owned()),506))507};508509let av = py_object_to_any_value(value, true, allow_object).map_err(|_| raise())?;510match av {511#[cfg(feature = "object")]512AnyValue::ObjectOwned(_) => {513// Check again for object allowance as for cached addresses this is not checked.514if allow_object {515let s = PySeries::new_object(py, "", vec![value.extract()?], false)516.series517.into_inner();518Ok(dsl::lit(s).into())519} else {520Err(raise())521}522},523_ => Ok(Expr::Literal(LiteralValue::from(av)).into()),524}525}526}527528#[pyfunction]529#[pyo3(signature = (pyexpr, lambda, output_type, is_elementwise, returns_scalar))]530pub fn map_expr(531pyexpr: Vec<PyExpr>,532lambda: Py<PyAny>,533output_type: Option<PyDataTypeExpr>,534is_elementwise: bool,535returns_scalar: bool,536) -> PyExpr {537map::lazy::map_expr(&pyexpr, lambda, output_type, is_elementwise, returns_scalar)538}539540#[pyfunction]541pub fn pearson_corr(a: PyExpr, b: PyExpr) -> PyExpr {542dsl::pearson_corr(a.inner, b.inner).into()543}544545#[pyfunction]546pub fn reduce(547lambda: Py<PyAny>,548exprs: Vec<PyExpr>,549returns_scalar: bool,550return_dtype: Option<PyDataTypeExpr>,551) -> PyExpr {552let exprs = exprs.to_exprs();553let func = PlanCallback::new_python(PythonObject(lambda));554dsl::reduce_exprs(func, exprs, returns_scalar, return_dtype.map(|v| v.inner)).into()555}556557#[pyfunction]558#[pyo3(signature = (value, n, dtype=None))]559pub fn repeat(value: PyExpr, n: PyExpr, dtype: Option<Wrap<DataType>>) -> PyExpr {560let mut value = value.inner;561let n = n.inner;562563if let Some(dtype) = dtype {564value = value.cast(dtype.0);565}566567dsl::repeat(value, n).into()568}569570#[pyfunction]571pub fn spearman_rank_corr(a: PyExpr, b: PyExpr, propagate_nans: bool) -> PyExpr {572#[cfg(feature = "propagate_nans")]573{574dsl::spearman_rank_corr(a.inner, b.inner, propagate_nans).into()575}576#[cfg(not(feature = "propagate_nans"))]577{578panic!("activate 'propagate_nans'")579}580}581582#[pyfunction]583#[cfg(feature = "sql")]584pub fn sql_expr(sql: &str) -> PyResult<PyExpr> {585let expr = polars::sql::sql_expr(sql).map_err(PyPolarsErr::from)?;586Ok(expr.into())587}588589590