Path: crates/polars-python/src/lazyframe/general.rs
use std::collections::HashMap;
use std::num::NonZeroUsize;

use either::Either;
use polars::io::RowIndex;
use polars::time::*;
use polars_core::prelude::*;
#[cfg(feature = "parquet")]
use polars_parquet::arrow::write::StatisticsOptions;
use polars_plan::dsl::ScanSources;
use polars_plan::plans::{AExpr, HintIR, IR, Sorted};
use polars_utils::arena::{Arena, Node};
use polars_utils::python_function::PythonObject;
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyDict, PyDictMethods, PyList};

use super::{PyLazyFrame, PyOptFlags};
use crate::error::PyPolarsErr;
use crate::expr::ToExprs;
use crate::expr::datatype::PyDataTypeExpr;
use crate::expr::selector::PySelector;
use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
use crate::io::scan_options::PyScanOptions;
use crate::io::sink_options::PySinkOptions;
use crate::io::sink_output::PyFileSinkDestination;
use crate::lazyframe::visit::NodeTraverser;
use crate::prelude::*;
use crate::utils::{EnterPolarsExt, to_py_err};
use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};

fn pyobject_to_first_path_and_scan_sources(
    obj: Py<PyAny>,
) -> PyResult<(Option<PlPath>, ScanSources)> {
    use crate::file::{PythonScanSourceInput, get_python_scan_source_input};
    Ok(match get_python_scan_source_input(obj, false)? {
        PythonScanSourceInput::Path(path) => (
            Some(path.clone()),
            ScanSources::Paths(FromIterator::from_iter([path])),
        ),
        PythonScanSourceInput::File(file) => (None, ScanSources::Files([file.into()].into())),
        PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
    })
}

fn post_opt_callback(
    lambda: &Py<PyAny>,
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    duration_since_start: Option<std::time::Duration>,
) -> PolarsResult<()> {
    Python::attach(|py| {
        let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));

        // Get a copy of the arenas.
        let arenas = nt.get_arenas();

        // Pass the node visitor which allows the python callback to replace parts of the query plan.
        // Remove "cuda" or specify better once we have multiple post-opt callbacks.
        lambda
            .call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64)))
            .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;

        // Unpack the arenas.
        // At this point the `nt` is useless.

        std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
        std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());

        Ok(())
    })
}

