Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks2/components/file_provider.rs
7884 views
1
use std::fmt::Write;
2
use std::sync::Arc;
3
4
use polars_core::prelude::{Column, DataType};
5
use polars_error::PolarsResult;
6
use polars_io::cloud::CloudOptions;
7
use polars_io::pl_async;
8
use polars_io::utils::HIVE_VALUE_ENCODE_CHARSET;
9
use polars_io::utils::file::Writeable;
10
use polars_plan::dsl::sink2::{FileProviderReturn, FileProviderType};
11
use polars_plan::prelude::sink2::FileProviderArgs;
12
use polars_utils::plpath::PlPath;
13
14
pub struct FileProvider {
15
pub base_path: PlPath,
16
pub cloud_options: Option<Arc<CloudOptions>>,
17
pub provider_type: FileProviderType,
18
pub upload_chunk_size: usize,
19
}
20
21
impl FileProvider {
22
pub async fn open_file(&self, args: FileProviderArgs) -> PolarsResult<Writeable> {
23
let provided_path: String = match &self.provider_type {
24
FileProviderType::Hive { extension } => {
25
let FileProviderArgs {
26
index_in_partition,
27
partition_keys,
28
} = args;
29
30
let mut partition_parts = String::new();
31
32
let partition_keys: &[Column] = partition_keys.get_columns();
33
34
write!(
35
&mut partition_parts,
36
"{}",
37
HivePathFormatter::new(partition_keys)
38
)
39
.unwrap();
40
41
assert!(index_in_partition <= 0xffff_ffff);
42
43
write!(&mut partition_parts, "{index_in_partition:08x}.{extension}").unwrap();
44
45
partition_parts
46
},
47
48
FileProviderType::Function(f) => {
49
let f = f.clone();
50
51
let out = pl_async::get_runtime()
52
.spawn_blocking(move || f.get_file(args))
53
.await
54
.unwrap()?;
55
56
match out {
57
FileProviderReturn::Path(p) => p,
58
FileProviderReturn::Writeable(v) => return Ok(v),
59
}
60
},
61
62
FileProviderType::Legacy(_) => unreachable!(),
63
};
64
65
let path = self.base_path.as_ref().join(&provided_path);
66
let path = path.as_ref();
67
68
if let Some(path) = path.as_local_path().and_then(|p| p.parent()) {
69
// Ignore errors from directory creation - the `Writeable::try_new()` below will raise
70
// appropriate errors.
71
let _ = tokio::fs::DirBuilder::new()
72
.recursive(true)
73
.create(path)
74
.await;
75
}
76
77
Writeable::try_new(path, self.cloud_options.as_deref(), self.upload_chunk_size)
78
}
79
}
80
81
/// # Panics
82
/// The `Display` impl of this will panic if a column has non-unit length.
83
pub struct HivePathFormatter<'a> {
84
keys: &'a [Column],
85
}
86
87
impl<'a> HivePathFormatter<'a> {
88
pub fn new(keys: &'a [Column]) -> Self {
89
Self { keys }
90
}
91
}
92
93
impl std::fmt::Display for HivePathFormatter<'_> {
94
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95
for column in self.keys {
96
assert_eq!(column.len(), 1);
97
let column = column.cast(&DataType::String).unwrap();
98
99
let key = column.name();
100
let value = percent_encoding::percent_encode(
101
column
102
.str()
103
.unwrap()
104
.get(0)
105
.unwrap_or("__HIVE_DEFAULT_PARTITION__")
106
.as_bytes(),
107
HIVE_VALUE_ENCODE_CHARSET,
108
);
109
110
write!(f, "{key}={value}/")?
111
}
112
113
Ok(())
114
}
115
}
116
117