Path: blob/main/crates/polars-stream/src/utils/late_materialized_df.rs
6939 views
use std::sync::Arc;12use parking_lot::Mutex;3use polars_core::frame::DataFrame;4use polars_core::schema::Schema;5use polars_error::PolarsResult;6use polars_plan::dsl::{FileScanIR, ScanSources};7use polars_plan::plans::{AnonymousScan, AnonymousScanArgs, FileInfo, IR};8use polars_plan::prelude::{AnonymousScanOptions, UnifiedScanArgs};910/// Used to insert a dataframe into in-memory-engine query plan after the query11/// plan has been made.12#[derive(Default)]13pub struct LateMaterializedDataFrame {14df: Mutex<Option<DataFrame>>,15}1617impl LateMaterializedDataFrame {18pub fn set_materialized_dataframe(&self, df: DataFrame) {19*self.df.lock() = Some(df);20}2122pub fn as_ir_node(self: Arc<Self>, schema: Arc<Schema>) -> IR {23let options = Arc::new(AnonymousScanOptions {24skip_rows: None,25fmt_str: "LateMaterializedDataFrame",26});27IR::Scan {28sources: ScanSources::Paths(Arc::default()),29file_info: FileInfo::new(schema, None, (None, usize::MAX)),30hive_parts: None,31predicate: None,32output_schema: None,33scan_type: Box::new(FileScanIR::Anonymous {34options,35function: self,36}),37unified_scan_args: Box::new(UnifiedScanArgs::default()),38}39}40}4142impl AnonymousScan for LateMaterializedDataFrame {43fn as_any(&self) -> &dyn std::any::Any {44unimplemented!()45}4647fn scan(&self, _scan_opts: AnonymousScanArgs) -> PolarsResult<DataFrame> {48Ok(self.df.lock().take().unwrap())49}50}515253