Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/physical_plan/io/python_dataset.rs
6939 views
1
use std::sync::{Arc, Mutex};
2
3
use polars_core::config;
4
use polars_plan::plans::{ExpandedPythonScan, python_df_to_rust};
5
use polars_utils::format_pl_smallstr;
6
7
use crate::execute::StreamingExecutionState;
8
use crate::nodes::io_sources::batch::GetBatchFn;
9
use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
10
11
/// Note: Currently used for iceberg fallback.
12
pub fn python_dataset_scan_to_reader_builder(
13
expanded_scan: &ExpandedPythonScan,
14
) -> Arc<dyn FileReaderBuilder> {
15
use polars_plan::dsl::python_dsl::PythonScanSource as S;
16
use pyo3::prelude::*;
17
18
let (name, get_batch_fn) = match &expanded_scan.variant {
19
S::Pyarrow => {
20
// * Pyarrow is a oneshot function call.
21
// * Arc / Mutex because because closure cannot be FnOnce
22
let python_scan_function = Arc::new(Mutex::new(Some(expanded_scan.scan_fn.clone())));
23
24
(
25
format_pl_smallstr!("python[{} @ pyarrow]", &expanded_scan.name),
26
Box::new(move |_state: &StreamingExecutionState| {
27
Python::with_gil(|py| {
28
let Some(python_scan_function) =
29
python_scan_function.lock().unwrap().take()
30
else {
31
return Ok(None);
32
};
33
34
// Note: to_dataset_scan() has already captured projection / limit.
35
36
let df = python_scan_function.call0(py)?;
37
let df = python_df_to_rust(py, df.bind(py).clone())?;
38
39
Ok(Some(df))
40
})
41
}) as GetBatchFn,
42
)
43
},
44
45
s => todo!("{:?}", s),
46
};
47
48
use crate::nodes::io_sources::batch::builder::BatchFnReaderBuilder;
49
use crate::nodes::io_sources::batch::{BatchFnReader, GetBatchState};
50
51
let reader = BatchFnReader {
52
name: name.clone(),
53
output_schema: None,
54
get_batch_state: Some(GetBatchState::from(get_batch_fn)),
55
execution_state: None,
56
verbose: config::verbose(),
57
};
58
59
Arc::new(BatchFnReaderBuilder {
60
name,
61
reader: std::sync::Mutex::new(Some(reader)),
62
execution_state: Default::default(),
63
}) as Arc<dyn FileReaderBuilder>
64
}
65
66