Path: blob/main/crates/polars-stream/src/nodes/io_sinks/components/file_provider.rs
8480 views
use std::sync::Arc;12use polars_error::PolarsResult;3use polars_io::cloud::CloudOptions;4use polars_io::metrics::IOMetrics;5use polars_io::pl_async;6use polars_io::utils::file::Writeable;7use polars_plan::dsl::file_provider::{FileProviderReturn, FileProviderType};8use polars_plan::prelude::file_provider::FileProviderArgs;9use polars_utils::pl_path::PlRefPath;1011pub struct FileProvider {12pub base_path: PlRefPath,13pub cloud_options: Option<Arc<CloudOptions>>,14pub provider_type: FileProviderType,15pub upload_chunk_size: usize,16pub upload_max_concurrency: usize,17pub io_metrics: Option<Arc<IOMetrics>>,18}1920impl FileProvider {21pub async fn open_file(&self, args: FileProviderArgs) -> PolarsResult<Writeable> {22let provided_path: String = match &self.provider_type {23FileProviderType::Hive(v) => v.get_path(args)?,24FileProviderType::Function(f) => {25let f = f.clone();2627let out = pl_async::get_runtime()28.spawn_blocking(move || f.get_path_or_file(args))29.await30.unwrap()?;3132match out {33FileProviderReturn::Path(p) => p,34FileProviderReturn::Writeable(v) => return Ok(v),35}36},37};3839let path = self.base_path.join(&provided_path);4041if !path.has_scheme()42&& let Some(path) = path.parent()43{44// Ignore errors from directory creation - the `Writeable::try_new()` below will raise45// appropriate errors.46let _ = tokio::fs::DirBuilder::new()47.recursive(true)48.create(path)49.await;50}5152Writeable::try_new(53path,54self.cloud_options.as_deref(),55self.upload_chunk_size,56self.upload_max_concurrency,57self.io_metrics.clone(),58)59}60}616263