Path: blob/main/crates/polars-mem-engine/src/executors/sink.rs
use std::borrow::Cow;
use std::sync::Arc;

use polars_core::frame::DataFrame;
use polars_core::schema::Schema;
use polars_error::PolarsResult;
use polars_expr::state::ExecutionState;
use polars_plan::dsl::{FileType, SinkTypeIR};
use polars_plan::plans::prune::prune;
use polars_plan::plans::{AExpr, IR, IRPlan};
use polars_utils::arena::{Arena, Node};

use crate::{Executor, StreamingExecutorBuilder};

pub struct PartitionedSinkExecutor {
    name: String,
    input_exec: Box<dyn Executor>,
    input_scan_node: Node,
    plan: IRPlan,
    builder: StreamingExecutorBuilder,
}

impl PartitionedSinkExecutor {
    pub fn new(
        input: Box<dyn Executor>,
        builder: StreamingExecutorBuilder,
        root: Node,
        lp_arena: &mut Arena<IR>,
        expr_arena: &Arena<AExpr>,
    ) -> Self {
        // Create a DataFrame scan for injecting the input result
        let scan = lp_arena.add(IR::DataFrameScan {
            df: Arc::new(DataFrame::empty()),
            schema: Arc::new(Schema::default()),
            output_schema: None,
        });

        let name = {
            let IR::Sink {
                input: sink_input,
                payload: SinkTypeIR::Partition(part),
            } = lp_arena.get_mut(root)
            else {
                unreachable!();
            };

            // Set the scan as the sink input
            *sink_input = scan;

            // Generate a name based on the sink file type
            format!("sink_{}[partitioned]", sink_name(&part.file_type))
        };

        // Prune the subplan into separate arenas
        let mut new_ir_arena = Arena::new();
        let mut new_expr_arena = Arena::new();
        let [new_root, new_scan] = prune(
            &[root, scan],
            lp_arena,
            expr_arena,
            &mut new_ir_arena,
            &mut new_expr_arena,
        )
        .try_into()
        .unwrap();

        let plan = IRPlan {
            lp_top: new_root,
            lp_arena: new_ir_arena,
            expr_arena: new_expr_arena,
        };

        Self {
            name,
            input_exec: input,
            input_scan_node: new_scan,
            plan,
            builder,
        }
    }
}

impl Executor for PartitionedSinkExecutor {
    fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
        state.should_stop()?;
        #[cfg(debug_assertions)]
        {
            if state.verbose() {
                eprintln!("run {}", self.name)
            }
        }
        let input_df = self.input_exec.execute(state)?;

        let profile_name = if state.has_node_timer() {
            Cow::Owned(format!(".{}()", &self.name))
        } else {
            Cow::Borrowed("")
        };

        // Insert the input DataFrame into our DataFrame scan node
        if let IR::DataFrameScan { df, schema, .. } =
            self.plan.lp_arena.get_mut(self.input_scan_node)
        {
            *schema = input_df.schema().clone();
            *df = Arc::new(input_df);
        } else {
            unreachable!();
        }

        let mut streaming_exec = (self.builder)(
            self.plan.lp_top,
            &mut self.plan.lp_arena,
            &mut self.plan.expr_arena,
        )?;

        state
            .clone()
            .record(|| streaming_exec.execute(state), profile_name)
    }
}

pub fn sink_name(file_type: &FileType) -> &'static str {
    match file_type {
        #[cfg(feature = "parquet")]
        FileType::Parquet(_) => "parquet",
        #[cfg(feature = "ipc")]
        FileType::Ipc(_) => "ipc",
        #[cfg(feature = "csv")]
        FileType::Csv(_) => "csv",
        #[cfg(feature = "json")]
        FileType::Json(_) => "json",
        #[allow(unreachable_patterns)]
        _ => panic!("enable filetype feature"),
    }
}
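
// Context for readers viewing this file in isolation: `StreamingExecutorBuilder` is
// defined elsewhere in this crate, so the alias below is only a sketch inferred from
// the call site in `execute` above, where the builder is invoked with the pruned
// plan's root node plus mutable IR and expression arenas, is `?`-propagated, and must
// yield an executor whose `execute` produces the final `DataFrame`. Treat the exact
// shape as an assumption rather than the canonical definition:
//
//     pub type StreamingExecutorBuilder =
//         fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<Box<dyn Executor>>;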