Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/hive.rs
6939 views
1
use polars_core::frame::DataFrame;
2
use polars_core::frame::column::ScalarColumn;
3
use polars_core::prelude::Column;
4
use polars_core::series::Series;
5
6
/// Materializes hive partitions.
7
/// We have a special num_rows arg, as df can be empty when a projection contains
8
/// only hive partition columns.
9
///
10
/// The `hive_partition_columns` must be ordered by their position in the `reader_schema`. The
11
/// columns will be materialized by their positions in the file schema if they exist, or otherwise
12
/// at the end.
13
///
14
/// # Safety
15
///
16
/// num_rows equals the height of the df when the df height is non-zero.
17
pub(crate) fn materialize_hive_partitions<D>(
18
df: &mut DataFrame,
19
reader_schema: &polars_schema::Schema<D>,
20
hive_partition_columns: Option<&[Series]>,
21
) {
22
let num_rows = df.height();
23
24
if let Some(hive_columns) = hive_partition_columns {
25
// Insert these hive columns in the order they are stored in the file.
26
if hive_columns.is_empty() {
27
return;
28
}
29
30
let hive_columns = hive_columns
31
.iter()
32
.map(|s| ScalarColumn::new(s.name().clone(), s.first(), num_rows).into())
33
.collect::<Vec<Column>>();
34
35
if reader_schema.index_of(hive_columns[0].name()).is_none() || df.width() == 0 {
36
// Fast-path - all hive columns are at the end
37
if df.width() == 0 {
38
unsafe { df.set_height(num_rows) };
39
}
40
unsafe { df.hstack_mut_unchecked(&hive_columns) };
41
return;
42
}
43
44
let mut merged = Vec::with_capacity(df.width() + hive_columns.len());
45
46
// `hive_partitions_from_paths()` guarantees `hive_columns` is sorted by their appearance in `reader_schema`.
47
merge_sorted_to_schema_order(
48
&mut unsafe { df.get_columns_mut().drain(..) },
49
&mut hive_columns.into_iter(),
50
reader_schema,
51
&mut merged,
52
);
53
54
*df = unsafe { DataFrame::new_no_checks(num_rows, merged) };
55
}
56
}
57
58
/// Merge 2 lists of columns into one, where each list contains columns ordered such that their indices
59
/// in the `schema` are in ascending order.
60
///
61
/// Layouts:
62
/// * `cols_lhs`: `[row_index?, ..schema_columns?, ..other_left?]`
63
/// * If the first item in `cols_lhs` is not found in the schema, it will be assumed to be a
64
/// `row_index` column and placed first into the result.
65
/// * `cols_rhs`: `[..schema_columns? ..other_right?]`
66
///
67
/// Output:
68
/// * `[..schema_columns?, ..other_left?, ..other_right?]`
69
///
70
/// Note: The `row_index` column should be handled before calling this function.
71
///
72
/// # Panics
73
/// Panics if either `cols_lhs` or `cols_rhs` is empty.
74
pub fn merge_sorted_to_schema_order<'a, D>(
75
cols_lhs: &'a mut dyn Iterator<Item = Column>,
76
cols_rhs: &'a mut dyn Iterator<Item = Column>,
77
schema: &polars_schema::Schema<D>,
78
output: &'a mut Vec<Column>,
79
) {
80
merge_sorted_to_schema_order_impl(cols_lhs, cols_rhs, output, &|v| schema.index_of(v.name()))
81
}
82
83
/// Merges two iterators into `output`, where each iterator yields items whose schema indices (as
/// reported by `get_opt_index`) are in ascending order.
///
/// Layouts:
/// * `cols_lhs`: `[row_index?, ..schema_items?, ..other_left?]`
///   * If the first item of `cols_lhs` has no schema index, it is treated as a `row_index` and
///     emitted first.
/// * `cols_rhs`: `[..schema_items?, ..other_right?]`
///
/// Output: `[row_index?, ..schema_items?, ..other_left?, ..other_right?]`. The schema items of
/// both sides are interleaved by ascending index; on equal indices the `cols_lhs` item goes first.
///
/// Either iterator may be empty.
pub fn merge_sorted_to_schema_order_impl<'a, T, O>(
    cols_lhs: &'a mut dyn Iterator<Item = T>,
    cols_rhs: &'a mut dyn Iterator<Item = T>,
    output: &mut O,
    get_opt_index: &dyn for<'b> Fn(&'b T) -> Option<usize>,
) where
    O: Extend<T>,
{
    let mut lhs = cols_lhs.peekable();
    let mut rhs = cols_rhs.peekable();

    // A leading `cols_lhs` item that is not in the schema is assumed to be a row index column,
    // which always goes first.
    if let Some(row_index) = lhs.next_if(|item| get_opt_index(item).is_none()) {
        output.extend([row_index]);
    }

    // Schema index of the next item on each side (`None` once that side is exhausted or has
    // reached its non-schema tail).
    let mut lhs_idx = lhs.peek().and_then(get_opt_index);
    let mut rhs_idx = rhs.peek().and_then(get_opt_index);

    loop {
        let take_rhs = match (lhs_idx, rhs_idx) {
            // Both sides still have schema items: take whichever appears earlier in the schema,
            // preferring `cols_lhs` on ties.
            (Some(l), Some(r)) => r < l,
            // `cols_lhs` is exhausted or only has `other_left` items left; the remaining schema
            // items of `cols_rhs` must be emitted before those.
            (None, Some(_)) => true,
            // `cols_rhs` has no schema items left, so the rest of `cols_lhs` (schema items, then
            // `other_left`) followed by `other_right` is already in output order.
            (_, None) => break,
        };

        if take_rhs {
            output.extend(rhs.next());
            rhs_idx = rhs.peek().and_then(get_opt_index);
        } else {
            output.extend(lhs.next());
            lhs_idx = lhs.peek().and_then(get_opt_index);
        }
    }

    output.extend(lhs);
    output.extend(rhs);
}
135
136