Path: blob/main/crates/polars-python/src/dataframe/construction.rs
7889 views
use polars::frame::row::{Row, rows_to_schema_supertypes, rows_to_supertypes};1use polars::prelude::*;2use pyo3::prelude::*;3use pyo3::types::{PyDict, PyMapping, PyString};45use super::PyDataFrame;6use crate::conversion::any_value::py_object_to_any_value;7use crate::conversion::{Wrap, vec_extract_wrapped};8use crate::error::PyPolarsErr;9use crate::interop;10use crate::utils::EnterPolarsExt;1112#[pymethods]13impl PyDataFrame {14#[staticmethod]15#[pyo3(signature = (data, schema=None, infer_schema_length=None))]16pub fn from_rows(17py: Python<'_>,18data: Vec<Wrap<Row>>,19schema: Option<Wrap<Schema>>,20infer_schema_length: Option<usize>,21) -> PyResult<Self> {22let data = vec_extract_wrapped(data);23let schema = schema.map(|wrap| wrap.0);24py.enter_polars(move || finish_from_rows(data, schema, None, infer_schema_length))25}2627#[staticmethod]28#[pyo3(signature = (data, schema=None, schema_overrides=None, strict=true, infer_schema_length=None))]29pub fn from_dicts(30py: Python<'_>,31data: &Bound<PyAny>,32schema: Option<Wrap<Schema>>,33schema_overrides: Option<Wrap<Schema>>,34strict: bool,35infer_schema_length: Option<usize>,36) -> PyResult<Self> {37let schema = schema.map(|wrap| wrap.0);38let schema_overrides = schema_overrides.map(|wrap| wrap.0);3940// determine row extraction strategy from the first item:41// PyDict (faster), or PyMapping (more generic, slower)42let from_mapping = data.len()? > 0 && {43let mut iter = data.try_iter()?;44loop {45match iter.next() {46Some(Ok(item)) if !item.is_none() => break !item.is_instance_of::<PyDict>(),47Some(Err(e)) => return Err(e),48Some(_) => continue,49None => break false,50}51}52};5354// read (or infer) field names, then extract row values55let names = get_schema_names(data, schema.as_ref(), infer_schema_length, from_mapping)?;56let rows = if from_mapping {57mappings_to_rows(data, &names, strict)?58} else {59dicts_to_rows(data, &names, strict)?60};6162let schema = schema.or_else(|| {63Some(columns_names_to_empty_schema(64names.iter().map(String::as_str),65))66});67py.enter_polars(move || {68finish_from_rows(rows, schema, schema_overrides, infer_schema_length)69})70}7172#[staticmethod]73pub fn from_arrow_record_batches(74py: Python<'_>,75rb: Vec<Bound<PyAny>>,76schema: Bound<PyAny>,77) -> PyResult<Self> {78let df = interop::arrow::to_rust::to_rust_df(py, &rb, schema)?;79Ok(Self::from(df))80}81}8283fn finish_from_rows(84rows: Vec<Row>,85schema: Option<Schema>,86schema_overrides: Option<Schema>,87infer_schema_length: Option<usize>,88) -> PyResult<PyDataFrame> {89let schema = if let Some(mut schema) = schema {90resolve_schema_overrides(&mut schema, schema_overrides);91update_schema_from_rows(&mut schema, &rows, infer_schema_length)?;92schema93} else {94rows_to_schema_supertypes(&rows, infer_schema_length).map_err(PyPolarsErr::from)?95};9697let df = DataFrame::from_rows_and_schema(&rows, &schema).map_err(PyPolarsErr::from)?;98Ok(df.into())99}100101fn update_schema_from_rows(102schema: &mut Schema,103rows: &[Row],104infer_schema_length: Option<usize>,105) -> PyResult<()> {106let schema_is_complete = schema.iter_values().all(|dtype| dtype.is_known());107if schema_is_complete {108return Ok(());109}110111// TODO: Only infer dtypes for columns with an unknown dtype112let inferred_dtypes =113rows_to_supertypes(rows, infer_schema_length).map_err(PyPolarsErr::from)?;114let inferred_dtypes_slice = inferred_dtypes.as_slice();115116for (i, dtype) in schema.iter_values_mut().enumerate() {117if !dtype.is_known() {118*dtype = inferred_dtypes_slice.get(i).ok_or_else(|| {119polars_err!(SchemaMismatch: "the number of columns in the schema does not match the data")120})121.map_err(PyPolarsErr::from)?122.clone();123}124}125Ok(())126}127128/// Override the data type of certain schema fields.129///130/// Overrides for nonexistent columns are ignored.131fn resolve_schema_overrides(schema: &mut Schema, schema_overrides: Option<Schema>) {132if let Some(overrides) = schema_overrides {133for (name, dtype) in overrides.into_iter() {134schema.set_dtype(name.as_str(), dtype);135}136}137}138139fn columns_names_to_empty_schema<'a, I>(column_names: I) -> Schema140where141I: IntoIterator<Item = &'a str>,142{143let fields = column_names144.into_iter()145.map(|c| Field::new(c.into(), DataType::Unknown(Default::default())));146Schema::from_iter(fields)147}148149fn dicts_to_rows(150data: &Bound<'_, PyAny>,151names: &[String],152strict: bool,153) -> PyResult<Vec<Row<'static>>> {154let py = data.py();155let mut rows = Vec::with_capacity(data.len()?);156let null_row = Row::new(vec![AnyValue::Null; names.len()]);157158// pre-convert keys/names so we don't repeatedly create them in the loop159let py_keys: Vec<Py<PyString>> = names.iter().map(|k| PyString::new(py, k).into()).collect();160161for d in data.try_iter()? {162let d = d?;163if d.is_none() {164rows.push(null_row.clone())165} else {166let d = d.downcast::<PyDict>()?;167let mut row = Vec::with_capacity(names.len());168for k in &py_keys {169let val = match d.get_item(k)? {170None => AnyValue::Null,171Some(py_val) => py_object_to_any_value(&py_val.as_borrowed(), strict, true)?,172};173row.push(val)174}175rows.push(Row(row))176}177}178Ok(rows)179}180181fn mappings_to_rows(182data: &Bound<'_, PyAny>,183names: &[String],184strict: bool,185) -> PyResult<Vec<Row<'static>>> {186let py = data.py();187let mut rows = Vec::with_capacity(data.len()?);188let null_row = Row::new(vec![AnyValue::Null; names.len()]);189190// pre-convert keys/names so we don't repeatedly create them in the loop191let py_keys: Vec<Py<PyString>> = names.iter().map(|k| PyString::new(py, k).into()).collect();192193for d in data.try_iter()? {194let d = d?;195if d.is_none() {196rows.push(null_row.clone())197} else {198let d = d.downcast::<PyMapping>()?;199let mut row = Vec::with_capacity(names.len());200for k in &py_keys {201let py_val = d.get_item(k)?;202let val = if py_val.is_none() {203AnyValue::Null204} else {205py_object_to_any_value(&py_val, strict, true)?206};207row.push(val)208}209rows.push(Row(row))210}211}212Ok(rows)213}214215/// Either read the given schema, or infer the schema names from the data.216fn get_schema_names(217data: &Bound<PyAny>,218schema: Option<&Schema>,219infer_schema_length: Option<usize>,220from_mapping: bool,221) -> PyResult<Vec<String>> {222if let Some(schema) = schema {223Ok(schema.iter_names().map(|n| n.to_string()).collect())224} else {225let data_len = data.len()?;226let infer_schema_length = infer_schema_length227.map(|n| std::cmp::max(1, n))228.unwrap_or(data_len);229230if from_mapping {231infer_schema_names_from_mapping_data(data, infer_schema_length)232} else {233infer_schema_names_from_dict_data(data, infer_schema_length)234}235}236}237238/// Infer schema names from an iterable of dictionaries.239///240/// The resulting schema order is determined by the order241/// in which the names are encountered in the data.242fn infer_schema_names_from_dict_data(243data: &Bound<PyAny>,244infer_schema_length: usize,245) -> PyResult<Vec<String>> {246let mut names = PlIndexSet::new();247for d in data.try_iter()?.take(infer_schema_length) {248let d = d?;249if !d.is_none() {250let d = d.downcast::<PyDict>()?;251let keys = d.keys().iter();252for name in keys {253let name = name.extract::<String>()?;254names.insert(name);255}256}257}258Ok(names.into_iter().collect())259}260261/// Infer schema names from an iterable of mapping objects.262///263/// The resulting schema order is determined by the order264/// in which the names are encountered in the data.265fn infer_schema_names_from_mapping_data(266data: &Bound<PyAny>,267infer_schema_length: usize,268) -> PyResult<Vec<String>> {269let mut names = PlIndexSet::new();270for d in data.try_iter()?.take(infer_schema_length) {271let d = d?;272if !d.is_none() {273let d = d.downcast::<PyMapping>()?;274let keys = d.keys()?;275for name in keys {276let name = name.extract::<String>()?;277names.insert(name);278}279}280}281Ok(names.into_iter().collect())282}283284285