#[pymethods]
#[allow(clippy::should_implement_trait)]
impl PyLazyFrame {
    #[staticmethod]
    #[cfg(feature = "json")]
    #[allow(clippy::too_many_arguments)]
    #[pyo3(signature = (
        source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
        row_index, ignore_errors, include_file_paths, cloud_options, credential_provider, retries, file_cache_ttl
    ))]
    fn new_from_ndjson(
        source: Option<Py<PyAny>>,
        sources: Wrap<ScanSources>,
        infer_schema_length: Option<usize>,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
        batch_size: Option<NonZeroUsize>,
        n_rows: Option<usize>,
        low_memory: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        ignore_errors: bool,
        include_file_paths: Option<String>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<Py<PyAny>>,
        retries: usize,
        file_cache_ttl: Option<u64>,
    ) -> PyResult<Self> {
        use cloud::credential_provider::PlCredentialProvider;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.into_owned()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyJsonLineReader::new_with_sources(sources);

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_str();

            let mut cloud_options = parse_cloud_options(
                CloudScheme::from_uri(first_path_url),
                cloud_options.unwrap_or_default(),
            )?;
            cloud_options = cloud_options
                .with_max_retries(retries)
                .with_credential_provider(
                    credential_provider.map(PlCredentialProvider::from_python_builder),
                );

            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }

            r = r.with_cloud_options(Some(cloud_options));
        };

        let lf = r
            .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new))
            .with_batch_size(batch_size)
            .with_n_rows(n_rows)
            .low_memory(low_memory)
            .with_rechunk(rechunk)
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
            .with_row_index(row_index)
            .with_ignore_errors(ignore_errors)
            .with_include_file_paths(include_file_paths.map(|x| x.into()))
            .finish()
            .map_err(PyPolarsErr::from)?;

        Ok(lf.into())
    }

    #[staticmethod]
    #[cfg(feature = "csv")]
    #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype,
        low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
        infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
        encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema,
        cloud_options, credential_provider, retries, file_cache_ttl, include_file_paths
    )
    )]
    fn new_from_csv(
        source: Option<Py<PyAny>>,
        sources: Wrap<ScanSources>,
        separator: &str,
        has_header: bool,
        ignore_errors: bool,
        skip_rows: usize,
        skip_lines: usize,
        n_rows: Option<usize>,
        cache: bool,
        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
        low_memory: bool,
        comment_prefix: Option<&str>,
        quote_char: Option<&str>,
        null_values: Option<Wrap<NullValues>>,
        missing_utf8_is_empty_string: bool,
        infer_schema_length: Option<usize>,
        with_schema_modify: Option<Py<PyAny>>,
        rechunk: bool,
        skip_rows_after_header: usize,
        encoding: Wrap<CsvEncoding>,
        row_index: Option<(String, IdxSize)>,
        try_parse_dates: bool,
        eol_char: &str,
        raise_if_empty: bool,
        truncate_ragged_lines: bool,
        decimal_comma: bool,
        glob: bool,
        schema: Option<Wrap<Schema>>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<Py<PyAny>>,
        retries: usize,
        file_cache_ttl: Option<u64>,
        include_file_paths: Option<String>,
    ) -> PyResult<Self> {
        #[cfg(feature = "cloud")]
        use cloud::credential_provider::PlCredentialProvider;

        let null_values = null_values.map(|w| w.0);
        let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied();
        let separator = separator
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let eol_char = eol_char
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
            overwrite_dtype
                .into_iter()
                .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
                .collect::<Schema>()
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.into_owned()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyCsvReader::new_with_sources(sources);

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_str();

            let mut cloud_options = parse_cloud_options(
                CloudScheme::from_uri(first_path_url),
                cloud_options.unwrap_or_default(),
            )?;
            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }
            cloud_options = cloud_options
                .with_max_retries(retries)
                .with_credential_provider(
                    credential_provider.map(PlCredentialProvider::from_python_builder),
                );
            r = r.with_cloud_options(Some(cloud_options));
        }

        let mut r = r
            .with_infer_schema_length(infer_schema_length)
            .with_separator(separator)
            .with_has_header(has_header)
            .with_ignore_errors(ignore_errors)
            .with_skip_rows(skip_rows)
            .with_skip_lines(skip_lines)
            .with_n_rows(n_rows)
            .with_cache(cache)
            .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_low_memory(low_memory)
            .with_comment_prefix(comment_prefix.map(|x| x.into()))
            .with_quote_char(quote_char)
            .with_eol_char(eol_char)
            .with_rechunk(rechunk)
            .with_skip_rows_after_header(skip_rows_after_header)
            .with_encoding(encoding.0)
            .with_row_index(row_index)
            .with_try_parse_dates(try_parse_dates)
            .with_null_values(null_values)
            .with_missing_is_null(!missing_utf8_is_empty_string)
            .with_truncate_ragged_lines(truncate_ragged_lines)
            .with_decimal_comma(decimal_comma)
            .with_glob(glob)
            .with_raise_if_empty(raise_if_empty)
            .with_include_file_paths(include_file_paths.map(|x| x.into()));

        if let Some(lambda) = with_schema_modify {
            let f = |schema: Schema| {
                let iter = schema.iter_names().map(|s| s.as_str());
                Python::attach(|py| {
                    let names = PyList::new(py, iter).unwrap();

                    let out = lambda.call1(py, (names,)).expect("python function failed");
                    let new_names = out
                        .extract::<Vec<String>>(py)
                        .expect("python function should return List[str]");
                    polars_ensure!(new_names.len() == schema.len(),
                        ShapeMismatch: "The length of the new names list should be equal to the original column length",
                    );
                    Ok(schema
                        .iter_values()
                        .zip(new_names)
                        .map(|(dtype, name)| Field::new(name.into(), dtype.clone()))
                        .collect())
                })
            };
            r = r.with_schema_modify(f).map_err(PyPolarsErr::from)?
        }

        Ok(r.finish().map_err(PyPolarsErr::from)?.into())
    }

    #[cfg(feature = "parquet")]
    #[staticmethod]
    #[pyo3(signature = (
        sources, schema, scan_options, parallel, low_memory, use_statistics
    ))]
    fn new_from_parquet(
        sources: Wrap<ScanSources>,
        schema: Option<Wrap<Schema>>,
        scan_options: PyScanOptions,
        parallel: Wrap<ParallelStrategy>,
        low_memory: bool,
        use_statistics: bool,
    ) -> PyResult<Self> {
        use crate::utils::to_py_err;

        let parallel = parallel.0;

        let options = ParquetOptions {
            schema: schema.map(|x| Arc::new(x.0)),
            parallel,
            low_memory,
            use_statistics,
        };

        let sources = sources.0;
        let first_path = sources.first_path().map(|p| p.into_owned());

        let unified_scan_args = scan_options.extract_unified_scan_args(
            first_path
                .as_ref()
                .and_then(|p| CloudScheme::from_uri(p.to_str())),
        )?;

        let lf: LazyFrame = DslBuilder::scan_parquet(sources, options, unified_scan_args)
            .map_err(to_py_err)?
            .build()
            .into();

        Ok(lf.into())
    }

    #[cfg(feature = "ipc")]
    #[staticmethod]
    #[pyo3(signature = (sources, scan_options, file_cache_ttl))]
    fn new_from_ipc(
        sources: Wrap<ScanSources>,
        scan_options: PyScanOptions,
        file_cache_ttl: Option<u64>,
    ) -> PyResult<Self> {
        let options = IpcScanOptions;

        let sources = sources.0;
        let first_path = sources.first_path().map(|p| p.into_owned());

        let mut unified_scan_args = scan_options.extract_unified_scan_args(
            first_path
                .as_ref()
                .map(|p| p.as_ref())
                .and_then(|x| CloudScheme::from_uri(x.to_str())),
        )?;

        if let Some(file_cache_ttl) = file_cache_ttl {
            unified_scan_args
                .cloud_options
                .get_or_insert_default()
                .file_cache_ttl = file_cache_ttl;
        }

        let lf = LazyFrame::scan_ipc_sources(sources, options, unified_scan_args)
            .map_err(PyPolarsErr::from)?;
        Ok(lf.into())
    }

    #[cfg(feature = "scan_lines")]
    #[staticmethod]
    #[pyo3(signature = (sources, scan_options, name, file_cache_ttl))]
    fn new_from_scan_lines(
        sources: Wrap<ScanSources>,
        scan_options: PyScanOptions,
        name: PyBackedStr,
        file_cache_ttl: Option<u64>,
    ) -> PyResult<Self> {
        let sources = sources.0;
        let first_path = sources.first_path().map(|p| p.into_owned());

        let mut unified_scan_args = scan_options.extract_unified_scan_args(
            first_path
                .as_ref()
                .map(|p| p.as_ref())
                .and_then(|x| CloudScheme::from_uri(x.to_str())),
        )?;

        if let Some(file_cache_ttl) = file_cache_ttl {
            unified_scan_args
                .cloud_options
                .get_or_insert_default()
                .file_cache_ttl = file_cache_ttl;
        }

        let dsl: DslPlan = DslBuilder::scan_lines(sources, unified_scan_args, (&*name).into())
            .map_err(to_py_err)?
            .build();
        let lf: LazyFrame = dsl.into();

        Ok(lf.into())
    }

    #[staticmethod]
    #[pyo3(signature = (
        dataset_object
    ))]
    fn new_from_dataset_object(dataset_object: Py<PyAny>) -> PyResult<Self> {
        let lf =
            LazyFrame::from(DslBuilder::scan_python_dataset(PythonObject(dataset_object)).build())
                .into();

        Ok(lf)
    }

    #[staticmethod]
    fn scan_from_python_function_arrow_schema(
        schema: &Bound<'_, PyList>,
        scan_fn: Py<PyAny>,
        pyarrow: bool,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(pyarrow_schema_to_rust(schema)?);

        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
            is_pure,
        )
        .into())
    }

    #[staticmethod]
    fn scan_from_python_function_pl_schema(
        schema: Vec<(PyBackedStr, Wrap<DataType>)>,
        scan_fn: Py<PyAny>,
        pyarrow: bool,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(Schema::from_iter(
            schema
                .into_iter()
                .map(|(name, dt)| Field::new((&*name).into(), dt.0)),
        ));
        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
            is_pure,
        )
        .into())
    }

    #[staticmethod]
    fn scan_from_python_function_schema_function(
        schema_fn: Py<PyAny>,
        scan_fn: Py<PyAny>,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        Ok(LazyFrame::scan_from_python_function(
            Either::Left(schema_fn),
            scan_fn,
            false,
            validate_schema,
            is_pure,
        )
        .into())
    }

    fn describe_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_plan())
    }

    fn describe_optimized_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_optimized_plan())
    }

    fn describe_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_plan_tree())
    }

    fn describe_optimized_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_optimized_plan_tree())
    }

    fn to_dot(&self, py: Python<'_>, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().to_dot(optimized))
    }

    #[cfg(feature = "new_streaming")]
    fn to_dot_streaming_phys(&self, py: Python, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().to_dot_streaming_phys(optimized))
    }

    fn sort(
        &self,
        by_column: &str,
        descending: bool,
        nulls_last: bool,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.sort(
            [by_column],
            SortMultipleOptions {
                descending: vec![descending],
                nulls_last: vec![nulls_last],
                multithreaded,
                maintain_order,
                limit: None,
            },
        )
        .into()
    }

    fn sort_by_exprs(
        &self,
        by: Vec<PyExpr>,
        descending: Vec<bool>,
        nulls_last: Vec<bool>,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.sort_by_exprs(
            exprs,
            SortMultipleOptions {
                descending,
                nulls_last,
                maintain_order,
                multithreaded,
                limit: None,
            },
        )
        .into()
    }

    fn top_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.top_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn bottom_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.bottom_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn cache(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.cache().into()
    }

    #[pyo3(signature = (optflags))]
    fn with_optimizations(&self, optflags: PyOptFlags) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_optimizations(optflags.inner.into_inner()).into()
    }

    #[pyo3(signature = (lambda_post_opt))]
    fn profile(
        &self,
        py: Python<'_>,
        lambda_post_opt: Option<Py<PyAny>>,
    ) -> PyResult<(PyDataFrame, PyDataFrame)> {
        let (df, time_df) = py.enter_polars(|| {
            let ldf = self.ldf.read().clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._profile_post_opt(|root, lp_arena, expr_arena, duration_since_start| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, duration_since_start)
                })
            } else {
                ldf.profile()
            }
        })?;
        Ok((df.into(), time_df.into()))
    }

    #[pyo3(signature = (engine, lambda_post_opt))]
    fn collect(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda_post_opt: Option<Py<PyAny>>,
    ) -> PyResult<PyDataFrame> {
        py.enter_polars_df(|| {
            let ldf = self.ldf.read().clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._collect_post_opt(|root, lp_arena, expr_arena, _| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, None)
                })
            } else {
                ldf.collect_with_engine(engine.0)
            }
        })
    }

    #[pyo3(signature = (engine, lambda))]
    fn collect_with_callback(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda: Py<PyAny>,
    ) -> PyResult<()> {
        py.enter_polars_ok(|| {
            let ldf = self.ldf.read().clone();

            polars_core::POOL.spawn(move || {
                let result = ldf
                    .collect_with_engine(engine.0)
                    .map(PyDataFrame::new)
                    .map_err(PyPolarsErr::from);

                Python::attach(|py| match result {
                    Ok(df) => {
                        lambda.call1(py, (df,)).map_err(|err| err.restore(py)).ok();
                    },
                    Err(err) => {
                        lambda
                            .call1(py, (PyErr::from(err),))
                            .map_err(|err| err.restore(py))
                            .ok();
                    },
                });
            });
        })
    }

    #[cfg(feature = "parquet")]
    #[pyo3(signature = (
        target, sink_options, compression, compression_level, statistics, row_group_size, data_page_size,
        metadata, field_overwrites,
    ))]
    fn sink_parquet(
        &self,
        py: Python<'_>,
        target: PyFileSinkDestination,
        sink_options: PySinkOptions,
        compression: &str,
        compression_level: Option<i32>,
        statistics: Wrap<StatisticsOptions>,
        row_group_size: Option<usize>,
        data_page_size: Option<usize>,
        metadata: Wrap<Option<KeyValueMetadata>>,
        field_overwrites: Vec<Wrap<ParquetFieldOverwrites>>,
    ) -> PyResult<PyLazyFrame> {
        let compression = parse_parquet_compression(compression, compression_level)?;

        let options = ParquetWriteOptions {
            compression,
            statistics: statistics.0,
            row_group_size,
            data_page_size,
            key_value_metadata: metadata.0,
            field_overwrites: field_overwrites.into_iter().map(|f| f.0).collect(),
        };

        let target = target.extract_file_sink_destination()?;
        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;

        py.enter_polars(|| {
            self.ldf
                .read()
                .clone()
                .sink(target, FileType::Parquet(options), unified_sink_args)
                .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[cfg(feature = "ipc")]
    #[pyo3(signature = (
        target, sink_options, compression, compat_level
    ))]
    fn sink_ipc(
        &self,
        py: Python<'_>,
        target: PyFileSinkDestination,
        sink_options: PySinkOptions,
        compression: Wrap<Option<IpcCompression>>,
        compat_level: PyCompatLevel,
    ) -> PyResult<PyLazyFrame> {
        let options = IpcWriterOptions {
            compression: compression.0,
            compat_level: compat_level.0,
            ..Default::default()
        };

        let target = target.extract_file_sink_destination()?;
        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;

        py.enter_polars(|| {
            self.ldf
                .read()
                .clone()
                .sink(target, FileType::Ipc(options), unified_sink_args)
                .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[cfg(feature = "csv")]
    #[pyo3(signature = (
        target, sink_options, include_bom, include_header, separator, line_terminator, quote_char, batch_size,
        datetime_format, date_format, time_format, float_scientific, float_precision, decimal_comma, null_value,
        quote_style
    ))]
    fn sink_csv(
        &self,
        py: Python<'_>,
        target: PyFileSinkDestination,
        sink_options: PySinkOptions,
        include_bom: bool,
        include_header: bool,
        separator: u8,
        line_terminator: String,
        quote_char: u8,
        batch_size: NonZeroUsize,
        datetime_format: Option<String>,
        date_format: Option<String>,
        time_format: Option<String>,
        float_scientific: Option<bool>,
        float_precision: Option<usize>,
        decimal_comma: bool,
        null_value: Option<String>,
        quote_style: Option<Wrap<QuoteStyle>>,
    ) -> PyResult<PyLazyFrame> {
        let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0);
        let null_value = null_value.unwrap_or(SerializeOptions::default().null);

        let serialize_options = SerializeOptions {
            date_format,
            time_format,
            datetime_format,
            float_scientific,
            float_precision,
            decimal_comma,
            separator,
            quote_char,
            null: null_value,
            line_terminator,
            quote_style,
        };

        let options = CsvWriterOptions {
            include_bom,
            include_header,
            batch_size,
            serialize_options,
        };

        let target = target.extract_file_sink_destination()?;
        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;

        py.enter_polars(|| {
            self.ldf
                .read()
                .clone()
                .sink(target, FileType::Csv(options), unified_sink_args)
                .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "json")]
    #[pyo3(signature = (target, sink_options))]
    fn sink_json(
        &self,
        py: Python<'_>,
        target: PyFileSinkDestination,
        sink_options: PySinkOptions,
    ) -> PyResult<PyLazyFrame> {
        let options = JsonWriterOptions {};

        let target = target.extract_file_sink_destination()?;
        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;

        py.enter_polars(|| {
            self.ldf
                .read()
                .clone()
                .sink(target, FileType::Json(options), unified_sink_args)
                .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[pyo3(signature = (function, maintain_order, chunk_size))]
    pub fn sink_batches(
        &self,
        py: Python<'_>,
        function: Py<PyAny>,
        maintain_order: bool,
        chunk_size: Option<NonZeroUsize>,
    ) -> PyResult<PyLazyFrame> {
        let ldf = self.ldf.read().clone();
        py.enter_polars(|| {
            ldf.sink_batches(
                PlanCallback::new_python(PythonObject(function)),
                maintain_order,
                chunk_size,
            )
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    fn filter(&self, predicate: PyExpr) -> Self {
        self.ldf.read().clone().filter(predicate.inner).into()
    }

    fn remove(&self, predicate: PyExpr) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.remove(predicate.inner).into()
    }

    fn select(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = exprs.to_exprs();
        ldf.select(exprs).into()
    }

    fn select_seq(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = exprs.to_exprs();
        ldf.select_seq(exprs).into()
    }

    fn group_by(&self, by: Vec<PyExpr>, maintain_order: bool) -> PyLazyGroupBy {
        let ldf = self.ldf.read().clone();
        let by = by.to_exprs();
        let lazy_gb = if maintain_order {
            ldf.group_by_stable(by)
        } else {
            ldf.group_by(by)
        };

        PyLazyGroupBy { lgb: Some(lazy_gb) }
    }

    fn rolling(
        &self,
        index_column: PyExpr,
        period: &str,
        offset: &str,
        closed: Wrap<ClosedWindow>,
        by: Vec<PyExpr>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let ldf = self.ldf.read().clone();
        let by = by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let lazy_gb = ldf.rolling(
            index_column.inner,
            by,
            RollingGroupOptions {
                index_column: "".into(),
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                closed_window,
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn group_by_dynamic(
        &self,
        index_column: PyExpr,
        every: &str,
        period: &str,
        offset: &str,
        label: Wrap<Label>,
        include_boundaries: bool,
        closed: Wrap<ClosedWindow>,
        group_by: Vec<PyExpr>,
        start_by: Wrap<StartBy>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let group_by = group_by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let ldf = self.ldf.read().clone();
        let lazy_gb = ldf.group_by_dynamic(
            index_column.inner,
            group_by,
            DynamicGroupOptions {
                every: Duration::try_parse(every).map_err(PyPolarsErr::from)?,
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                label: label.0,
                include_boundaries,
                closed_window,
                start_by: start_by.0,
                ..Default::default()
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn with_context(&self, contexts: Vec<Self>) -> Self {
        let contexts = contexts
            .into_iter()
            .map(|ldf| ldf.ldf.into_inner())
            .collect::<Vec<_>>();
        self.ldf.read().clone().with_context(contexts).into()
    }

    #[cfg(feature = "asof_join")]
    #[pyo3(signature = (other, left_on, right_on, left_by, right_by, allow_parallel, force_parallel, suffix, strategy, tolerance, tolerance_str, coalesce, allow_eq, check_sortedness))]
    fn join_asof(
        &self,
        other: Self,
        left_on: PyExpr,
        right_on: PyExpr,
        left_by: Option<Vec<PyBackedStr>>,
        right_by: Option<Vec<PyBackedStr>>,
        allow_parallel: bool,
        force_parallel: bool,
        suffix: String,
        strategy: Wrap<AsofStrategy>,
        tolerance: Option<Wrap<AnyValue<'_>>>,
        tolerance_str: Option<String>,
        coalesce: bool,
        allow_eq: bool,
        check_sortedness: bool,
    ) -> PyResult<Self> {
        let coalesce = if coalesce {
            JoinCoalesce::CoalesceColumns
        } else {
            JoinCoalesce::KeepColumns
        };
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();
        let left_on = left_on.inner;
        let right_on = right_on.inner;
        Ok(ldf
            .join_builder()
            .with(other)
            .left_on([left_on])
            .right_on([right_on])
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .coalesce(coalesce)
            .how(JoinType::AsOf(Box::new(AsOfOptions {
                strategy: strategy.0,
                left_by: left_by.map(strings_to_pl_smallstr),
                right_by: right_by.map(strings_to_pl_smallstr),
                tolerance: tolerance.map(|t| {
                    let av = t.0.into_static();
                    let dtype = av.dtype();
                    Scalar::new(dtype, av)
                }),
                tolerance_str: tolerance_str.map(|s| s.into()),
                allow_eq,
                check_sortedness,
            })))
            .suffix(suffix)
            .finish()
            .into())
    }

    #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, nulls_equal, how, suffix, validate, maintain_order, coalesce=None))]
    fn join(
        &self,
        other: Self,
        left_on: Vec<PyExpr>,
        right_on: Vec<PyExpr>,
        allow_parallel: bool,
        force_parallel: bool,
        nulls_equal: bool,
        how: Wrap<JoinType>,
        suffix: String,
        validate: Wrap<JoinValidation>,
        maintain_order: Wrap<MaintainOrderJoin>,
        coalesce: Option<bool>,
    ) -> PyResult<Self> {
        let coalesce = match coalesce {
            None => JoinCoalesce::JoinSpecific,
            Some(true) => JoinCoalesce::CoalesceColumns,
            Some(false) => JoinCoalesce::KeepColumns,
        };
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();
        let left_on = left_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let right_on = right_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();

        Ok(ldf
            .join_builder()
            .with(other)
            .left_on(left_on)
            .right_on(right_on)
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .join_nulls(nulls_equal)
            .how(how.0)
            .suffix(suffix)
            .validate(validate.0)
            .coalesce(coalesce)
            .maintain_order(maintain_order.0)
            .finish()
            .into())
    }

    fn join_where(&self, other: Self, predicates: Vec<PyExpr>, suffix: String) -> PyResult<Self> {
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();

        let predicates = predicates.to_exprs();

        Ok(ldf
            .join_builder()
            .with(other)
            .suffix(suffix)
            .join_where(predicates)
            .into())
    }

    fn with_columns(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_columns(exprs.to_exprs()).into()
    }

    fn with_columns_seq(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_columns_seq(exprs.to_exprs()).into()
    }

    fn match_to_schema<'py>(
        &self,
        schema: Wrap<Schema>,
        missing_columns: &Bound<'py, PyAny>,
        missing_struct_fields: &Bound<'py, PyAny>,
        extra_columns: Wrap<ExtraColumnsPolicy>,
        extra_struct_fields: &Bound<'py, PyAny>,
        integer_cast: &Bound<'py, PyAny>,
        float_cast: &Bound<'py, PyAny>,
    ) -> PyResult<Self> {
        fn parse_missing_columns<'py>(
            schema: &Schema,
            missing_columns: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<MissingColumnsPolicyOrExpr>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = missing_columns.extract::<Wrap<MissingColumnsPolicyOrExpr>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = missing_columns.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(
                    MissingColumnsPolicyOrExpr::Raise,
                    schema.len(),
                ));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<MissingColumnsPolicyOrExpr>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err("Invalid value for `missing_columns`"));
            }
            Ok(out)
        }
        fn parse_missing_struct_fields<'py>(
            schema: &Schema,
            missing_struct_fields: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<MissingColumnsPolicy>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = missing_struct_fields.extract::<Wrap<MissingColumnsPolicy>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = missing_struct_fields.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(
                    MissingColumnsPolicy::Raise,
                    schema.len(),
                ));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<MissingColumnsPolicy>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `missing_struct_fields`",
                ));
            }
            Ok(out)
        }
        fn parse_extra_struct_fields<'py>(
            schema: &Schema,
            extra_struct_fields: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<ExtraColumnsPolicy>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = extra_struct_fields.extract::<Wrap<ExtraColumnsPolicy>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = extra_struct_fields.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(ExtraColumnsPolicy::Raise, schema.len()));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<ExtraColumnsPolicy>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `extra_struct_fields`",
                ));
            }
            Ok(out)
        }
        fn parse_cast<'py>(
            schema: &Schema,
            cast: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<UpcastOrForbid>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = cast.extract::<Wrap<UpcastOrForbid>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = cast.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(UpcastOrForbid::Forbid, schema.len()));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<UpcastOrForbid>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `integer_cast` / `float_cast`",
                ));
            }
            Ok(out)
        }

        let missing_columns = parse_missing_columns(&schema.0, missing_columns)?;
        let missing_struct_fields = parse_missing_struct_fields(&schema.0, missing_struct_fields)?;
        let extra_struct_fields = parse_extra_struct_fields(&schema.0, extra_struct_fields)?;
        let integer_cast = parse_cast(&schema.0, integer_cast)?;
        let float_cast = parse_cast(&schema.0, float_cast)?;

        let per_column = (0..schema.0.len())
            .map(|i| MatchToSchemaPerColumn {
                missing_columns: missing_columns[i].clone(),
                missing_struct_fields: missing_struct_fields[i],
                extra_struct_fields: extra_struct_fields[i],
                integer_cast: integer_cast[i],
                float_cast: float_cast[i],
            })
            .collect();

        let ldf = self.ldf.read().clone();
        Ok(ldf
            .match_to_schema(Arc::new(schema.0), per_column, extra_columns.0)
            .into())
    }

    fn pipe_with_schema(&self, callback: Py<PyAny>) -> Self {
        let ldf = self.ldf.read().clone();
        let function = PythonObject(callback);
        ldf.pipe_with_schema(PlanCallback::new_python(function))
            .into()
    }

    fn rename(&self, existing: Vec<String>, new: Vec<String>, strict: bool) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.rename(existing, new, strict).into()
    }

    fn reverse(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.reverse().into()
    }

    #[pyo3(signature = (n, fill_value=None))]
    fn shift(&self, n: PyExpr, fill_value: Option<PyExpr>) -> Self {
        let lf = self.ldf.read().clone();
        let out = match fill_value {
            Some(v) => lf.shift_and_fill(n.inner, v.inner),
            None => lf.shift(n.inner),
        };
        out.into()
    }

    fn fill_nan(&self, fill_value: PyExpr) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.fill_nan(fill_value.inner).into()
    }

    fn min(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.min();
        out.into()
    }

    fn max(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.max();
        out.into()
    }

    fn sum(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.sum();
        out.into()
    }

    fn mean(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.mean();
        out.into()
    }

    fn std(&self, ddof: u8) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.std(ddof);
        out.into()
    }

    fn var(&self, ddof: u8) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.var(ddof);
        out.into()
    }

    fn median(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.median();
        out.into()
    }

    fn quantile(&self, quantile: PyExpr, interpolation: Wrap<QuantileMethod>) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.quantile(quantile.inner, interpolation.0);
        out.into()
    }

    fn explode(&self, subset: PySelector, empty_as_null: bool, keep_nulls: bool) -> Self {
        self.ldf
            .read()
            .clone()
            .explode(
                subset.inner,
                ExplodeOptions {
                    empty_as_null,
                    keep_nulls,
                },
            )
            .into()
    }

    fn null_count(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.null_count().into()
    }

    #[pyo3(signature = (maintain_order, subset, keep))]
    fn unique(
        &self,
        maintain_order: bool,
        subset: Option<Vec<PyExpr>>,
        keep: Wrap<UniqueKeepStrategy>,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        let subset = subset.map(|exprs| exprs.into_iter().map(|e| e.inner).collect());
        match maintain_order {
            true => ldf.unique_stable_generic(subset, keep.0),
            false => ldf.unique_generic(subset, keep.0),
        }
        .into()
    }

    fn drop_nans(&self, subset: Option<PySelector>) -> Self {
        self.ldf
            .read()
            .clone()
            .drop_nans(subset.map(|e| e.inner))
            .into()
    }

    fn drop_nulls(&self, subset: Option<PySelector>) -> Self {
        self.ldf
            .read()
            .clone()
            .drop_nulls(subset.map(|e| e.inner))
            .into()
    }

    #[pyo3(signature = (offset, len=None))]
    fn slice(&self, offset: i64, len: Option<IdxSize>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.slice(offset, len.unwrap_or(IdxSize::MAX)).into()
    }

    fn tail(&self, n: IdxSize) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.tail(n).into()
    }

    #[cfg(feature = "pivot")]
    #[pyo3(signature = (on, on_columns, index, values, agg, maintain_order, separator))]
    fn pivot(
        &self,
        on: PySelector,
        on_columns: PyDataFrame,
        index: PySelector,
        values: PySelector,
        agg: PyExpr,
        maintain_order: bool,
        separator: String,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.pivot(
            on.inner,
            Arc::new(on_columns.df.read().clone()),
            index.inner,
            values.inner,
            agg.inner,
            maintain_order,
            separator.into(),
        )
        .into()
    }

    #[cfg(feature = "pivot")]
    #[pyo3(signature = (on, index, value_name, variable_name))]
    fn unpivot(
        &self,
        on: Option<PySelector>,
        index: PySelector,
        value_name: Option<String>,
        variable_name: Option<String>,
    ) -> Self {
        let args = UnpivotArgsDSL {
            on: on.map(|on| on.inner),
            index: index.inner,
            value_name: value_name.map(|s| s.into()),
            variable_name: variable_name.map(|s| s.into()),
        };

        let ldf = self.ldf.read().clone();
        ldf.unpivot(args).into()
    }

    #[pyo3(signature = (name, offset=None))]
    fn with_row_index(&self, name: &str, offset: Option<IdxSize>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_row_index(name, offset).into()
    }

    #[pyo3(signature = (function, predicate_pushdown, projection_pushdown, slice_pushdown, streamable, schema, validate_output))]
    fn map_batches(
        &self,
        function: Py<PyAny>,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        slice_pushdown: bool,
        streamable: bool,
        schema: Option<Wrap<Schema>>,
        validate_output: bool,
    ) -> Self {
        let mut opt = OptFlags::default();
        opt.set(OptFlags::PREDICATE_PUSHDOWN, predicate_pushdown);
        opt.set(OptFlags::PROJECTION_PUSHDOWN, projection_pushdown);
        opt.set(OptFlags::SLICE_PUSHDOWN, slice_pushdown);
        opt.set(OptFlags::NEW_STREAMING, streamable);

        self.ldf
            .read()
            .clone()
            .map_python(
                function.into(),
                opt,
                schema.map(|s| Arc::new(s.0)),
                validate_output,
            )
            .into()
    }

    fn drop(&self, columns: PySelector) -> Self {
        self.ldf.read().clone().drop(columns.inner).into()
    }

    fn cast(&self, dtypes: HashMap<PyBackedStr, Wrap<DataType>>, strict: bool) -> Self {
        let mut cast_map = PlHashMap::with_capacity(dtypes.len());
        cast_map.extend(dtypes.iter().map(|(k, v)| (k.as_ref(), v.0.clone())));
        self.ldf.read().clone().cast(cast_map, strict).into()
    }

    fn cast_all(&self, dtype: PyDataTypeExpr, strict: bool) -> Self {
        self.ldf.read().clone().cast_all(dtype.inner, strict).into()
    }

    fn clone(&self) -> Self {
        self.ldf.read().clone().into()
    }

    fn collect_schema<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
        let schema = py.enter_polars(|| self.ldf.write().collect_schema())?;

        let schema_dict = PyDict::new(py);
        schema.iter_fields().for_each(|fld| {
            schema_dict
                .set_item(fld.name().as_str(), &Wrap(fld.dtype().clone()))
                .unwrap()
        });
        Ok(schema_dict)
    }

    fn unnest(&self, columns: PySelector, separator: Option<&str>) -> Self {
        self.ldf
            .read()
            .clone()
            .unnest(columns.inner, separator.map(PlSmallStr::from_str))
            .into()
    }

    fn count(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.count().into()
    }

    #[cfg(feature = "merge_sorted")]
    fn merge_sorted(&self, other: Self, key: &str) -> PyResult<Self> {
        let out = self
            .ldf
            .read()
            .clone()
            .merge_sorted(other.ldf.into_inner(), key)
            .map_err(PyPolarsErr::from)?;
        Ok(out.into())
    }

    fn hint_sorted(
        &self,
        columns: Vec<String>,
        descending: Vec<bool>,
        nulls_last: Vec<bool>,
    ) -> PyResult<Self> {
        if columns.len() != descending.len() && descending.len() != 1 {
            return Err(PyValueError::new_err(
                "`set_sorted` expects the same amount of `columns` as `descending` values.",
            ));
        }
        if columns.len() != nulls_last.len() && nulls_last.len() != 1 {
            return Err(PyValueError::new_err(
                "`set_sorted` expects the same amount of `columns` as `nulls_last` values.",
            ));
        }

        let mut sorted = columns
            .iter()
            .map(|c| Sorted {
                column: PlSmallStr::from_str(c.as_str()),
                descending: Some(false),
                nulls_last: Some(false),
            })
            .collect::<Vec<_>>();

        if !columns.is_empty() {
            if descending.len() != 1 {
                sorted
                    .iter_mut()
                    .zip(descending)
                    .for_each(|(s, d)| s.descending = Some(d));
            } else if descending[0] {
                sorted.iter_mut().for_each(|s| s.descending = Some(true));
            }

            if nulls_last.len() != 1 {
                sorted
                    .iter_mut()
                    .zip(nulls_last)
                    .for_each(|(s, d)| s.nulls_last = Some(d));
            } else if nulls_last[0] {
                sorted.iter_mut().for_each(|s| s.nulls_last = Some(true));
            }
        }

        let out = self
            .ldf
            .read()
            .clone()
            .hint(HintIR::Sorted(sorted.into()))
            .map_err(PyPolarsErr::from)?;
        Ok(out.into())
    }
}

