Path: blob/main/crates/polars-python/src/dataframe/map.rs
7889 views
use polars::frame::row::{Row, rows_to_schema_first_non_null};1use polars_core::utils::CustomIterTools;2use pyo3::IntoPyObjectExt;3use pyo3::prelude::*;4use pyo3::types::PyTuple;56use super::*;7use crate::error::PyPolarsErr;8use crate::prelude::*;9#[cfg(feature = "object")]10use crate::series::construction::series_from_objects;11use crate::{PySeries, raise_err};1213#[pymethods]14impl PyDataFrame {15#[pyo3(signature = (lambda, output_type, inference_size))]16pub fn map_rows(17&self,18py: Python<'_>,19lambda: Bound<PyAny>,20output_type: Option<Wrap<DataType>>,21inference_size: usize,22) -> PyResult<(Py<PyAny>, bool)> {23let df = self.df.read();24let height = df.height();25let col_series: Vec<_> = df26.get_columns()27.iter()28.map(|s| s.as_materialized_series().clone())29.collect();30let mut iters: Vec<_> = col_series.iter().map(|c| c.iter()).collect();31drop(df); // Release lock before calling lambda.3233let lambda_result_iter = (0..height).map(move |_| {34let iter = iters.iter_mut().map(|it| Wrap(it.next().unwrap()));35let tpl = (PyTuple::new(py, iter).unwrap(),);36lambda.call1(tpl)37});3839// Simple case: return type set.40if let Some(output_type) = &output_type {41// If the output type is Object we should not go through AnyValue.42#[cfg(feature = "object")]43if let DataType::Object(_) = output_type.0 {44let objects = lambda_result_iter45.map(|res| {46Ok(ObjectValue {47inner: res?.unbind(),48})49})50.collect::<PyResult<Vec<_>>>()?;51let s = series_from_objects(py, PlSmallStr::from_static("map"), objects);52return Ok((PySeries::from(s).into_py_any(py)?, false));53}5455let avs = lambda_result_iter56.map(|res| res?.extract::<Wrap<AnyValue>>().map(|w| w.0))57.collect::<PyResult<Vec<AnyValue>>>()?;58let s = Series::from_any_values_and_dtype(59PlSmallStr::from_static("map"),60&avs,61&output_type.0,62true,63)64.map_err(PyPolarsErr::from)?;65return Ok((PySeries::from(s).into_py_any(py)?, false));66}6768// Disambiguate returning a DataFrame vs Series by checking the69// first non-null output value.70let mut peek_iter = lambda_result_iter.peekable();71let mut null_count = 0;72while let Some(ret) = peek_iter.peek() {73if let Ok(v) = ret74&& v.is_none()75{76null_count += 1;77peek_iter.next();78} else {79break;80}81}8283let first_val = match peek_iter.peek() {84Some(Ok(v)) => v,85Some(Err(e)) => return Err(e.clone_ref(py)),86None => {87let msg = "The output type of the 'map_rows' function cannot be determined.\n\88All returned values are None, consider setting the 'return_dtype'.";89raise_err!(msg, ComputeError)90},91};9293if let Ok(first_row) = first_val.downcast::<PyTuple>() {94let width = first_row.len();95let out_df = collect_lambda_ret_with_rows_output(96height,97width,98null_count,99inference_size,100peek_iter,101)102.map_err(PyPolarsErr::from)?;103Ok((PyDataFrame::from(out_df).into_py_any(py)?, true))104} else {105let avs = peek_iter106.map(|res| res?.extract::<Wrap<AnyValue>>().map(|w| w.0))107.collect::<PyResult<Vec<AnyValue>>>()?;108let s = Series::from_any_values(PlSmallStr::from_static("map"), &avs, true)109.map_err(PyPolarsErr::from)?;110111let out = if null_count > 0 {112let mut tmp = Series::full_null(s.name().clone(), null_count, s.dtype());113tmp.append_owned(s).map_err(PyPolarsErr::from)?;114tmp115} else {116s117};118Ok((PySeries::from(out).into_py_any(py)?, false))119}120}121}122123fn collect_lambda_ret_with_rows_output<'py>(124height: usize,125width: usize,126init_null_count: usize,127inference_size: usize,128ret_iter: impl Iterator<Item = PyResult<Bound<'py, PyAny>>>,129) -> PolarsResult<DataFrame> {130let null_row = Row::new(vec![AnyValue::Null; width]);131132let mut row_buf = Row::default();133let mut row_iter = ret_iter.map(|retval| {134let retval = retval?;135if retval.is_none() {136Ok(&null_row)137} else {138let tuple = retval.downcast::<PyTuple>().map_err(|_| polars_err!(ComputeError: format!("expected tuple, got {}", retval.get_type().qualname().unwrap())))?;139row_buf.0.clear();140for v in tuple {141let v = v.extract::<Wrap<AnyValue>>().unwrap().0;142row_buf.0.push(v);143}144let ptr = &row_buf as *const Row;145// SAFETY:146// we know that row constructor of polars dataframe does not keep a reference147// to the row. Before we mutate the row buf again, the reference is dropped.148// we only cannot prove it to the compiler.149// we still to this because it save a Vec allocation in a hot loop.150Ok(unsafe { &*ptr })151}152});153154// First rows for schema inference.155let mut buf = Vec::with_capacity(inference_size);156for v in (&mut row_iter).take(inference_size) {157buf.push(v?.clone());158}159160let schema = rows_to_schema_first_non_null(&buf, Some(50))?;161162if init_null_count > 0 {163// SAFETY: we know the iterators size.164let iter = unsafe {165(0..init_null_count)166.map(|_| Ok(&null_row))167.chain(buf.iter().map(Ok))168.chain(row_iter)169.trust_my_length(height)170};171DataFrame::try_from_rows_iter_and_schema(iter, &schema)172} else {173// SAFETY: we know the iterators size.174let iter = unsafe { buf.iter().map(Ok).chain(row_iter).trust_my_length(height) };175DataFrame::try_from_rows_iter_and_schema(iter, &schema)176}177}178179180