Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/parquet/read/utils.rs
8475 views
1
use std::borrow::Cow;
2
3
use polars_core::prelude::{ArrowSchema, Column, DataFrame, DataType, IDX_DTYPE, Series};
4
use polars_core::schema::{SchemaExt, SchemaNamesAndDtypes};
5
use polars_error::{PolarsResult, polars_bail};
6
use polars_schema::Schema;
7
8
use crate::RowIndex;
9
use crate::hive::materialize_hive_partitions;
10
use crate::utils::apply_projection;
11
12
pub fn materialize_empty_df(
13
projection: Option<&[usize]>,
14
reader_schema: &ArrowSchema,
15
hive_partition_columns: Option<&[Series]>,
16
row_index: Option<&RowIndex>,
17
) -> DataFrame {
18
let schema = if let Some(projection) = projection {
19
Cow::Owned(apply_projection(reader_schema, projection))
20
} else {
21
Cow::Borrowed(reader_schema)
22
};
23
let mut df = DataFrame::empty_with_schema(&Schema::from_arrow_schema(&schema));
24
25
if let Some(row_index) = row_index {
26
df.insert_column(0, Column::new_empty(row_index.name.clone(), &IDX_DTYPE))
27
.unwrap();
28
}
29
30
materialize_hive_partitions(&mut df, reader_schema, hive_partition_columns);
31
32
df
33
}
34
35
pub(super) fn projected_arrow_schema_to_projection_indices(
36
schema: &ArrowSchema,
37
projected_arrow_schema: &ArrowSchema,
38
) -> PolarsResult<Option<Vec<usize>>> {
39
let mut projection_indices = Vec::with_capacity(projected_arrow_schema.len());
40
let mut is_full_ordered_projection = projected_arrow_schema.len() == schema.len();
41
42
for (i, field) in projected_arrow_schema.iter_values().enumerate() {
43
let dtype = {
44
let Some((idx, _, field)) = schema.get_full(&field.name) else {
45
polars_bail!(ColumnNotFound: "did not find column in file: {}", field.name)
46
};
47
48
projection_indices.push(idx);
49
is_full_ordered_projection &= idx == i;
50
51
DataType::from_arrow_field(field)
52
};
53
let expected_dtype = DataType::from_arrow_field(field);
54
55
if dtype.clone() != expected_dtype {
56
polars_bail!(
57
mismatch,
58
col = &field.name,
59
expected = expected_dtype,
60
found = dtype
61
);
62
}
63
}
64
65
Ok((!is_full_ordered_projection).then_some(projection_indices))
66
}
67
68
/// Utility to ensure the dtype of the column in `current_schema` matches the dtype in `schema` if
69
/// that column exists in `schema`.
70
pub fn ensure_matching_dtypes_if_found(
71
schema: &ArrowSchema,
72
current_schema: &ArrowSchema,
73
) -> PolarsResult<()> {
74
current_schema
75
.iter_names_and_dtypes()
76
.try_for_each(|(name, dtype)| {
77
if let Some(field) = schema.get(name) {
78
if dtype != &field.dtype {
79
// Check again with timezone normalization
80
// TODO: Add an ArrowDtype eq wrapper?
81
let lhs = DataType::from_arrow_dtype(dtype);
82
let rhs = DataType::from_arrow_field(field);
83
84
if lhs != rhs {
85
polars_bail!(
86
SchemaMismatch:
87
"dtypes differ for column {}: {:?} != {:?}"
88
, name, dtype, &field.dtype
89
);
90
}
91
}
92
}
93
Ok(())
94
})
95
}
96
97