#[cfg(feature = "parquet")]
impl<'py> FromPyObject<'py> for Wrap<polars_io::parquet::write::ParquetFieldOverwrites> {
    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
        use polars_io::parquet::write::ParquetFieldOverwrites;

        let parsed = ob.extract::<pyo3::Bound<'_, PyDict>>()?;

        let name = PyDictMethods::get_item(&parsed, "name")?
            .map(|v| PyResult::Ok(v.extract::<String>()?.into()))
            .transpose()?;
        let children = PyDictMethods::get_item(&parsed, "children")?.map_or(
            PyResult::Ok(ChildFieldOverwrites::None),
            |v| {
                Ok(
                    if let Ok(overwrites) = v.extract::<Vec<Wrap<ParquetFieldOverwrites>>>() {
                        ChildFieldOverwrites::Struct(overwrites.into_iter().map(|v| v.0).collect())
                    } else {
                        ChildFieldOverwrites::ListLike(Box::new(
                            v.extract::<Wrap<ParquetFieldOverwrites>>()?.0,
                        ))
                    },
                )
            },
        )?;

        let field_id = PyDictMethods::get_item(&parsed, "field_id")?
            .map(|v| v.extract::<i32>())
            .transpose()?;

        let metadata = PyDictMethods::get_item(&parsed, "metadata")?
            .map(|v| v.extract::<Vec<(String, Option<String>)>>())
            .transpose()?;
        let metadata = metadata.map(|v| {
            v.into_iter()
                .map(|v| MetadataKeyValue {
                    key: v.0.into(),
                    value: v.1.map(|v| v.into()),
                })
                .collect()
        });

        let required = PyDictMethods::get_item(&parsed, "required")?
            .map(|v| v.extract::<bool>())
            .transpose()?;

        Ok(Wrap(ParquetFieldOverwrites {
            name,
            children,
            field_id,
            metadata,
            required,
        }))
    }
}