Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/utils/late_materialized_df.rs
8327 views
1
use std::sync::Arc;
2
3
use parking_lot::Mutex;
4
use polars_core::frame::DataFrame;
5
use polars_core::schema::Schema;
6
use polars_error::PolarsResult;
7
use polars_plan::dsl::{FileScanIR, ScanSources};
8
use polars_plan::plans::{AnonymousScan, AnonymousScanArgs, FileInfo, IR};
9
use polars_plan::prelude::{AnonymousScanOptions, UnifiedScanArgs};
10
11
/// Used to insert a dataframe into in-memory-engine query plan after the query
12
/// plan has been made.
13
#[derive(Default)]
14
pub struct LateMaterializedDataFrame {
15
df: Mutex<Option<DataFrame>>,
16
}
17
18
impl LateMaterializedDataFrame {
19
pub fn set_materialized_dataframe(&self, df: DataFrame) {
20
*self.df.lock() = Some(df);
21
}
22
23
pub fn as_ir_node(self: Arc<Self>, schema: Arc<Schema>) -> IR {
24
let options = Arc::new(AnonymousScanOptions {
25
skip_rows: None,
26
fmt_str: "LateMaterializedDataFrame",
27
});
28
IR::Scan {
29
sources: ScanSources::Paths(Default::default()),
30
file_info: FileInfo::new(schema, None, (None, usize::MAX)),
31
hive_parts: None,
32
predicate: None,
33
predicate_file_skip_applied: None,
34
output_schema: None,
35
scan_type: Box::new(FileScanIR::Anonymous {
36
options,
37
function: self,
38
}),
39
unified_scan_args: Box::new(UnifiedScanArgs::default()),
40
}
41
}
42
}
43
44
impl AnonymousScan for LateMaterializedDataFrame {
45
fn as_any(&self) -> &dyn std::any::Any {
46
unimplemented!()
47
}
48
49
fn scan(&self, _scan_opts: AnonymousScanArgs) -> PolarsResult<DataFrame> {
50
Ok(self.df.lock().take().unwrap())
51
}
52
}
53
54