Path: blob/main/crates/polars-stream/src/utils/late_materialized_df.rs
8327 views
use std::sync::Arc;12use parking_lot::Mutex;3use polars_core::frame::DataFrame;4use polars_core::schema::Schema;5use polars_error::PolarsResult;6use polars_plan::dsl::{FileScanIR, ScanSources};7use polars_plan::plans::{AnonymousScan, AnonymousScanArgs, FileInfo, IR};8use polars_plan::prelude::{AnonymousScanOptions, UnifiedScanArgs};910/// Used to insert a dataframe into in-memory-engine query plan after the query11/// plan has been made.12#[derive(Default)]13pub struct LateMaterializedDataFrame {14df: Mutex<Option<DataFrame>>,15}1617impl LateMaterializedDataFrame {18pub fn set_materialized_dataframe(&self, df: DataFrame) {19*self.df.lock() = Some(df);20}2122pub fn as_ir_node(self: Arc<Self>, schema: Arc<Schema>) -> IR {23let options = Arc::new(AnonymousScanOptions {24skip_rows: None,25fmt_str: "LateMaterializedDataFrame",26});27IR::Scan {28sources: ScanSources::Paths(Default::default()),29file_info: FileInfo::new(schema, None, (None, usize::MAX)),30hive_parts: None,31predicate: None,32predicate_file_skip_applied: None,33output_schema: None,34scan_type: Box::new(FileScanIR::Anonymous {35options,36function: self,37}),38unified_scan_args: Box::new(UnifiedScanArgs::default()),39}40}41}4243impl AnonymousScan for LateMaterializedDataFrame {44fn as_any(&self) -> &dyn std::any::Any {45unimplemented!()46}4748fn scan(&self, _scan_opts: AnonymousScanArgs) -> PolarsResult<DataFrame> {49Ok(self.df.lock().take().unwrap())50}51}525354