Path: blob/main/crates/polars-python/src/lazyframe/general.rs
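//! Python-facing bindings for `LazyFrame`.
//!
//! Exposes the `#[pymethods]` surface of `PyLazyFrame`: scan constructors
//! (NDJSON, CSV, Parquet, IPC, line scans, Python datasets/functions),
//! sinks, lazy transformations, and collection entry points, as well as the
//! Arrow C stream export used by batched collection.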
use std::collections::HashMap;
use std::ffi::CString;
use std::num::NonZeroUsize;

use arrow::ffi::export_iterator;
use either::Either;
use parking_lot::Mutex;
use polars::io::RowIndex;
use polars::time::*;
use polars_core::prelude::*;
#[cfg(feature = "parquet")]
use polars_parquet::arrow::write::StatisticsOptions;
use polars_plan::dsl::ScanSources;
use polars_plan::plans::{AExpr, HintIR, IR, Sorted};
use polars_utils::arena::{Arena, Node};
use polars_utils::python_function::PythonObject;
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyCapsule, PyDict, PyDictMethods, PyList};

use super::{PyLazyFrame, PyOptFlags};
use crate::error::PyPolarsErr;
use crate::expr::ToExprs;
use crate::expr::datatype::PyDataTypeExpr;
use crate::expr::selector::PySelector;
use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
#[cfg(feature = "json")]
use crate::io::cloud_options::OptPyCloudOptions;
use crate::io::scan_options::PyScanOptions;
use crate::io::sink_options::PySinkOptions;
use crate::io::sink_output::PyFileSinkDestination;
use crate::lazyframe::visit::NodeTraverser;
use crate::prelude::*;
use crate::utils::{EnterPolarsExt, to_py_err};
use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};

fn pyobject_to_first_path_and_scan_sources(
    obj: Py<PyAny>,
) -> PyResult<(Option<PlRefPath>, ScanSources)> {
    use crate::file::{PythonScanSourceInput, get_python_scan_source_input};
    Ok(match get_python_scan_source_input(obj, false)? {
        PythonScanSourceInput::Path(path) => (
            Some(path.clone()),
            ScanSources::Paths(FromIterator::from_iter([path])),
        ),
        PythonScanSourceInput::File(file) => (None, ScanSources::Files([file.into()].into())),
        PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
    })
}

fn post_opt_callback(
    lambda: &Py<PyAny>,
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    duration_since_start: Option<std::time::Duration>,
) -> PolarsResult<()> {
    Python::attach(|py| {
        let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));

        // Get a copy of the arenas.
        let arenas = nt.get_arenas();

        // Pass the node visitor which allows the python callback to replace parts of the query plan.
        // Remove "cuda" or specify better once we have multiple post-opt callbacks.
        lambda
            .call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64)))
            .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;

        // Unpack the arenas.
        // At this point the `nt` is useless.

        std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
        std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());

        Ok(())
    })
}

#[pymethods]
#[allow(clippy::should_implement_trait)]
impl PyLazyFrame {
    #[staticmethod]
    #[cfg(feature = "json")]
    #[allow(clippy::too_many_arguments)]
    #[pyo3(signature = (
        source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
        row_index, ignore_errors, include_file_paths, cloud_options, credential_provider
    ))]
    fn new_from_ndjson(
        source: Option<Py<PyAny>>,
        sources: Wrap<ScanSources>,
        infer_schema_length: Option<usize>,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
        batch_size: Option<NonZeroUsize>,
        n_rows: Option<usize>,
        low_memory: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        ignore_errors: bool,
        include_file_paths: Option<String>,
        cloud_options: OptPyCloudOptions,
        credential_provider: Option<Py<PyAny>>,
    ) -> PyResult<Self> {
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().cloned(), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyJsonLineReader::new_with_sources(sources);

        if let Some(first_path) = first_path {
            let first_path_url = first_path.as_str();

            let cloud_options = cloud_options.extract_opt_cloud_options(
                CloudScheme::from_path(first_path_url),
                credential_provider,
            )?;

            r = r.with_cloud_options(cloud_options);
        };

        let lf = r
            .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new))
            .with_batch_size(batch_size)
            .with_n_rows(n_rows)
            .low_memory(low_memory)
            .with_rechunk(rechunk)
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
            .with_row_index(row_index)
            .with_ignore_errors(ignore_errors)
            .with_include_file_paths(include_file_paths.map(|x| x.into()))
            .finish()
            .map_err(PyPolarsErr::from)?;

        Ok(lf.into())
    }

    #[staticmethod]
    #[cfg(feature = "csv")]
    #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype,
        low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
        infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
        encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema,
        cloud_options, credential_provider, include_file_paths
        )
    )]
    fn new_from_csv(
        source: Option<Py<PyAny>>,
        sources: Wrap<ScanSources>,
        separator: &str,
        has_header: bool,
        ignore_errors: bool,
        skip_rows: usize,
        skip_lines: usize,
        n_rows: Option<usize>,
        cache: bool,
        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
        low_memory: bool,
        comment_prefix: Option<&str>,
        quote_char: Option<&str>,
        null_values: Option<Wrap<NullValues>>,
        missing_utf8_is_empty_string: bool,
        infer_schema_length: Option<usize>,
        with_schema_modify: Option<Py<PyAny>>,
        rechunk: bool,
        skip_rows_after_header: usize,
        encoding: Wrap<CsvEncoding>,
        row_index: Option<(String, IdxSize)>,
        try_parse_dates: bool,
        eol_char: &str,
        raise_if_empty: bool,
        truncate_ragged_lines: bool,
        decimal_comma: bool,
        glob: bool,
        schema: Option<Wrap<Schema>>,
        cloud_options: OptPyCloudOptions,
        credential_provider: Option<Py<PyAny>>,
        include_file_paths: Option<String>,
    ) -> PyResult<Self> {
        let null_values = null_values.map(|w| w.0);
        let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied();
        let separator = separator
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let eol_char = eol_char
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
            overwrite_dtype
                .into_iter()
                .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
                .collect::<Schema>()
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().cloned(), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyCsvReader::new_with_sources(sources);

        if let Some(first_path) = first_path {
            let first_path_url = first_path.as_str();
            let cloud_options = cloud_options.extract_opt_cloud_options(
                CloudScheme::from_path(first_path_url),
                credential_provider,
            )?;
            r = r.with_cloud_options(cloud_options);
        }

        let mut r = r
            .with_infer_schema_length(infer_schema_length)
            .with_separator(separator)
            .with_has_header(has_header)
            .with_ignore_errors(ignore_errors)
            .with_skip_rows(skip_rows)
            .with_skip_lines(skip_lines)
            .with_n_rows(n_rows)
            .with_cache(cache)
            .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_low_memory(low_memory)
            .with_comment_prefix(comment_prefix.map(|x| x.into()))
            .with_quote_char(quote_char)
            .with_eol_char(eol_char)
            .with_rechunk(rechunk)
            .with_skip_rows_after_header(skip_rows_after_header)
            .with_encoding(encoding.0)
            .with_row_index(row_index)
            .with_try_parse_dates(try_parse_dates)
            .with_null_values(null_values)
            .with_missing_is_null(!missing_utf8_is_empty_string)
            .with_truncate_ragged_lines(truncate_ragged_lines)
            .with_decimal_comma(decimal_comma)
            .with_glob(glob)
            .with_raise_if_empty(raise_if_empty)
            .with_include_file_paths(include_file_paths.map(|x| x.into()));

        if let Some(lambda) = with_schema_modify {
            let f = |schema: Schema| {
                let iter = schema.iter_names().map(|s| s.as_str());
                Python::attach(|py| {
                    let names = PyList::new(py, iter).unwrap();

                    let out = lambda.call1(py, (names,)).expect("python function failed");
                    let new_names = out
                        .extract::<Vec<String>>(py)
                        .expect("python function should return List[str]");
                    polars_ensure!(new_names.len() == schema.len(),
                        ShapeMismatch: "The length of the new names list should be equal to the original column length",
                    );
                    Ok(schema
                        .iter_values()
                        .zip(new_names)
                        .map(|(dtype, name)| Field::new(name.into(), dtype.clone()))
                        .collect())
                })
            };
            r = r.with_schema_modify(f).map_err(PyPolarsErr::from)?
        }

        Ok(r.finish().map_err(PyPolarsErr::from)?.into())
    }

    #[cfg(feature = "parquet")]
    #[staticmethod]
    #[pyo3(signature = (
        sources, schema, scan_options, parallel, low_memory, use_statistics
    ))]
    fn new_from_parquet(
        sources: Wrap<ScanSources>,
        schema: Option<Wrap<Schema>>,
        scan_options: PyScanOptions,
        parallel: Wrap<ParallelStrategy>,
        low_memory: bool,
        use_statistics: bool,
    ) -> PyResult<Self> {
        use crate::utils::to_py_err;

        let parallel = parallel.0;

        let options = ParquetOptions {
            schema: schema.map(|x| Arc::new(x.0)),
            parallel,
            low_memory,
            use_statistics,
        };

        let sources = sources.0;
        let first_path = sources.first_path();

        let unified_scan_args =
            scan_options.extract_unified_scan_args(first_path.and_then(|x| x.scheme()))?;

        let lf: LazyFrame = DslBuilder::scan_parquet(sources, options, unified_scan_args)
            .map_err(to_py_err)?
            .build()
            .into();

        Ok(lf.into())
    }

    #[cfg(feature = "ipc")]
    #[staticmethod]
    #[pyo3(signature = (sources, record_batch_statistics, scan_options))]
    fn new_from_ipc(
        sources: Wrap<ScanSources>,
        record_batch_statistics: bool,
        scan_options: PyScanOptions,
    ) -> PyResult<Self> {
        let options = IpcScanOptions {
            record_batch_statistics,
            checked: Default::default(),
        };

        let sources = sources.0;
        let first_path = sources.first_path().cloned();

        let unified_scan_args =
            scan_options.extract_unified_scan_args(first_path.as_ref().and_then(|x| x.scheme()))?;

        let lf = LazyFrame::scan_ipc_sources(sources, options, unified_scan_args)
            .map_err(PyPolarsErr::from)?;
        Ok(lf.into())
    }

    #[cfg(feature = "scan_lines")]
    #[staticmethod]
    #[pyo3(signature = (sources, scan_options, name))]
    fn new_from_scan_lines(
        sources: Wrap<ScanSources>,
        scan_options: PyScanOptions,
        name: PyBackedStr,
    ) -> PyResult<Self> {
        let sources = sources.0;
        let first_path = sources.first_path();

        let unified_scan_args =
            scan_options.extract_unified_scan_args(first_path.and_then(|x| x.scheme()))?;

        let dsl: DslPlan = DslBuilder::scan_lines(sources, unified_scan_args, (&*name).into())
            .map_err(to_py_err)?
            .build();
        let lf: LazyFrame = dsl.into();

        Ok(lf.into())
    }

    #[staticmethod]
    #[pyo3(signature = (
        dataset_object
    ))]
    fn new_from_dataset_object(dataset_object: Py<PyAny>) -> PyResult<Self> {
        let lf =
            LazyFrame::from(DslBuilder::scan_python_dataset(PythonObject(dataset_object)).build())
                .into();

        Ok(lf)
    }

    #[staticmethod]
    fn scan_from_python_function_arrow_schema(
        schema: &Bound<'_, PyList>,
        scan_fn: Py<PyAny>,
        pyarrow: bool,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(pyarrow_schema_to_rust(schema)?);

        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
            is_pure,
        )
        .into())
    }

    #[staticmethod]
    fn scan_from_python_function_pl_schema(
        schema: Vec<(PyBackedStr, Wrap<DataType>)>,
        scan_fn: Py<PyAny>,
        pyarrow: bool,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(Schema::from_iter(
            schema
                .into_iter()
                .map(|(name, dt)| Field::new((&*name).into(), dt.0)),
        ));
        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
            is_pure,
        )
        .into())
    }

    #[staticmethod]
    fn scan_from_python_function_schema_function(
        schema_fn: Py<PyAny>,
        scan_fn: Py<PyAny>,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        Ok(LazyFrame::scan_from_python_function(
            Either::Left(schema_fn),
            scan_fn,
            false,
            validate_schema,
            is_pure,
        )
        .into())
    }

    fn describe_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_plan())
    }

    fn describe_optimized_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_optimized_plan())
    }

    fn describe_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_plan_tree())
    }

    fn describe_optimized_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_optimized_plan_tree())
    }

    fn to_dot(&self, py: Python<'_>, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().to_dot(optimized))
    }

    #[cfg(feature = "new_streaming")]
    fn to_dot_streaming_phys(&self, py: Python, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().to_dot_streaming_phys(optimized))
    }

    fn sort(
        &self,
        by_column: &str,
        descending: bool,
        nulls_last: bool,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.sort(
            [by_column],
            SortMultipleOptions {
                descending: vec![descending],
                nulls_last: vec![nulls_last],
                multithreaded,
                maintain_order,
                limit: None,
            },
        )
        .into()
    }

    fn sort_by_exprs(
        &self,
        by: Vec<PyExpr>,
        descending: Vec<bool>,
        nulls_last: Vec<bool>,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.sort_by_exprs(
            exprs,
            SortMultipleOptions {
                descending,
                nulls_last,
                maintain_order,
                multithreaded,
                limit: None,
            },
        )
        .into()
    }

    fn top_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.top_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn bottom_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.bottom_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn cache(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.cache().into()
    }

    #[pyo3(signature = (optflags))]
    fn with_optimizations(&self, optflags: PyOptFlags) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_optimizations(optflags.inner.into_inner()).into()
    }

    #[pyo3(signature = (lambda_post_opt))]
    fn profile(
        &self,
        py: Python<'_>,
        lambda_post_opt: Option<Py<PyAny>>,
    ) -> PyResult<(PyDataFrame, PyDataFrame)> {
        let (df, time_df) = py.enter_polars(|| {
            let ldf = self.ldf.read().clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._profile_post_opt(|root, lp_arena, expr_arena, duration_since_start| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, duration_since_start)
                })
            } else {
                ldf.profile()
            }
        })?;
        Ok((df.into(), time_df.into()))
    }

    #[pyo3(signature = (engine, lambda_post_opt))]
    fn collect(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda_post_opt: Option<Py<PyAny>>,
    ) -> PyResult<PyDataFrame> {
        py.enter_polars_df(|| {
            let ldf = self.ldf.read().clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._collect_post_opt(|root, lp_arena, expr_arena, _| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, None)
                })
            } else {
                ldf.collect_with_engine(engine.0)
            }
        })
    }

    #[cfg(feature = "async")]
    #[pyo3(signature = (engine, lambda))]
    fn collect_with_callback(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda: Py<PyAny>,
    ) -> PyResult<()> {
        py.enter_polars_ok(|| {
            let ldf = self.ldf.read().clone();

            // We use a tokio spawn_blocking here as it has a high blocking
            // thread pool limit.
            polars_io::pl_async::get_runtime().spawn_blocking(move || {
                let result = ldf
                    .collect_with_engine(engine.0)
                    .map(PyDataFrame::new)
                    .map_err(PyPolarsErr::from);

                Python::attach(|py| match result {
                    Ok(df) => {
                        lambda.call1(py, (df,)).map_err(|err| err.restore(py)).ok();
                    },
                    Err(err) => {
                        lambda
                            .call1(py, (PyErr::from(err),))
                            .map_err(|err| err.restore(py))
                            .ok();
                    },
                });
            });
        })
    }

    #[cfg(feature = "async")]
    fn collect_batches(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        maintain_order: bool,
        chunk_size: Option<NonZeroUsize>,
        lazy: bool,
    ) -> PyResult<PyCollectBatches> {
        py.enter_polars(|| {
            let ldf = self.ldf.read().clone();

            let collect_batches = ldf
                .clone()
                .collect_batches(engine.0, maintain_order, chunk_size, lazy)
                .map_err(PyPolarsErr::from)?;

            PyResult::Ok(PyCollectBatches {
                inner: Arc::new(Mutex::new(collect_batches)),
                ldf,
            })
        })
    }

    #[cfg(feature = "parquet")]
    #[pyo3(signature = (
        target, sink_options, compression, compression_level, statistics, row_group_size, data_page_size,
        metadata, arrow_schema
    ))]
    fn sink_parquet(
        &self,
        py: Python<'_>,
        target: PyFileSinkDestination,
        sink_options: PySinkOptions,
        compression: &str,
        compression_level: Option<i32>,
        statistics: Wrap<StatisticsOptions>,
        row_group_size: Option<usize>,
        data_page_size: Option<usize>,
        metadata: Wrap<Option<KeyValueMetadata>>,
        arrow_schema: Option<Wrap<ArrowSchema>>,
    ) -> PyResult<PyLazyFrame> {
        let compression = parse_parquet_compression(compression, compression_level)?;

        let options = ParquetWriteOptions {
            compression,
            statistics: statistics.0,
            row_group_size,
            data_page_size,
            key_value_metadata: metadata.0,
            arrow_schema: arrow_schema.map(|x| Arc::new(x.0)),
            compat_level: None,
        };

        let target = target.extract_file_sink_destination()?;
        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;

        py.enter_polars(|| {
            self.ldf
                .read()
                .clone()
                .sink(
                    target,
                    FileWriteFormat::Parquet(Arc::new(options)),
                    unified_sink_args,
                )
                .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[cfg(feature = "ipc")]
    #[pyo3(signature = (
        target, sink_options, compression, compat_level, record_batch_size, record_batch_statistics
    ))]
    fn sink_ipc(
        &self,
        py: Python<'_>,
        target: PyFileSinkDestination,
        sink_options: PySinkOptions,
        compression: Wrap<Option<IpcCompression>>,
        compat_level: PyCompatLevel,
        record_batch_size: Option<usize>,
        record_batch_statistics: bool,
    ) -> PyResult<PyLazyFrame> {
        let options = IpcWriterOptions {
            compression: compression.0,
            compat_level: compat_level.0,
            record_batch_size,
            record_batch_statistics,
        };

        let target = target.extract_file_sink_destination()?;
        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;

        py.enter_polars(|| {
            self.ldf
                .read()
                .clone()
                .sink(target, FileWriteFormat::Ipc(options), unified_sink_args)
                .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[cfg(feature = "csv")]
    #[pyo3(signature = (
        target, sink_options, include_bom, compression, compression_level, check_extension,
        include_header, separator, line_terminator, quote_char, batch_size, datetime_format,
        date_format, time_format, float_scientific, float_precision, decimal_comma, null_value,
        quote_style
    ))]
    fn sink_csv(
        &self,
        py: Python<'_>,
        target: PyFileSinkDestination,
        sink_options: PySinkOptions,
        include_bom: bool,
        compression: &str,
        compression_level: Option<u32>,
        check_extension: bool,
        include_header: bool,
        separator: u8,
        line_terminator: Wrap<PlSmallStr>,
        quote_char: u8,
        batch_size: NonZeroUsize,
        datetime_format: Option<Wrap<PlSmallStr>>,
        date_format: Option<Wrap<PlSmallStr>>,
        time_format: Option<Wrap<PlSmallStr>>,
        float_scientific: Option<bool>,
        float_precision: Option<usize>,
        decimal_comma: bool,
        null_value: Option<Wrap<PlSmallStr>>,
        quote_style: Option<Wrap<QuoteStyle>>,
    ) -> PyResult<PyLazyFrame> {
        let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0);
        let null_value = null_value
            .map(|x| x.0)
            .unwrap_or(SerializeOptions::default().null);

        let serialize_options = SerializeOptions {
            date_format: date_format.map(|x| x.0),
            time_format: time_format.map(|x| x.0),
            datetime_format: datetime_format.map(|x| x.0),
            float_scientific,
            float_precision,
            decimal_comma,
            separator,
            quote_char,
            null: null_value,
            line_terminator: line_terminator.0,
            quote_style,
        };

        let options = CsvWriterOptions {
            include_bom,
            compression: ExternalCompression::try_from(compression, compression_level)
                .map_err(PyPolarsErr::from)?,
            check_extension,
            include_header,
            batch_size,
            serialize_options: serialize_options.into(),
        };

        let target = target.extract_file_sink_destination()?;
        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;

        py.enter_polars(|| {
            self.ldf
                .read()
                .clone()
                .sink(target, FileWriteFormat::Csv(options), unified_sink_args)
                .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "json")]
    #[pyo3(signature = (target, compression, compression_level, check_extension, sink_options))]
    fn sink_ndjson(
        &self,
        py: Python<'_>,
        target: PyFileSinkDestination,
        compression: &str,
        compression_level: Option<u32>,
        check_extension: bool,
        sink_options: PySinkOptions,
    ) -> PyResult<PyLazyFrame> {
        let options = NDJsonWriterOptions {
            compression: ExternalCompression::try_from(compression, compression_level)
                .map_err(PyPolarsErr::from)?,
            check_extension,
        };

        let target = target.extract_file_sink_destination()?;
        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;

        py.enter_polars(|| {
            self.ldf
                .read()
                .clone()
                .sink(target, FileWriteFormat::NDJson(options), unified_sink_args)
                .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[pyo3(signature = (function, maintain_order, chunk_size))]
    pub fn sink_batches(
        &self,
        py: Python<'_>,
        function: Py<PyAny>,
        maintain_order: bool,
        chunk_size: Option<NonZeroUsize>,
    ) -> PyResult<PyLazyFrame> {
        let ldf = self.ldf.read().clone();
        py.enter_polars(|| {
            ldf.sink_batches(
                PlanCallback::new_python(PythonObject(function)),
                maintain_order,
                chunk_size,
            )
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    fn filter(&self, predicate: PyExpr) -> Self {
        self.ldf.read().clone().filter(predicate.inner).into()
    }

    fn remove(&self, predicate: PyExpr) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.remove(predicate.inner).into()
    }

    fn select(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = exprs.to_exprs();
        ldf.select(exprs).into()
    }

    fn select_seq(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = exprs.to_exprs();
        ldf.select_seq(exprs).into()
    }

    fn group_by(&self, by: Vec<PyExpr>, maintain_order: bool) -> PyLazyGroupBy {
        let ldf = self.ldf.read().clone();
        let by = by.to_exprs();
        let lazy_gb = if maintain_order {
            ldf.group_by_stable(by)
        } else {
            ldf.group_by(by)
        };

        PyLazyGroupBy { lgb: Some(lazy_gb) }
    }

    fn rolling(
        &self,
        index_column: PyExpr,
        period: &str,
        offset: &str,
        closed: Wrap<ClosedWindow>,
        by: Vec<PyExpr>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let ldf = self.ldf.read().clone();
        let by = by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let lazy_gb = ldf.rolling(
            index_column.inner,
            by,
            RollingGroupOptions {
                index_column: "".into(),
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                closed_window,
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn group_by_dynamic(
        &self,
        index_column: PyExpr,
        every: &str,
        period: &str,
        offset: &str,
        label: Wrap<Label>,
        include_boundaries: bool,
        closed: Wrap<ClosedWindow>,
        group_by: Vec<PyExpr>,
        start_by: Wrap<StartBy>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let group_by = group_by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let ldf = self.ldf.read().clone();
        let lazy_gb = ldf.group_by_dynamic(
            index_column.inner,
            group_by,
            DynamicGroupOptions {
                every: Duration::try_parse(every).map_err(PyPolarsErr::from)?,
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                label: label.0,
                include_boundaries,
                closed_window,
                start_by: start_by.0,
                ..Default::default()
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn with_context(&self, contexts: Vec<Self>) -> Self {
        let contexts = contexts
            .into_iter()
            .map(|ldf| ldf.ldf.into_inner())
            .collect::<Vec<_>>();
        self.ldf.read().clone().with_context(contexts).into()
    }

    #[cfg(feature = "asof_join")]
    #[pyo3(signature = (other, left_on, right_on, left_by, right_by, allow_parallel, force_parallel, suffix, strategy, tolerance, tolerance_str, coalesce, allow_eq, check_sortedness))]
    fn join_asof(
        &self,
        other: Self,
        left_on: PyExpr,
        right_on: PyExpr,
        left_by: Option<Vec<PyBackedStr>>,
        right_by: Option<Vec<PyBackedStr>>,
        allow_parallel: bool,
        force_parallel: bool,
        suffix: String,
        strategy: Wrap<AsofStrategy>,
        tolerance: Option<Wrap<AnyValue<'_>>>,
        tolerance_str: Option<String>,
        coalesce: bool,
        allow_eq: bool,
        check_sortedness: bool,
    ) -> PyResult<Self> {
        let coalesce = if coalesce {
            JoinCoalesce::CoalesceColumns
        } else {
            JoinCoalesce::KeepColumns
        };
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();
        let left_on = left_on.inner;
        let right_on = right_on.inner;
        Ok(ldf
            .join_builder()
            .with(other)
            .left_on([left_on])
            .right_on([right_on])
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .coalesce(coalesce)
            .how(JoinType::AsOf(Box::new(AsOfOptions {
                strategy: strategy.0,
                left_by: left_by.map(strings_to_pl_smallstr),
                right_by: right_by.map(strings_to_pl_smallstr),
                tolerance: tolerance.map(|t| {
                    let av = t.0.into_static();
                    let dtype = av.dtype();
                    Scalar::new(dtype, av)
                }),
                tolerance_str: tolerance_str.map(|s| s.into()),
                allow_eq,
                check_sortedness,
            })))
            .suffix(suffix)
            .finish()
            .into())
    }

    #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, nulls_equal, how, suffix, validate, maintain_order, coalesce=None))]
    fn join(
        &self,
        other: Self,
        left_on: Vec<PyExpr>,
        right_on: Vec<PyExpr>,
        allow_parallel: bool,
        force_parallel: bool,
        nulls_equal: bool,
        how: Wrap<JoinType>,
        suffix: String,
        validate: Wrap<JoinValidation>,
        maintain_order: Wrap<MaintainOrderJoin>,
        coalesce: Option<bool>,
    ) -> PyResult<Self> {
        let coalesce = match coalesce {
            None => JoinCoalesce::JoinSpecific,
            Some(true) => JoinCoalesce::CoalesceColumns,
            Some(false) => JoinCoalesce::KeepColumns,
        };
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();
        let left_on = left_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let right_on = right_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();

        Ok(ldf
            .join_builder()
            .with(other)
            .left_on(left_on)
            .right_on(right_on)
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .join_nulls(nulls_equal)
            .how(how.0)
            .suffix(suffix)
            .validate(validate.0)
            .coalesce(coalesce)
            .maintain_order(maintain_order.0)
            .finish()
            .into())
    }

    fn join_where(&self, other: Self, predicates: Vec<PyExpr>, suffix: String) -> PyResult<Self> {
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();

        let predicates = predicates.to_exprs();

        Ok(ldf
            .join_builder()
            .with(other)
            .suffix(suffix)
            .join_where(predicates)
            .into())
    }

    fn with_columns(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_columns(exprs.to_exprs()).into()
    }

    fn with_columns_seq(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_columns_seq(exprs.to_exprs()).into()
    }

    fn match_to_schema<'py>(
        &self,
        schema: Wrap<Schema>,
        missing_columns: &Bound<'py, PyAny>,
        missing_struct_fields: &Bound<'py, PyAny>,
        extra_columns: Wrap<ExtraColumnsPolicy>,
        extra_struct_fields: &Bound<'py, PyAny>,
        integer_cast: &Bound<'py, PyAny>,
        float_cast: &Bound<'py, PyAny>,
    ) -> PyResult<Self> {
        fn parse_missing_columns<'py>(
            schema: &Schema,
            missing_columns: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<MissingColumnsPolicyOrExpr>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = missing_columns.extract::<Wrap<MissingColumnsPolicyOrExpr>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = missing_columns.cast::<PyDict>() {
                out.extend(std::iter::repeat_n(
                    MissingColumnsPolicyOrExpr::Raise,
                    schema.len(),
                ));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<MissingColumnsPolicyOrExpr>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err("Invalid value for `missing_columns`"));
            }
            Ok(out)
        }
        fn parse_missing_struct_fields<'py>(
            schema: &Schema,
            missing_struct_fields: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<MissingColumnsPolicy>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = missing_struct_fields.extract::<Wrap<MissingColumnsPolicy>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = missing_struct_fields.cast::<PyDict>() {
                out.extend(std::iter::repeat_n(
                    MissingColumnsPolicy::Raise,
                    schema.len(),
                ));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<MissingColumnsPolicy>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `missing_struct_fields`",
                ));
            }
            Ok(out)
        }
        fn parse_extra_struct_fields<'py>(
            schema: &Schema,
            extra_struct_fields: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<ExtraColumnsPolicy>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = extra_struct_fields.extract::<Wrap<ExtraColumnsPolicy>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = extra_struct_fields.cast::<PyDict>() {
                out.extend(std::iter::repeat_n(ExtraColumnsPolicy::Raise, schema.len()));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<ExtraColumnsPolicy>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `extra_struct_fields`",
                ));
            }
            Ok(out)
        }
        fn parse_cast<'py>(
            schema: &Schema,
            cast: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<UpcastOrForbid>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = cast.extract::<Wrap<UpcastOrForbid>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = cast.cast::<PyDict>() {
                out.extend(std::iter::repeat_n(UpcastOrForbid::Forbid, schema.len()));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<UpcastOrForbid>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `integer_cast` / `float_cast`",
                ));
            }
            Ok(out)
        }

        let missing_columns = parse_missing_columns(&schema.0, missing_columns)?;
        let missing_struct_fields = parse_missing_struct_fields(&schema.0, missing_struct_fields)?;
        let extra_struct_fields = parse_extra_struct_fields(&schema.0, extra_struct_fields)?;
        let integer_cast = parse_cast(&schema.0, integer_cast)?;
        let float_cast = parse_cast(&schema.0, float_cast)?;

        let per_column = (0..schema.0.len())
            .map(|i| MatchToSchemaPerColumn {
                missing_columns: missing_columns[i].clone(),
                missing_struct_fields: missing_struct_fields[i],
                extra_struct_fields: extra_struct_fields[i],
                integer_cast: integer_cast[i],
                float_cast: float_cast[i],
            })
            .collect();

        let ldf = self.ldf.read().clone();
        Ok(ldf
            .match_to_schema(Arc::new(schema.0), per_column, extra_columns.0)
            .into())
    }

    fn pipe_with_schema(&self, callback: Py<PyAny>) -> Self {
        let ldf = self.ldf.read().clone();
        let function = PythonObject(callback);
        ldf.pipe_with_schema(PlanCallback::new_python(function))
            .into()
    }

    fn rename(&self, existing: Vec<String>, new: Vec<String>, strict: bool) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.rename(existing, new, strict).into()
    }

    fn reverse(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.reverse().into()
    }

    #[pyo3(signature = (n, fill_value=None))]
    fn shift(&self, n: PyExpr, fill_value: Option<PyExpr>) -> Self {
        let lf = self.ldf.read().clone();
        let out = match fill_value {
            Some(v) => lf.shift_and_fill(n.inner, v.inner),
            None => lf.shift(n.inner),
        };
        out.into()
    }

    fn fill_nan(&self, fill_value: PyExpr) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.fill_nan(fill_value.inner).into()
    }

    fn min(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.min();
        out.into()
    }

    fn max(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.max();
        out.into()
    }

    fn sum(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.sum();
        out.into()
    }

    fn mean(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.mean();
        out.into()
    }

    fn std(&self, ddof: u8) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.std(ddof);
        out.into()
    }

    fn var(&self, ddof: u8) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.var(ddof);
        out.into()
    }

    fn median(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.median();
        out.into()
    }

    fn quantile(&self, quantile: PyExpr, interpolation: Wrap<QuantileMethod>) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.quantile(quantile.inner, interpolation.0);
        out.into()
    }

    fn explode(&self, subset: PySelector, empty_as_null: bool, keep_nulls: bool) -> Self {
        self.ldf
            .read()
            .clone()
            .explode(
                subset.inner,
                ExplodeOptions {
                    empty_as_null,
                    keep_nulls,
                },
            )
            .into()
    }

    fn null_count(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.null_count().into()
    }

    #[pyo3(signature = (maintain_order, subset, keep))]
    fn unique(
        &self,
        maintain_order: bool,
        subset: Option<Vec<PyExpr>>,
        keep: Wrap<UniqueKeepStrategy>,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        let subset = subset.map(|exprs| exprs.into_iter().map(|e| e.inner).collect());
        match maintain_order {
            true => ldf.unique_stable_generic(subset, keep.0),
            false => ldf.unique_generic(subset, keep.0),
        }
        .into()
    }

    fn drop_nans(&self, subset: Option<PySelector>) -> Self {
        self.ldf
            .read()
            .clone()
            .drop_nans(subset.map(|e| e.inner))
            .into()
    }

    fn drop_nulls(&self, subset: Option<PySelector>) -> Self {
        self.ldf
            .read()
            .clone()
            .drop_nulls(subset.map(|e| e.inner))
            .into()
    }

    #[pyo3(signature = (offset, len=None))]
    fn slice(&self, offset: i64, len: Option<IdxSize>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.slice(offset, len.unwrap_or(IdxSize::MAX)).into()
    }

    fn tail(&self, n: IdxSize) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.tail(n).into()
    }

    #[cfg(feature = "pivot")]
    #[pyo3(signature = (on, on_columns, index, values, agg, maintain_order, separator))]
    fn pivot(
        &self,
        on: PySelector,
        on_columns: PyDataFrame,
        index: PySelector,
        values: PySelector,
        agg: PyExpr,
        maintain_order: bool,
        separator: String,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.pivot(
            on.inner,
            Arc::new(on_columns.df.read().clone()),
            index.inner,
            values.inner,
            agg.inner,
            maintain_order,
            separator.into(),
        )
        .into()
    }

    #[cfg(feature = "pivot")]
    #[pyo3(signature = (on, index, value_name, variable_name))]
    fn unpivot(
        &self,
        on: Option<PySelector>,
        index: PySelector,
        value_name: Option<String>,
        variable_name: Option<String>,
    ) -> Self {
        let args = UnpivotArgsDSL {
            on: on.map(|on| on.inner),
            index: index.inner,
            value_name: value_name.map(|s| s.into()),
            variable_name: variable_name.map(|s| s.into()),
        };

        let ldf = self.ldf.read().clone();
        ldf.unpivot(args).into()
    }

    #[pyo3(signature = (name, offset=None))]
    fn with_row_index(&self, name: &str, offset: Option<IdxSize>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_row_index(name, offset).into()
    }

    #[pyo3(signature = (function, predicate_pushdown, projection_pushdown, slice_pushdown, streamable, schema, validate_output))]
    fn map_batches(
        &self,
        function: Py<PyAny>,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        slice_pushdown: bool,
        streamable: bool,
        schema: Option<Wrap<Schema>>,
        validate_output: bool,
    ) -> Self {
        let mut opt = OptFlags::default();
        opt.set(OptFlags::PREDICATE_PUSHDOWN, predicate_pushdown);
        opt.set(OptFlags::PROJECTION_PUSHDOWN, projection_pushdown);
        opt.set(OptFlags::SLICE_PUSHDOWN, slice_pushdown);
        opt.set(OptFlags::NEW_STREAMING, streamable);

        self.ldf
            .read()
            .clone()
            .map_python(
                function.into(),
                opt,
                schema.map(|s| Arc::new(s.0)),
                validate_output,
            )
            .into()
    }

    fn drop(&self, columns: PySelector) -> Self {
        self.ldf.read().clone().drop(columns.inner).into()
    }

    fn cast(&self, dtypes: HashMap<PyBackedStr, Wrap<DataType>>, strict: bool) -> Self {
        let mut cast_map = PlHashMap::with_capacity(dtypes.len());
        cast_map.extend(dtypes.iter().map(|(k, v)| (k.as_ref(), v.0.clone())));
        self.ldf.read().clone().cast(cast_map, strict).into()
    }

    fn cast_all(&self, dtype: PyDataTypeExpr, strict: bool) -> Self {
        self.ldf.read().clone().cast_all(dtype.inner, strict).into()
    }

    fn clone(&self) -> Self {
        self.ldf.read().clone().into()
    }

    fn collect_schema<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
        let schema = py.enter_polars(|| self.ldf.write().collect_schema())?;

        let schema_dict = PyDict::new(py);
        schema.iter_fields().for_each(|fld| {
            schema_dict
                .set_item(fld.name().as_str(), &Wrap(fld.dtype().clone()))
                .unwrap()
        });
        Ok(schema_dict)
    }

    fn unnest(&self, columns: PySelector, separator: Option<&str>) -> Self {
        self.ldf
            .read()
            .clone()
            .unnest(columns.inner, separator.map(PlSmallStr::from_str))
            .into()
    }

    fn count(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.count().into()
    }

    #[cfg(feature = "merge_sorted")]
    fn merge_sorted(&self, other: Self, key: &str) -> PyResult<Self> {
        let out = self
            .ldf
            .read()
            .clone()
            .merge_sorted(other.ldf.into_inner(), key)
            .map_err(PyPolarsErr::from)?;
        Ok(out.into())
    }

    fn _node_name(&self) -> &str {
        let plan = &self.ldf.read().logical_plan;
        plan.into()
    }

    fn hint_sorted(
        &self,
        columns: Vec<String>,
        descending: Vec<bool>,
        nulls_last: Vec<bool>,
    ) -> PyResult<Self> {
        if columns.len() != descending.len() && descending.len() != 1 {
            return Err(PyValueError::new_err(
                "`set_sorted` expects the same amount of `columns` as `descending` values.",
            ));
        }
        if columns.len() != nulls_last.len() && nulls_last.len() != 1 {
            return Err(PyValueError::new_err(
                "`set_sorted` expects the same amount of `columns` as `nulls_last` values.",
            ));
        }

        let mut sorted = columns
            .iter()
            .map(|c| Sorted {
                column: PlSmallStr::from_str(c.as_str()),
                descending: Some(false),
                nulls_last: Some(false),
            })
            .collect::<Vec<_>>();

        if !columns.is_empty() {
            if descending.len() != 1 {
                sorted
                    .iter_mut()
                    .zip(descending)
                    .for_each(|(s, d)| s.descending = Some(d));
            } else if descending[0] {
                sorted.iter_mut().for_each(|s| s.descending = Some(true));
            }

            if nulls_last.len() != 1 {
                sorted
                    .iter_mut()
                    .zip(nulls_last)
                    .for_each(|(s, d)| s.nulls_last = Some(d));
            } else if nulls_last[0] {
                sorted.iter_mut().for_each(|s| s.nulls_last = Some(true));
            }
        }

        let out = self
            .ldf
            .read()
            .clone()
            .hint(HintIR::Sorted(sorted.into()))
            .map_err(PyPolarsErr::from)?;
        Ok(out.into())
    }
}

#[pyclass(frozen)]
struct PyCollectBatches {
    inner: Arc<Mutex<CollectBatches>>,
    ldf: LazyFrame,
}

#[pymethods]
impl PyCollectBatches {
    fn start(&self) {
        self.inner.lock().start();
    }

    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
        slf
    }

    fn __next__(slf: PyRef<'_, Self>, py: Python) -> PyResult<Option<PyDataFrame>> {
        let inner = Arc::clone(&slf.inner);
        py.enter_polars(|| PolarsResult::Ok(inner.lock().next().transpose()?.map(PyDataFrame::new)))
    }

    #[allow(unused_variables)]
    #[pyo3(signature = (requested_schema=None))]
    fn __arrow_c_stream__<'py>(
        &self,
        py: Python<'py>,
        requested_schema: Option<Py<PyAny>>,
    ) -> PyResult<Bound<'py, PyCapsule>> {
        let mut ldf = self.ldf.clone();
        let schema = ldf
            .collect_schema()
            .map_err(PyPolarsErr::from)?
            .to_arrow(CompatLevel::newest());

        let dtype = ArrowDataType::Struct(schema.into_iter_values().collect());

        let iter = Box::new(ArrowStreamIterator::new(self.inner.clone(), dtype.clone()));
        let field = ArrowField::new(PlSmallStr::EMPTY, dtype, false);
        let stream = export_iterator(iter, field);
        let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
        PyCapsule::new(py, stream, Some(stream_capsule_name))
    }
}

pub struct ArrowStreamIterator {
    inner: Arc<Mutex<CollectBatches>>,
    dtype: ArrowDataType,
}

impl ArrowStreamIterator {
    fn new(inner: Arc<Mutex<CollectBatches>>, schema: ArrowDataType) -> Self {
        Self {
            inner,
            dtype: schema,
        }
    }
}

impl Iterator for ArrowStreamIterator {
    type Item = PolarsResult<ArrayRef>;

    fn next(&mut self) -> Option<Self::Item> {
        let next = self.inner.lock().next();
        match next {
            None => None,
            Some(Err(err)) => Some(Err(err)),
            Some(Ok(df)) => {
                let height = df.height();
                let arrays = df.rechunk_into_arrow(CompatLevel::newest());
                Some(Ok(Box::new(arrow::array::StructArray::new(
                    self.dtype.clone(),
                    height,
                    arrays,
                    None,
                ))))
            },
        }
    }
}
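
// Design note: `ArrowStreamIterator` adapts `CollectBatches` for
// `export_iterator` by rechunking each collected `DataFrame` and emitting it
// as a single `StructArray`, so one Python-side batch maps to exactly one
// array in the Arrow stream. The capsule returned by `__arrow_c_stream__` is
// named "arrow_array_stream" per the Arrow PyCapsule interface, so any
// PyCapsule-aware consumer (e.g. a pyarrow version that implements that
// interface) should be able to read the batch stream directly.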