use polars_core::frame::DataFrame;
use polars_core::frame::column::ScalarColumn;
use polars_core::prelude::{Column, DataType};
use polars_core::series::Series;
use crate::utils::HIVE_VALUE_ENCODE_CHARSET;
/// Adds the hive partition columns to `df`.
///
/// For every series in `hive_partition_columns` a scalar column is built from the series'
/// name, its first value, and the current height of `df`.
///
/// * Does nothing when `hive_partition_columns` is `None` or empty.
/// * When the first hive column is not found in `reader_schema`, or `df` has no columns, the
///   hive columns are stacked after the existing columns.
/// * Otherwise the existing columns and the hive columns are combined with
///   [`merge_sorted_to_schema_order`] and `df` is rebuilt from the merged list.
///
/// NOTE(review): the merge path presumably relies on the caller passing hive columns already
/// ordered by their position in `reader_schema` — confirm against the call sites.
pub(crate) fn materialize_hive_partitions<F, M>(
    df: &mut DataFrame,
    reader_schema: &polars_schema::Schema<F, M>,
    hive_partition_columns: Option<&[Series]>,
) {
    let Some(partition_series) = hive_partition_columns else {
        return;
    };
    if partition_series.is_empty() {
        return;
    }

    let height = df.height();
    let broadcast: Vec<Column> = partition_series
        .iter()
        .map(|series| ScalarColumn::new(series.name().clone(), series.first(), height).into())
        .collect();

    let frame_has_no_columns = df.width() == 0;
    let first_hive_in_schema = reader_schema.index_of(broadcast[0].name()).is_some();

    // Only the first hive column is checked against the schema before appending.
    if frame_has_no_columns || !first_hive_in_schema {
        if frame_has_no_columns {
            // SAFETY: NOTE(review): `height` was just read from `df` itself — confirm this
            // satisfies the contract of `set_height`.
            unsafe { df.set_height(height) };
        }
        // SAFETY: NOTE(review): every column in `broadcast` was constructed with `height`;
        // name uniqueness against the existing columns is assumed — verify.
        unsafe { df.hstack_mut_unchecked(&broadcast) };
        return;
    }

    let mut merged = Vec::with_capacity(df.width() + broadcast.len());
    merge_sorted_to_schema_order(
        // The drain is a temporary, so the mutable borrow of `df` ends with this statement.
        &mut unsafe { df.columns_mut() }.drain(..),
        &mut broadcast.into_iter(),
        reader_schema,
        &mut merged,
    );

    // SAFETY: NOTE(review): `merged` holds the drained columns of `df` plus columns built with
    // `height`; presumably all share that height and have distinct names — confirm.
    *df = unsafe { DataFrame::new_unchecked(height, merged) };
}
/// Merges two column iterators into `output`, ordering columns by their index in `schema`.
///
/// This delegates to [`merge_sorted_to_schema_order_impl`], using the position of each
/// column's name in `schema` (if any) as the sort index.
pub fn merge_sorted_to_schema_order<'a, F, M>(
    cols_lhs: &'a mut dyn Iterator<Item = Column>,
    cols_rhs: &'a mut dyn Iterator<Item = Column>,
    schema: &polars_schema::Schema<F, M>,
    output: &'a mut Vec<Column>,
) {
    let schema_position = |column: &Column| schema.index_of(column.name());
    merge_sorted_to_schema_order_impl(cols_lhs, cols_rhs, output, &schema_position);
}
/// Merges the items of `cols_lhs` and `cols_rhs` into `output`, ordered by `get_opt_index`.
///
/// Both inputs are expected to already be ordered by ascending index. At each step the item
/// with the smaller index is emitted; ties go to `cols_lhs`.
///
/// * If the first item of `cols_lhs` has no index, it is treated as having index `0`.
/// * If either input is empty, or the first item of `cols_rhs` has no index, no merging
///   happens: everything from `cols_lhs` is emitted, followed by everything from `cols_rhs`.
/// * Merging stops as soon as one side is exhausted or its next item has no index. Whatever
///   is left of `cols_lhs` is then emitted, followed by whatever is left of `cols_rhs`.
///
/// In debug builds, an index-less item in a non-first position of `cols_lhs` trips a
/// `debug_assert`.
pub fn merge_sorted_to_schema_order_impl<'a, T, O>(
    cols_lhs: &'a mut dyn Iterator<Item = T>,
    cols_rhs: &'a mut dyn Iterator<Item = T>,
    output: &mut O,
    get_opt_index: &dyn for<'b> Fn(&'b T) -> Option<usize>,
) where
    O: Extend<T>,
{
    let mut lhs = cols_lhs.peekable();
    let mut rhs = cols_rhs.peekable();

    // Both heads are inspected up front; a left head without an index defaults to 0.
    let lhs_start = lhs.peek().map(|head| get_opt_index(head).unwrap_or(0));
    let rhs_start = rhs.peek().and_then(|head| get_opt_index(head));

    if let (Some(mut lhs_idx), Some(mut rhs_idx)) = (lhs_start, rhs_start) {
        loop {
            // Take from the right only when it is strictly earlier, so ties favour the left.
            let take_rhs = rhs_idx < lhs_idx;
            let (source, source_idx) = if take_rhs {
                (&mut rhs, &mut rhs_idx)
            } else {
                (&mut lhs, &mut lhs_idx)
            };

            let taken = source
                .next()
                .expect("the chosen side always has a peeked item available");
            output.extend([taken]);

            let Some(upcoming) = source.peek() else {
                break;
            };
            let Some(position) = get_opt_index(upcoming) else {
                // Only the right-hand side is expected to run into index-less items here.
                debug_assert_eq!(usize::from(take_rhs), 1);
                break;
            };
            *source_idx = position;
        }
    }

    // Flush the leftovers: left remainder first, then right remainder.
    output.extend(lhs);
    output.extend(rhs);
}
/// Borrowed view over hive partition key columns.
///
/// Its [`std::fmt::Display`] implementation writes one `key=value/` segment per column.
pub struct HivePathFormatter<'a> {
    /// Key columns to render; the `Display` implementation asserts each has exactly one row.
    keys: &'a [Column],
}
impl<'a> HivePathFormatter<'a> {
    /// Creates a formatter that borrows `keys`.
    ///
    /// No validation is performed here; the columns are only inspected when the value is
    /// formatted.
    pub fn new(keys: &'a [Column]) -> Self {
        Self { keys }
    }
}
impl std::fmt::Display for HivePathFormatter<'_> {
    /// Writes one `key=value/` segment per key column, in slice order.
    ///
    /// Each column is cast to a string column and its single value is percent-encoded with
    /// `HIVE_VALUE_ENCODE_CHARSET`. A null value is rendered as `__HIVE_DEFAULT_PARTITION__`.
    ///
    /// # Panics
    /// Panics if a key column does not contain exactly one row, or if the cast to
    /// `DataType::String` fails.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.keys.iter().try_for_each(|key_column| {
            assert_eq!(key_column.len(), 1);

            let as_string = key_column.cast(&DataType::String).unwrap();
            let raw_value = as_string
                .str()
                .unwrap()
                .get(0)
                .unwrap_or("__HIVE_DEFAULT_PARTITION__");

            let key = as_string.name();
            let value =
                percent_encoding::percent_encode(raw_value.as_bytes(), HIVE_VALUE_ENCODE_CHARSET);

            write!(f, "{key}={value}/")
        })
    }
}