Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/hive.rs
8436 views
1
use polars_core::frame::DataFrame;
2
use polars_core::frame::column::ScalarColumn;
3
use polars_core::prelude::{Column, DataType};
4
use polars_core::series::Series;
5
6
use crate::utils::HIVE_VALUE_ENCODE_CHARSET;
7
8
/// Materializes hive partitions.
9
/// We have a special num_rows arg, as df can be empty when a projection contains
10
/// only hive partition columns.
11
///
12
/// The `hive_partition_columns` must be ordered by their position in the `reader_schema`. The
13
/// columns will be materialized by their positions in the file schema if they exist, or otherwise
14
/// at the end.
15
///
16
/// # Safety
17
///
18
/// num_rows equals the height of the df when the df height is non-zero.
19
pub(crate) fn materialize_hive_partitions<F, M>(
20
df: &mut DataFrame,
21
reader_schema: &polars_schema::Schema<F, M>,
22
hive_partition_columns: Option<&[Series]>,
23
) {
24
let num_rows = df.height();
25
26
if let Some(hive_columns) = hive_partition_columns {
27
// Insert these hive columns in the order they are stored in the file.
28
if hive_columns.is_empty() {
29
return;
30
}
31
32
let hive_columns = hive_columns
33
.iter()
34
.map(|s| ScalarColumn::new(s.name().clone(), s.first(), num_rows).into())
35
.collect::<Vec<Column>>();
36
37
if reader_schema.index_of(hive_columns[0].name()).is_none() || df.width() == 0 {
38
// Fast-path - all hive columns are at the end
39
if df.width() == 0 {
40
unsafe { df.set_height(num_rows) };
41
}
42
unsafe { df.hstack_mut_unchecked(&hive_columns) };
43
return;
44
}
45
46
let mut merged = Vec::with_capacity(df.width() + hive_columns.len());
47
48
// `hive_partitions_from_paths()` guarantees `hive_columns` is sorted by their appearance in `reader_schema`.
49
merge_sorted_to_schema_order(
50
&mut unsafe { df.columns_mut() }.drain(..),
51
&mut hive_columns.into_iter(),
52
reader_schema,
53
&mut merged,
54
);
55
56
*df = unsafe { DataFrame::new_unchecked(num_rows, merged) };
57
}
58
}
59
60
/// Merge 2 lists of columns into one, where each list contains columns ordered such that their indices
61
/// in the `schema` are in ascending order.
62
///
63
/// Layouts:
64
/// * `cols_lhs`: `[row_index?, ..schema_columns?, ..other_left?]`
65
/// * If the first item in `cols_lhs` is not found in the schema, it will be assumed to be a
66
/// `row_index` column and placed first into the result.
67
/// * `cols_rhs`: `[..schema_columns? ..other_right?]`
68
///
69
/// Output:
70
/// * `[..schema_columns?, ..other_left?, ..other_right?]`
71
///
72
/// Note: The `row_index` column should be handled before calling this function.
73
///
74
/// # Panics
75
/// Panics if either `cols_lhs` or `cols_rhs` is empty.
76
pub fn merge_sorted_to_schema_order<'a, F, M>(
77
cols_lhs: &'a mut dyn Iterator<Item = Column>,
78
cols_rhs: &'a mut dyn Iterator<Item = Column>,
79
schema: &polars_schema::Schema<F, M>,
80
output: &'a mut Vec<Column>,
81
) {
82
merge_sorted_to_schema_order_impl(cols_lhs, cols_rhs, output, &|v| schema.index_of(v.name()))
83
}
84
85
/// Generic implementation of [`merge_sorted_to_schema_order`].
///
/// Merges `cols_lhs` and `cols_rhs` into `output`. Each input must be ordered by ascending index
/// as reported by `get_opt_index`; items for which it returns `None` are "unindexed".
///
/// * An unindexed *first* item of `cols_lhs` is treated as index `0`, so it (e.g. a `row_index`
///   column) is emitted before everything else.
/// * On equal indices, the item from `cols_lhs` is emitted first.
/// * Once `cols_lhs` reaches an unindexed item, the rest of `cols_lhs` is held back until
///   `cols_rhs` runs out of indexed items.
/// * Once `cols_rhs` reaches an unindexed item, or either input is exhausted, merging stops and
///   the remainder of `cols_lhs` followed by the remainder of `cols_rhs` is appended.
///
/// Either input may be empty.
pub fn merge_sorted_to_schema_order_impl<'a, T, O>(
    cols_lhs: &'a mut dyn Iterator<Item = T>,
    cols_rhs: &'a mut dyn Iterator<Item = T>,
    output: &mut O,
    get_opt_index: &dyn for<'b> Fn(&'b T) -> Option<usize>,
) where
    O: Extend<T>,
{
    // Sort key for the unindexed tail of `cols_lhs`: after every real schema index, so those items
    // are deferred until `cols_rhs` has no indexed items left.
    const DEFERRED: usize = usize::MAX;

    let mut sides = [cols_lhs.peekable(), cols_rhs.peekable()];

    let first_lhs = sides[0]
        .peek()
        .map(|item| get_opt_index(item).unwrap_or(0));
    let first_rhs = sides[1].peek().and_then(get_opt_index);

    if let (Some(lhs_key), Some(rhs_key)) = (first_lhs, first_rhs) {
        let mut keys = [lhs_key, rhs_key];

        loop {
            // Take from the side whose next item appears earlier in the schema (ties -> lhs).
            let side = usize::from(keys[1] < keys[0]);
            output.extend(sides[side].next());

            let Some(next) = sides[side].peek() else {
                break;
            };

            match get_opt_index(next) {
                Some(index) => keys[side] = index,
                // lhs reached its `other_left` tail: keep draining rhs schema columns first.
                None if side == 0 => keys[0] = DEFERRED,
                // rhs reached its `other_right` tail: everything left is appended in order.
                None => break,
            }
        }
    }

    let [lhs, rhs] = sides;
    output.extend(lhs);
    output.extend(rhs);
}
137
138
/// # Panics
139
/// The `Display` impl of this will panic if a column has non-unit length.
140
pub struct HivePathFormatter<'a> {
141
keys: &'a [Column],
142
}
143
144
impl<'a> HivePathFormatter<'a> {
145
pub fn new(keys: &'a [Column]) -> Self {
146
Self { keys }
147
}
148
}
149
150
impl std::fmt::Display for HivePathFormatter<'_> {
151
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152
for column in self.keys {
153
assert_eq!(column.len(), 1);
154
let column = column.cast(&DataType::String).unwrap();
155
156
let key = column.name();
157
let value = percent_encoding::percent_encode(
158
column
159
.str()
160
.unwrap()
161
.get(0)
162
.unwrap_or("__HIVE_DEFAULT_PARTITION__")
163
.as_bytes(),
164
HIVE_VALUE_ENCODE_CHARSET,
165
);
166
167
write!(f, "{key}={value}/")?
168
}
169
170
Ok(())
171
}
172
}
173
174