Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/hive.rs
6940 views
1
use std::path::{Component, Path};
2
3
use polars_core::prelude::*;
4
use polars_io::prelude::schema_inference::{finish_infer_field_schema, infer_field_schema};
5
use polars_utils::plpath::PlPath;
6
#[cfg(feature = "serde")]
7
use serde::{Deserialize, Serialize};
8
9
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
10
#[derive(Debug, Clone)]
11
pub struct HivePartitionsDf(DataFrame);
12
13
impl HivePartitionsDf {
14
pub fn get_projection_schema_and_indices(
15
&self,
16
names: &PlHashSet<PlSmallStr>,
17
) -> (SchemaRef, Vec<usize>) {
18
let mut out_schema = Schema::with_capacity(self.schema().len());
19
let mut out_indices = Vec::with_capacity(self.0.get_columns().len());
20
21
for (i, column) in self.0.get_columns().iter().enumerate() {
22
let name = column.name();
23
if names.contains(name.as_str()) {
24
out_indices.push(i);
25
out_schema
26
.insert_at_index(out_schema.len(), name.clone(), column.dtype().clone())
27
.unwrap();
28
}
29
}
30
31
(out_schema.into(), out_indices)
32
}
33
34
pub fn apply_projection(&mut self, column_indices: &[usize]) {
35
let schema = self.schema();
36
let projected_schema = schema.try_project_indices(column_indices).unwrap();
37
self.0 = self.0.select(projected_schema.iter_names_cloned()).unwrap();
38
}
39
40
pub fn take_indices(&self, row_indexes: &[IdxSize]) -> Self {
41
if !row_indexes.is_empty() {
42
let mut max_idx = 0;
43
for &i in row_indexes {
44
max_idx = max_idx.max(i);
45
}
46
assert!(max_idx < self.0.height() as IdxSize);
47
}
48
// SAFETY: Checked bounds before.
49
Self(unsafe { self.0.take_slice_unchecked(row_indexes) })
50
}
51
52
pub fn df(&self) -> &DataFrame {
53
&self.0
54
}
55
56
pub fn schema(&self) -> &SchemaRef {
57
self.0.schema()
58
}
59
}
60
61
impl From<DataFrame> for HivePartitionsDf {
62
fn from(value: DataFrame) -> Self {
63
Self(value)
64
}
65
}
66
67
/// Note: Returned hive partitions are ordered by their position in the `reader_schema`
68
///
69
/// # Safety
70
/// `hive_start_idx <= [min path length]`
71
pub fn hive_partitions_from_paths(
72
paths: &[PlPath],
73
hive_start_idx: usize,
74
schema: Option<SchemaRef>,
75
reader_schema: &Schema,
76
try_parse_dates: bool,
77
) -> PolarsResult<Option<HivePartitionsDf>> {
78
let Some(path) = paths.first() else {
79
return Ok(None);
80
};
81
82
// generate an iterator for path segments
83
fn get_normal_components(path: &Path) -> Box<dyn Iterator<Item = &str> + '_> {
84
Box::new(path.components().filter_map(|c| match c {
85
Component::Normal(seg) => Some(seg.to_str().unwrap()),
86
_ => None,
87
}))
88
}
89
90
fn parse_hive_string_and_decode(part: &'_ str) -> Option<(&'_ str, std::borrow::Cow<'_, str>)> {
91
let (k, v) = parse_hive_string(part)?;
92
let v = percent_encoding::percent_decode(v.as_bytes())
93
.decode_utf8()
94
.ok()?;
95
96
Some((k, v))
97
}
98
99
// generate (k,v) tuples from 'k=v' partition strings
100
macro_rules! get_hive_parts_iter {
101
($e:expr) => {{
102
let file_index = get_normal_components($e).count() - 1;
103
let path_parts = get_normal_components($e);
104
105
path_parts.enumerate().filter_map(move |(index, part)| {
106
if index == file_index {
107
return None;
108
}
109
110
parse_hive_string_and_decode(part)
111
})
112
}};
113
}
114
115
let hive_schema = if let Some(ref schema) = schema {
116
let path = path.as_ref();
117
let path = path.offset_bytes(hive_start_idx);
118
Arc::new(get_hive_parts_iter!(&path.as_path()).map(|(name, _)| {
119
let Some(dtype) = schema.get(name) else {
120
polars_bail!(
121
SchemaFieldNotFound:
122
"path contains column not present in the given Hive schema: {:?}, path = {:?}",
123
name,
124
path
125
)
126
};
127
128
let dtype = if !try_parse_dates && dtype.is_temporal() {
129
DataType::String
130
} else {
131
dtype.clone()
132
};
133
134
Ok(Field::new(PlSmallStr::from_str(name), dtype))
135
}).collect::<PolarsResult<Schema>>()?)
136
} else {
137
let path = path.as_ref();
138
let path = path.offset_bytes(hive_start_idx);
139
140
let mut hive_schema = Schema::with_capacity(16);
141
let mut schema_inference_map: PlHashMap<&str, PlHashSet<DataType>> =
142
PlHashMap::with_capacity(16);
143
144
for (name, _) in get_hive_parts_iter!(&path.as_path()) {
145
// If the column is also in the file we can use the dtype stored there.
146
if let Some(dtype) = reader_schema.get(name) {
147
let dtype = if !try_parse_dates && dtype.is_temporal() {
148
DataType::String
149
} else {
150
dtype.clone()
151
};
152
153
hive_schema.insert_at_index(hive_schema.len(), name.into(), dtype.clone())?;
154
continue;
155
}
156
157
hive_schema.insert_at_index(hive_schema.len(), name.into(), DataType::String)?;
158
schema_inference_map.insert(name, PlHashSet::with_capacity(4));
159
}
160
161
if hive_schema.is_empty() && schema_inference_map.is_empty() {
162
return Ok(None);
163
}
164
165
if !schema_inference_map.is_empty() {
166
for path in paths {
167
let path = path.as_ref();
168
let path = path.offset_bytes(hive_start_idx);
169
for (name, value) in get_hive_parts_iter!(&path.as_path()) {
170
let Some(entry) = schema_inference_map.get_mut(name) else {
171
continue;
172
};
173
174
if value.is_empty() || value == "__HIVE_DEFAULT_PARTITION__" {
175
continue;
176
}
177
178
entry.insert(infer_field_schema(value.as_ref(), try_parse_dates, false));
179
}
180
}
181
182
for (name, ref possibilities) in schema_inference_map.drain() {
183
let dtype = finish_infer_field_schema(possibilities);
184
*hive_schema.try_get_mut(name).unwrap() = dtype;
185
}
186
}
187
Arc::new(hive_schema)
188
};
189
190
let mut buffers = polars_io::csv::read::buffer::init_buffers(
191
&(0..hive_schema.len()).collect::<Vec<_>>(),
192
paths.len(),
193
hive_schema.as_ref(),
194
None,
195
polars_io::prelude::CsvEncoding::Utf8,
196
false,
197
)?;
198
199
for path in paths {
200
let path = path.as_ref();
201
let path = path.offset_bytes(hive_start_idx);
202
for (name, value) in get_hive_parts_iter!(&path.as_path()) {
203
let Some(index) = hive_schema.index_of(name) else {
204
polars_bail!(
205
SchemaFieldNotFound:
206
"path contains column not present in the given Hive schema: {:?}, path = {:?}",
207
name,
208
path
209
)
210
};
211
212
let buf = buffers.get_mut(index).unwrap();
213
214
if !value.is_empty() && value != "__HIVE_DEFAULT_PARTITION__" {
215
buf.add(value.as_bytes(), false, false, false)?;
216
} else {
217
buf.add_null(false);
218
}
219
}
220
}
221
222
let mut buffers = buffers
223
.into_iter()
224
.map(|x| Ok(x.into_series()?.into_column()))
225
.collect::<PolarsResult<Vec<_>>>()?;
226
buffers.sort_by_key(|s| reader_schema.index_of(s.name()).unwrap_or(usize::MAX));
227
228
Ok(Some(HivePartitionsDf(DataFrame::new_with_height(
229
paths.len(),
230
buffers,
231
)?)))
232
}
233
234
/// Parse a Hive partition string (e.g. "column=1.5") into a name and value part.
235
///
236
/// Returns `None` if the string is not a Hive partition string.
237
fn parse_hive_string(part: &'_ str) -> Option<(&'_ str, &'_ str)> {
238
let mut it = part.split('=');
239
let name = it.next()?;
240
let value = it.next()?;
241
242
// Having multiple '=' doesn't seem like a valid Hive partition.
243
if it.next().is_some() {
244
return None;
245
}
246
247
// Files are not Hive partitions, so globs are not valid.
248
if value.contains('*') {
249
return None;
250
};
251
252
Some((name, value))
253
}
254
255