Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/hive.rs
8446 views
1
use std::path::{Component, Path};
2
3
use polars_core::prelude::*;
4
use polars_io::prelude::schema_inference::{finish_infer_field_schema, infer_field_schema};
5
use polars_utils::pl_path::PlRefPath;
6
#[cfg(feature = "serde")]
7
use serde::{Deserialize, Serialize};
8
9
/// Newtype wrapper around a [`DataFrame`] whose columns are the hive-partition
/// key columns parsed from file paths (one row per path).
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Debug, Clone)]
pub struct HivePartitionsDf(DataFrame);
12
13
impl HivePartitionsDf {
14
/// Filter the columns to those contained in `projected_columns`.
15
pub fn filter_columns(&self, projected_columns: &Schema) -> Self {
16
let columns: Vec<_> = self
17
.df()
18
.columns()
19
.iter()
20
.filter(|c| projected_columns.contains(c.name()))
21
.cloned()
22
.collect();
23
24
let height = self.df().height();
25
unsafe { DataFrame::new_unchecked(height, columns) }.into()
26
}
27
28
pub fn df(&self) -> &DataFrame {
29
&self.0
30
}
31
32
pub fn schema(&self) -> &SchemaRef {
33
self.0.schema()
34
}
35
}
36
37
impl From<DataFrame> for HivePartitionsDf {
38
fn from(value: DataFrame) -> Self {
39
Self(value)
40
}
41
}
42
43
/// Note: Returned hive partitions are ordered by their position in the `reader_schema`
///
/// # Safety
/// `hive_start_idx <= [min path length]`
pub fn hive_partitions_from_paths(
    paths: &[PlRefPath],
    hive_start_idx: usize,
    schema: Option<SchemaRef>,
    reader_schema: &Schema,
    try_parse_dates: bool,
) -> PolarsResult<Option<HivePartitionsDf>> {
    // No paths -> nothing to partition on.
    let Some(path) = paths.first() else {
        return Ok(None);
    };

    // generate an iterator for path segments
    // Only `Component::Normal` segments are yielded; prefixes, root markers and
    // `.`/`..` components are skipped.
    // NOTE(review): `seg.to_str().unwrap()` panics on non-UTF8 path segments —
    // presumably paths are guaranteed UTF-8 upstream; confirm at call sites.
    fn get_normal_components(path: &Path) -> Box<dyn Iterator<Item = &str> + '_> {
        Box::new(path.components().filter_map(|c| match c {
            Component::Normal(seg) => Some(seg.to_str().unwrap()),
            _ => None,
        }))
    }

    // Split a `k=v` segment and percent-decode the value (hive encodes special
    // characters). Returns `None` if the segment is not a partition string or
    // the decoded value is not valid UTF-8.
    fn parse_hive_string_and_decode(part: &'_ str) -> Option<(&'_ str, std::borrow::Cow<'_, str>)> {
        let (k, v) = parse_hive_string(part)?;
        let v = percent_encoding::percent_decode(v.as_bytes())
            .decode_utf8()
            .ok()?;

        Some((k, v))
    }

    // generate (k,v) tuples from 'k=v' partition strings
    // The last normal component is the file name itself, so it is skipped.
    // NOTE(review): `count() - 1` underflows if the sliced path has zero normal
    // components — presumably ruled out by the `hive_start_idx` safety contract;
    // verify.
    macro_rules! get_hive_parts_iter {
        ($pl_path:expr) => {{
            let path: &Path = $pl_path.as_std_path();
            let file_index = get_normal_components(path).count() - 1;
            let path_parts = get_normal_components(path);

            path_parts.enumerate().filter_map(move |(index, part)| {
                if index == file_index {
                    return None;
                }

                parse_hive_string_and_decode(part)
            })
        }};
    }

    // Determine the hive schema: either validate against a user-supplied
    // schema, or infer dtypes from the reader schema / the path values.
    let hive_schema = if let Some(ref schema) = schema {
        // User supplied a hive schema: every partition key found in the first
        // path must be present in it.
        let path = path.sliced(hive_start_idx..path.as_str().len());
        Arc::new(get_hive_parts_iter!(&path).map(|(name, _)| {
            let Some(dtype) = schema.get(name) else {
                polars_bail!(
                    SchemaFieldNotFound:
                    "path contains column not present in the given Hive schema: {:?}, path = {:?}",
                    name,
                    path
                )
            };

            // Without date parsing, temporal columns are kept as strings.
            let dtype = if !try_parse_dates && dtype.is_temporal() {
                DataType::String
            } else {
                dtype.clone()
            };

            Ok(Field::new(PlSmallStr::from_str(name), dtype))
        }).collect::<PolarsResult<Schema>>()?)
    } else {
        let path = path.sliced(hive_start_idx..path.as_str().len());

        let mut hive_schema = Schema::with_capacity(16);
        // Columns whose dtype is not known from the file get an entry here and
        // are inferred from the string values across all paths below.
        let mut schema_inference_map: PlHashMap<&str, PlHashSet<DataType>> =
            PlHashMap::with_capacity(16);

        for (name, _) in get_hive_parts_iter!(&path) {
            // If the column is also in the file we can use the dtype stored there.
            if let Some(dtype) = reader_schema.get(name) {
                let dtype = if !try_parse_dates && dtype.is_temporal() {
                    DataType::String
                } else {
                    dtype.clone()
                };

                hive_schema.insert_at_index(hive_schema.len(), name.into(), dtype.clone())?;
                continue;
            }

            // Placeholder dtype; may be overwritten after value-based inference.
            hive_schema.insert_at_index(hive_schema.len(), name.into(), DataType::String)?;
            schema_inference_map.insert(name, PlHashSet::with_capacity(4));
        }

        // First path contained no partition segments at all -> not hive-partitioned.
        if hive_schema.is_empty() && schema_inference_map.is_empty() {
            return Ok(None);
        }

        if !schema_inference_map.is_empty() {
            // Collect candidate dtypes for each unknown column from the values
            // observed in every path.
            for path in paths {
                let path = path.sliced(hive_start_idx..path.as_str().len());
                for (name, value) in get_hive_parts_iter!(&path) {
                    let Some(entry) = schema_inference_map.get_mut(name) else {
                        continue;
                    };

                    // Empty / hive null sentinel values carry no dtype information.
                    if value.is_empty() || value == "__HIVE_DEFAULT_PARTITION__" {
                        continue;
                    }

                    entry.insert(infer_field_schema(value.as_ref(), try_parse_dates, false));
                }
            }

            // Resolve each candidate set to a single dtype and patch it into
            // the schema (replacing the String placeholder).
            for (name, ref possibilities) in schema_inference_map.drain() {
                let dtype = finish_infer_field_schema(possibilities);
                *hive_schema.try_get_mut(name).unwrap() = dtype;
            }
        }
        Arc::new(hive_schema)
    };

    // Reuse the CSV readers' typed buffer builders to parse the string values
    // into columns of the resolved dtypes; one slot per hive column, with
    // capacity for one value per path.
    let mut buffers = polars_io::csv::read::builder::init_builders(
        &(0..hive_schema.len()).collect::<Vec<_>>(),
        paths.len(),
        hive_schema.as_ref(),
        None,
        polars_io::prelude::CsvEncoding::Utf8,
        false,
    )?;

    // Second pass over all paths: feed every partition value into the buffer
    // of its column.
    for path in paths {
        let path = path.sliced(hive_start_idx..path.as_str().len());
        for (name, value) in get_hive_parts_iter!(&path) {
            // A later path may contain a key the first path (and therefore the
            // schema) did not have -> hard error.
            let Some(index) = hive_schema.index_of(name) else {
                polars_bail!(
                    SchemaFieldNotFound:
                    "path contains column not present in the given Hive schema: {:?}, path = {:?}",
                    name,
                    path
                )
            };

            let buf = buffers.get_mut(index).unwrap();

            // The hive null sentinel becomes an actual null value.
            if value != "__HIVE_DEFAULT_PARTITION__" {
                buf.add(value.as_bytes(), false, false, false)?;
            } else {
                buf.add_null(false);
            }
        }
    }

    // Finish the buffers into columns, then order them by their position in
    // `reader_schema` (columns absent from it sort last) — this is the
    // ordering promised in the doc comment above.
    let mut buffers = buffers
        .into_iter()
        .map(|x| Ok(x.into_series()?.into_column()))
        .collect::<PolarsResult<Vec<_>>>()?;
    buffers.sort_by_key(|s| reader_schema.index_of(s.name()).unwrap_or(usize::MAX));

    Ok(Some(HivePartitionsDf(DataFrame::new(
        paths.len(),
        buffers,
    )?)))
}
206
207
/// Parse a Hive partition string (e.g. "column=1.5") into a name and value part.
///
/// Returns `None` if the string is not a Hive partition string.
fn parse_hive_string(part: &str) -> Option<(&str, &str)> {
    // Split on the first '='; no '=' at all means this is not a partition segment.
    let (name, value) = part.split_once('=')?;

    // Having multiple '=' doesn't seem like a valid Hive partition.
    if value.contains('=') {
        return None;
    }

    // Files are not Hive partitions, so globs are not valid.
    if value.contains('*') {
        return None;
    }

    Some((name, value))
}
227
228