Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks/components/file_provider.rs
8480 views
1
use std::sync::Arc;
2
3
use polars_error::PolarsResult;
4
use polars_io::cloud::CloudOptions;
5
use polars_io::metrics::IOMetrics;
6
use polars_io::pl_async;
7
use polars_io::utils::file::Writeable;
8
use polars_plan::dsl::file_provider::{FileProviderReturn, FileProviderType};
9
use polars_plan::prelude::file_provider::FileProviderArgs;
10
use polars_utils::pl_path::PlRefPath;
11
12
pub struct FileProvider {
13
pub base_path: PlRefPath,
14
pub cloud_options: Option<Arc<CloudOptions>>,
15
pub provider_type: FileProviderType,
16
pub upload_chunk_size: usize,
17
pub upload_max_concurrency: usize,
18
pub io_metrics: Option<Arc<IOMetrics>>,
19
}
20
21
impl FileProvider {
22
pub async fn open_file(&self, args: FileProviderArgs) -> PolarsResult<Writeable> {
23
let provided_path: String = match &self.provider_type {
24
FileProviderType::Hive(v) => v.get_path(args)?,
25
FileProviderType::Function(f) => {
26
let f = f.clone();
27
28
let out = pl_async::get_runtime()
29
.spawn_blocking(move || f.get_path_or_file(args))
30
.await
31
.unwrap()?;
32
33
match out {
34
FileProviderReturn::Path(p) => p,
35
FileProviderReturn::Writeable(v) => return Ok(v),
36
}
37
},
38
};
39
40
let path = self.base_path.join(&provided_path);
41
42
if !path.has_scheme()
43
&& let Some(path) = path.parent()
44
{
45
// Ignore errors from directory creation - the `Writeable::try_new()` below will raise
46
// appropriate errors.
47
let _ = tokio::fs::DirBuilder::new()
48
.recursive(true)
49
.create(path)
50
.await;
51
}
52
53
Writeable::try_new(
54
path,
55
self.cloud_options.as_deref(),
56
self.upload_chunk_size,
57
self.upload_max_concurrency,
58
self.io_metrics.clone(),
59
)
60
}
61
}
62
63