Path: blob/main/crates/polars-stream/src/nodes/io_sinks2/components/file_provider.rs
7884 views
use std::fmt::Write;1use std::sync::Arc;23use polars_core::prelude::{Column, DataType};4use polars_error::PolarsResult;5use polars_io::cloud::CloudOptions;6use polars_io::pl_async;7use polars_io::utils::HIVE_VALUE_ENCODE_CHARSET;8use polars_io::utils::file::Writeable;9use polars_plan::dsl::sink2::{FileProviderReturn, FileProviderType};10use polars_plan::prelude::sink2::FileProviderArgs;11use polars_utils::plpath::PlPath;1213pub struct FileProvider {14pub base_path: PlPath,15pub cloud_options: Option<Arc<CloudOptions>>,16pub provider_type: FileProviderType,17pub upload_chunk_size: usize,18}1920impl FileProvider {21pub async fn open_file(&self, args: FileProviderArgs) -> PolarsResult<Writeable> {22let provided_path: String = match &self.provider_type {23FileProviderType::Hive { extension } => {24let FileProviderArgs {25index_in_partition,26partition_keys,27} = args;2829let mut partition_parts = String::new();3031let partition_keys: &[Column] = partition_keys.get_columns();3233write!(34&mut partition_parts,35"{}",36HivePathFormatter::new(partition_keys)37)38.unwrap();3940assert!(index_in_partition <= 0xffff_ffff);4142write!(&mut partition_parts, "{index_in_partition:08x}.{extension}").unwrap();4344partition_parts45},4647FileProviderType::Function(f) => {48let f = f.clone();4950let out = pl_async::get_runtime()51.spawn_blocking(move || f.get_file(args))52.await53.unwrap()?;5455match out {56FileProviderReturn::Path(p) => p,57FileProviderReturn::Writeable(v) => return Ok(v),58}59},6061FileProviderType::Legacy(_) => unreachable!(),62};6364let path = self.base_path.as_ref().join(&provided_path);65let path = path.as_ref();6667if let Some(path) = path.as_local_path().and_then(|p| p.parent()) {68// Ignore errors from directory creation - the `Writeable::try_new()` below will raise69// appropriate errors.70let _ = tokio::fs::DirBuilder::new()71.recursive(true)72.create(path)73.await;74}7576Writeable::try_new(path, self.cloud_options.as_deref(), self.upload_chunk_size)77}78}7980/// # Panics81/// The `Display` impl of this will panic if a column has non-unit length.82pub struct HivePathFormatter<'a> {83keys: &'a [Column],84}8586impl<'a> HivePathFormatter<'a> {87pub fn new(keys: &'a [Column]) -> Self {88Self { keys }89}90}9192impl std::fmt::Display for HivePathFormatter<'_> {93fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {94for column in self.keys {95assert_eq!(column.len(), 1);96let column = column.cast(&DataType::String).unwrap();9798let key = column.name();99let value = percent_encoding::percent_encode(100column101.str()102.unwrap()103.get(0)104.unwrap_or("__HIVE_DEFAULT_PARTITION__")105.as_bytes(),106HIVE_VALUE_ENCODE_CHARSET,107);108109write!(f, "{key}={value}/")?110}111112Ok(())113}114}115116117