Path: blob/main/crates/polars-stream/src/nodes/io_sinks2/writers/mod.rs
7884 views
use std::sync::Arc;12use polars_core::schema::SchemaRef;3use polars_error::PolarsResult;4use polars_io::utils::sync_on_close::SyncOnCloseType;5use polars_plan::dsl::FileType;6use polars_utils::IdxSize;78use crate::nodes::io_sinks2::writers::interface::FileWriterStarter;910pub mod interface;11#[cfg(feature = "ipc")]12pub mod ipc;13#[cfg(feature = "parquet")]14pub mod parquet;1516pub fn create_file_writer_starter(17file_format: &Arc<FileType>,18file_schema: &SchemaRef,19pipeline_depth: usize,20sync_on_close: SyncOnCloseType,21) -> PolarsResult<Arc<dyn FileWriterStarter>> {22Ok(match file_format.as_ref() {23#[cfg(feature = "parquet")]24FileType::Parquet(options) => {25use polars_core::prelude::CompatLevel;26use polars_io::schema_to_arrow_checked;2728use crate::nodes::io_sinks2::writers::parquet::ParquetWriterStarter;2930let arrow_schema = Arc::new(schema_to_arrow_checked(31file_schema.as_ref(),32CompatLevel::newest(),33"",34)?);3536Arc::new(ParquetWriterStarter {37options: options.clone(),38arrow_schema,39initialized_state: Default::default(),40pipeline_depth,41sync_on_close,42row_group_size: options43.row_group_size44.map(|x| IdxSize::try_from(x).unwrap()),45}) as _46},47#[cfg(feature = "ipc")]48FileType::Ipc(options) => {49use crate::nodes::io_sinks2::writers::ipc::IpcWriterStarter;5051Arc::new(IpcWriterStarter {52options: *options,53schema: file_schema.clone(),54pipeline_depth,55sync_on_close,56}) as _57},58_ => unimplemented!(),59})60}616263