Path: blob/main/crates/polars-stream/src/nodes/io_sinks/config.rs
8480 views
use std::num::NonZeroUsize;12use polars_core::schema::SchemaRef;3use polars_plan::dsl::file_provider::FileProviderType;4use polars_plan::dsl::{FileWriteFormat, SinkTarget, UnifiedSinkArgs};5use polars_utils::pl_path::{CloudScheme, PlRefPath};67use crate::nodes::io_sinks::components::hstack_columns::HStackColumns;8use crate::nodes::io_sinks::components::partitioner::Partitioner;9use crate::nodes::io_sinks::components::size::NonZeroRowCountAndSize;1011pub struct IOSinkNodeConfig {12pub file_format: FileWriteFormat,13pub target: IOSinkTarget,14pub unified_sink_args: UnifiedSinkArgs,15pub input_schema: SchemaRef,16}1718impl IOSinkNodeConfig {19pub fn num_pipelines_per_sink(&self, num_pipelines: NonZeroUsize) -> NonZeroUsize {20NonZeroUsize::min(num_pipelines, self.inflight_morsel_limit(num_pipelines))21}2223pub fn inflight_morsel_limit(&self, num_pipelines: NonZeroUsize) -> NonZeroUsize {24if let Ok(v) = std::env::var("POLARS_INFLIGHT_SINK_MORSEL_LIMIT").map(|x| {25x.parse::<NonZeroUsize>().unwrap_or_else(|_| {26panic!("invalid value for POLARS_INFLIGHT_SINK_MORSEL_LIMIT: {x}")27})28}) {29return v;30};3132NonZeroUsize::saturating_add(33num_pipelines,34// Additional buffer to accommodate head-of-line blocking354,36)37}3839pub fn max_open_sinks(&self) -> NonZeroUsize {40if let Ok(v) = std::env::var("POLARS_MAX_OPEN_SINKS").map(|x| {41x.parse::<NonZeroUsize>()42.unwrap_or_else(|_| panic!("invalid value for POLARS_MAX_OPEN_SINKS: {x}"))43}) {44return v;45}4647if self.target.is_cloud_location() {48const { NonZeroUsize::new(512).unwrap() }49} else {50const { NonZeroUsize::new(128).unwrap() }51}52}5354pub fn cloud_upload_chunk_size(&self) -> usize {55polars_io::configs::upload_chunk_size()56}5758pub fn partitioned_upload_chunk_size(&self) -> usize {59polars_io::configs::partitioned_upload_chunk_size()60}6162pub fn upload_concurrency(&self) -> NonZeroUsize {63polars_io::configs::upload_concurrency()64}6566pub fn partitioned_upload_concurrency(&self) -> NonZeroUsize {67polars_io::configs::partitioned_upload_concurrency()68}69}7071pub enum IOSinkTarget {72File(SinkTarget),73Partitioned(Box<PartitionedTarget>),74}7576impl IOSinkTarget {77pub fn is_cloud_location(&self) -> bool {78match self {79Self::File(v) => v.cloud_scheme(),80Self::Partitioned(v) => v.base_path.scheme(),81}82.is_some_and(|x| !matches!(x, CloudScheme::File | CloudScheme::FileNoHostname))83}84}8586pub struct PartitionedTarget {87pub base_path: PlRefPath,88pub file_path_provider: FileProviderType,89pub partitioner: Partitioner,90/// How to hstack the keys back into the dataframe (with_columns)91pub hstack_keys: Option<HStackColumns>,92pub include_keys_in_file: bool,93pub file_schema: SchemaRef,94pub file_size_limit: Option<NonZeroRowCountAndSize>,95}969798