Path: blob/main/crates/polars-stream/src/physical_plan/io/python_dataset.rs
6939 views
use std::sync::{Arc, Mutex};12use polars_core::config;3use polars_plan::plans::{ExpandedPythonScan, python_df_to_rust};4use polars_utils::format_pl_smallstr;56use crate::execute::StreamingExecutionState;7use crate::nodes::io_sources::batch::GetBatchFn;8use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;910/// Note: Currently used for iceberg fallback.11pub fn python_dataset_scan_to_reader_builder(12expanded_scan: &ExpandedPythonScan,13) -> Arc<dyn FileReaderBuilder> {14use polars_plan::dsl::python_dsl::PythonScanSource as S;15use pyo3::prelude::*;1617let (name, get_batch_fn) = match &expanded_scan.variant {18S::Pyarrow => {19// * Pyarrow is a oneshot function call.20// * Arc / Mutex because because closure cannot be FnOnce21let python_scan_function = Arc::new(Mutex::new(Some(expanded_scan.scan_fn.clone())));2223(24format_pl_smallstr!("python[{} @ pyarrow]", &expanded_scan.name),25Box::new(move |_state: &StreamingExecutionState| {26Python::with_gil(|py| {27let Some(python_scan_function) =28python_scan_function.lock().unwrap().take()29else {30return Ok(None);31};3233// Note: to_dataset_scan() has already captured projection / limit.3435let df = python_scan_function.call0(py)?;36let df = python_df_to_rust(py, df.bind(py).clone())?;3738Ok(Some(df))39})40}) as GetBatchFn,41)42},4344s => todo!("{:?}", s),45};4647use crate::nodes::io_sources::batch::builder::BatchFnReaderBuilder;48use crate::nodes::io_sources::batch::{BatchFnReader, GetBatchState};4950let reader = BatchFnReader {51name: name.clone(),52output_schema: None,53get_batch_state: Some(GetBatchState::from(get_batch_fn)),54execution_state: None,55verbose: config::verbose(),56};5758Arc::new(BatchFnReaderBuilder {59name,60reader: std::sync::Mutex::new(Some(reader)),61execution_state: Default::default(),62}) as Arc<dyn FileReaderBuilder>63}646566