Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/parquet/projection.rs
6939 views
1
use std::borrow::Cow;
2
use std::sync::Arc;
3
4
use arrow::datatypes::ArrowSchema;
5
use polars_core::prelude::{ArrowField, Column, DataType};
6
use polars_core::schema::Schema;
7
use polars_error::PolarsResult;
8
use polars_plan::dsl::CastColumnsPolicy;
9
use polars_utils::pl_str::PlSmallStr;
10
11
use crate::nodes::io_sources::multi_scan::components::column_selector::ColumnSelector;
12
use crate::nodes::io_sources::multi_scan::components::projection::MappedProjectionRef;
13
use crate::nodes::io_sources::multi_scan::components::projection::builder::ProjectionBuilder;
14
use crate::nodes::io_sources::multi_scan::reader_interface::Projection;
15
16
pub fn resolve_arrow_field_projections(
17
file_arrow_schema: &ArrowSchema,
18
file_schema: &Schema,
19
projection: Projection,
20
cast_columns_policy: CastColumnsPolicy,
21
) -> PolarsResult<Arc<[ArrowFieldProjection]>> {
22
let projection: Projection = match projection {
23
Projection::Plain(projected_schema) => ProjectionBuilder::new(projected_schema, None, None)
24
.build_projection(Some(file_schema), None, cast_columns_policy, usize::MAX)?,
25
Projection::Mapped { .. } => projection,
26
};
27
28
Ok(projection
29
.iter_non_missing_columns()
30
.map(
31
|MappedProjectionRef {
32
source_name,
33
output_name,
34
output_dtype,
35
resolved_transform,
36
}| {
37
let arrow_field = file_arrow_schema.get(source_name.as_str()).unwrap().clone();
38
39
let Some(resolved_transform) = resolved_transform else {
40
assert_eq!(source_name, output_name);
41
42
return ArrowFieldProjection::Plain(arrow_field);
43
};
44
45
assert_eq!(
46
resolved_transform.source_dtype,
47
file_schema.get(source_name.as_str()).unwrap()
48
);
49
50
ArrowFieldProjection::Mapped {
51
arrow_field,
52
output_name: output_name.clone(),
53
output_dtype: output_dtype.clone(),
54
transform: resolved_transform.attach_transforms(ColumnSelector::Position(0)),
55
}
56
},
57
)
58
.collect::<Arc<[ArrowFieldProjection]>>())
59
}
60
61
/// Represents a potentially mapped (i.e. casted and/or renamed) arrow field projection.
62
#[derive(Debug)]
63
pub enum ArrowFieldProjection {
64
Plain(ArrowField),
65
Mapped {
66
arrow_field: ArrowField,
67
output_name: PlSmallStr,
68
output_dtype: DataType,
69
transform: ColumnSelector,
70
},
71
}
72
73
impl ArrowFieldProjection {
74
pub fn arrow_field(&self) -> &ArrowField {
75
match self {
76
Self::Plain(field) => field,
77
Self::Mapped { arrow_field, .. } => arrow_field,
78
}
79
}
80
81
pub fn output_name(&self) -> &PlSmallStr {
82
match self {
83
Self::Plain(field) => &field.name,
84
Self::Mapped { output_name, .. } => output_name,
85
}
86
}
87
88
#[expect(unused)]
89
pub fn output_dtype(&self) -> Cow<'_, DataType> {
90
match self {
91
Self::Plain(field) => Cow::Owned(DataType::from_arrow_field(field)),
92
Self::Mapped { output_dtype, .. } => Cow::Borrowed(output_dtype),
93
}
94
}
95
96
pub fn apply_transform(&self, column: Column) -> PolarsResult<Column> {
97
match self {
98
Self::Plain(_) => Ok(column),
99
Self::Mapped {
100
transform,
101
output_dtype,
102
..
103
} => {
104
let output_height = column.len();
105
let out = transform.select_from_columns(&[column], output_height)?;
106
107
debug_assert_eq!(out.dtype(), output_dtype);
108
109
Ok(out)
110
},
111
}
112
}
113
}
114
115