// Path: blob/main/crates/polars-stream/src/nodes/io_sinks2/config.rs
// 7884 views
use std::sync::Arc;12use polars_core::prelude::SortMultipleOptions;3use polars_core::schema::SchemaRef;4use polars_plan::dsl::sink2::FileProviderType;5use polars_plan::dsl::{FileType, SinkTarget, UnifiedSinkArgs};6use polars_utils::plpath::{CloudScheme, PlPath};78use crate::expression::StreamExpr;9use crate::nodes::io_sinks2::components::hstack_columns::HStackColumns;10use crate::nodes::io_sinks2::components::partitioner::Partitioner;11use crate::nodes::io_sinks2::components::size::RowCountAndSize;1213pub struct IOSinkNodeConfig {14pub file_format: Arc<FileType>,15pub target: IOSinkTarget,16pub unified_sink_args: UnifiedSinkArgs,17pub input_schema: SchemaRef,18pub num_pipelines: usize,19}2021impl IOSinkNodeConfig {22pub fn per_sink_pipeline_depth(&self) -> usize {23self.inflight_morsel_limit().min(self.num_pipelines)24}2526pub fn inflight_morsel_limit(&self) -> usize {27if let Ok(v) = std::env::var("POLARS_INFLIGHT_SINK_MORSEL_LIMIT").map(|x| {28x.parse::<usize>()29.ok()30.filter(|x| *x > 0)31.unwrap_or_else(|| {32panic!("invalid value for POLARS_INFLIGHT_SINK_MORSEL_LIMIT: {x}")33})34}) {35return v;36};3738self.num_pipelines.saturating_add(39// Additional buffer to accommodate head-of-line blocking404,41)42}4344pub fn max_open_sinks(&self) -> usize {45if let Ok(v) = std::env::var("POLARS_MAX_OPEN_SINKS").map(|x| {46x.parse::<usize>()47.ok()48.filter(|x| *x > 0)49.unwrap_or_else(|| panic!("invalid value for POLARS_MAX_OPEN_SINKS: {x}"))50}) {51return v;52}5354if self.target.is_cloud_location() {5551256} else {5712858}59}6061pub fn cloud_upload_chunk_size(&self) -> usize {62polars_io::get_upload_chunk_size()63}6465pub fn partitioned_cloud_upload_chunk_size(&self) -> usize {66if let Ok(v) = std::env::var("POLARS_PARTITIONED_UPLOAD_CHUNK_SIZE").map(|x| {67x.parse::<usize>()68.ok()69.filter(|x| *x > 0)70.unwrap_or_else(|| {71panic!("invalid value for POLARS_PARTITIONED_UPLOAD_CHUNK_SIZE: {x}")72})73}) {74return v;75}76776 * 1024 * 102478}79}8081pub enum IOSinkTarget 
{82File(SinkTarget),83Partitioned(Box<PartitionedTarget>),84}8586impl IOSinkTarget {87pub fn is_cloud_location(&self) -> bool {88match self {89Self::File(v) => v.cloud_scheme(),90Self::Partitioned(v) => v.base_path.cloud_scheme(),91}92.is_some_and(|x| !matches!(x, CloudScheme::File | CloudScheme::FileNoHostname))93}94}9596pub struct PartitionedTarget {97pub base_path: PlPath,98pub file_path_provider: FileProviderType,99pub partitioner: Partitioner,100/// How to hstack the keys back into the dataframe (with_columns)101pub hstack_keys: Option<HStackColumns>,102pub include_keys_in_file: bool,103pub file_schema: SchemaRef,104pub file_size_limit: Option<RowCountAndSize>,105pub per_partition_sort: Option<(Arc<[StreamExpr]>, SortMultipleOptions)>,106}107108109