Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/parquet/statistics.rs
6939 views
1
use std::ops::Range;
2
3
use arrow::array::{MutablePrimitiveArray, PrimitiveArray};
4
use arrow::bitmap::Bitmap;
5
use arrow::pushable::Pushable;
6
use polars_core::prelude::*;
7
use polars_io::RowIndex;
8
use polars_io::predicates::ScanIOPredicate;
9
use polars_io::prelude::FileMetadata;
10
use polars_parquet::read::RowGroupMetadata;
11
use polars_parquet::read::statistics::{ArrowColumnStatisticsArrays, deserialize_all};
12
use polars_utils::format_pl_smallstr;
13
14
use crate::async_executor::{self, TaskPriority};
15
use crate::nodes::io_sources::parquet::projection::ArrowFieldProjection;
16
17
/// Per-row-group statistics for a single column: each inner column holds one
/// value per row group. A null entry means the statistic is unknown for that
/// row group (see `new_null` / the null-statistics fallbacks below).
struct StatisticsColumns {
    // Minimum value of the column within each row group.
    min: Column,
    // Maximum value of the column within each row group.
    max: Column,
    // Null count of the column within each row group (IDX dtype).
    null_count: Column,
}
22
23
impl StatisticsColumns {
24
fn new_null(dtype: &DataType, height: usize) -> Self {
25
Self {
26
min: Column::full_null(PlSmallStr::EMPTY, height, dtype),
27
max: Column::full_null(PlSmallStr::EMPTY, height, dtype),
28
null_count: Column::full_null(PlSmallStr::EMPTY, height, &IDX_DTYPE),
29
}
30
}
31
32
fn from_arrow_statistics(
33
statistics: ArrowColumnStatisticsArrays,
34
field: &ArrowField,
35
) -> PolarsResult<Self> {
36
Ok(Self {
37
min: unsafe {
38
Series::_try_from_arrow_unchecked_with_md(
39
PlSmallStr::EMPTY,
40
vec![statistics.min_value],
41
field.dtype(),
42
field.metadata.as_deref(),
43
)
44
}?
45
.into_column(),
46
47
max: unsafe {
48
Series::_try_from_arrow_unchecked_with_md(
49
PlSmallStr::EMPTY,
50
vec![statistics.max_value],
51
field.dtype(),
52
field.metadata.as_deref(),
53
)
54
}?
55
.into_column(),
56
57
null_count: Series::from_arrow(PlSmallStr::EMPTY, statistics.null_count.boxed())?
58
.into_column(),
59
})
60
}
61
62
fn with_base_column_name(self, base_column_name: &str) -> Self {
63
let b = base_column_name;
64
65
let min = self.min.with_name(format_pl_smallstr!("{b}_min"));
66
let max = self.max.with_name(format_pl_smallstr!("{b}_max"));
67
let null_count = self.null_count.with_name(format_pl_smallstr!("{b}_nc"));
68
69
Self {
70
min,
71
max,
72
null_count,
73
}
74
}
75
}
76
77
/// Evaluate the predicate's skip-batch predicate against per-row-group parquet
/// statistics and return a mask of row groups that can be skipped.
///
/// Returns `Ok(None)` when statistics use is disabled, there is no predicate,
/// or the predicate has no skip-batch predicate. Otherwise returns a `Bitmap`
/// with one bit per row group in `row_group_slice` (set = skippable; the
/// verbose log counts `unset_bits` as the row groups that will be read).
///
/// NOTE(review): the exact set/unset convention is inferred from the log line
/// below — confirm against `evaluate_with_stat_df`.
pub(super) async fn calculate_row_group_pred_pushdown_skip_mask(
    // Range of row-group indices (into `metadata.row_groups`) to evaluate.
    row_group_slice: Range<usize>,
    use_statistics: bool,
    predicate: Option<&ScanIOPredicate>,
    metadata: &Arc<FileMetadata>,
    projected_arrow_fields: Arc<[ArrowFieldProjection]>,
    // This is mut so that the offset is updated to the position of the first
    // row group.
    mut row_index: Option<RowIndex>,
    verbose: bool,
) -> PolarsResult<Option<Bitmap>> {
    if !use_statistics {
        return Ok(None);
    }

    let Some(predicate) = predicate else {
        return Ok(None);
    };

    let Some(sbp) = predicate.skip_batch_predicate.as_ref() else {
        return Ok(None);
    };

    let sbp = sbp.clone();

    let num_row_groups = row_group_slice.len();
    let metadata = metadata.clone();
    let live_columns = predicate.live_columns.clone();

    // Note: We are spawning here onto the computational async runtime because the caller is being run
    // on a tokio async thread.
    let skip_row_group_mask = async_executor::spawn(TaskPriority::High, async move {
        let row_groups_slice = &metadata.row_groups[row_group_slice.clone()];

        // Advance the row-index offset past all row groups that precede the
        // slice, saturating so oversized files cannot overflow IdxSize.
        if let Some(ri) = &mut row_index {
            for md in metadata.row_groups[0..row_group_slice.start].iter() {
                ri.offset = ri
                    .offset
                    .saturating_add(IdxSize::try_from(md.num_rows()).unwrap_or(IdxSize::MAX));
            }
        }

        // One "len" column plus (min, max, null_count) per live column.
        let mut columns = Vec::with_capacity(1 + live_columns.len() * 3);

        let lengths: Vec<IdxSize> = row_groups_slice
            .iter()
            .map(|rg| rg.num_rows() as IdxSize)
            .collect();

        columns.push(Column::new("len".into(), lengths));

        for projection in projected_arrow_fields.iter() {
            let c = projection.output_name();

            // Only columns referenced by the predicate need statistics.
            if !live_columns.contains(c) {
                continue;
            }

            let mut statistics = load_parquet_column_statistics(row_groups_slice, projection)?;

            // Note: Order is important here. We re-use the transform for the output column, meaning
            // that it may set the column name.
            statistics.min = projection.apply_transform(statistics.min)?;
            statistics.max = projection.apply_transform(statistics.max)?;

            let statistics = statistics.with_base_column_name(c);

            columns.extend([statistics.min, statistics.max, statistics.null_count]);
        }

        // The row-index column is synthetic, so its statistics are computed
        // from the (already adjusted) offset rather than parquet metadata.
        if let Some(row_index) = row_index {
            let statistics = build_row_index_statistics(&row_index, row_groups_slice)
                .with_base_column_name(&row_index.name);

            columns.extend([statistics.min, statistics.max, statistics.null_count]);
        }

        // One row per row group in the slice.
        let statistics_df = DataFrame::new_with_height(num_row_groups, columns)?;

        sbp.evaluate_with_stat_df(&statistics_df)
    })
    .await?;

    if verbose {
        eprintln!(
            "[ParquetFileReader]: Predicate pushdown: \
            reading {} / {} row groups",
            skip_row_group_mask.unset_bits(),
            num_row_groups,
        );
    }

    Ok(Some(skip_row_group_mask))
}
171
172
fn load_parquet_column_statistics(
173
row_groups: &[RowGroupMetadata],
174
projection: &ArrowFieldProjection,
175
) -> PolarsResult<StatisticsColumns> {
176
let arrow_field = projection.arrow_field();
177
178
let null_statistics = || {
179
Ok(StatisticsColumns::new_null(
180
&DataType::from_arrow_field(arrow_field),
181
row_groups.len(),
182
))
183
};
184
185
// This can be None in the allow_missing_columns case.
186
let Some(idxs) = row_groups[0].columns_idxs_under_root_iter(&arrow_field.name) else {
187
return null_statistics();
188
};
189
190
// 0 is possible for possible for empty structs.
191
//
192
// 2+ is for structs. We don't support reading nested statistics for now. It does not
193
// really make any sense at the moment with how we structure statistics.
194
if idxs.is_empty() || idxs.len() > 1 {
195
return null_statistics();
196
}
197
198
let idx = idxs[0];
199
200
let Some(statistics) = deserialize_all(arrow_field, row_groups, idx)? else {
201
return null_statistics();
202
};
203
204
StatisticsColumns::from_arrow_statistics(statistics, arrow_field)
205
}
206
207
fn build_row_index_statistics(
208
row_index: &RowIndex,
209
row_groups: &[RowGroupMetadata],
210
) -> StatisticsColumns {
211
let mut offset = row_index.offset;
212
213
let null_count = PrimitiveArray::<IdxSize>::full(row_groups.len(), 0, ArrowDataType::IDX_DTYPE);
214
215
let mut min_value = MutablePrimitiveArray::<IdxSize>::with_capacity(row_groups.len());
216
let mut max_value = MutablePrimitiveArray::<IdxSize>::with_capacity(row_groups.len());
217
218
for rg in row_groups.iter() {
219
let n_rows = IdxSize::try_from(rg.num_rows()).unwrap_or(IdxSize::MAX);
220
221
if offset.checked_add(n_rows).is_none() {
222
min_value.push_null();
223
max_value.push_null();
224
continue;
225
}
226
227
if n_rows == 0 {
228
min_value.push_null();
229
max_value.push_null();
230
} else {
231
min_value.push_value(offset);
232
max_value.push_value(offset + n_rows - 1);
233
}
234
235
offset = offset.saturating_add(n_rows);
236
}
237
238
StatisticsColumns {
239
min: Series::from_array(PlSmallStr::EMPTY, min_value.freeze()).into_column(),
240
max: Series::from_array(PlSmallStr::EMPTY, max_value.freeze()).into_column(),
241
null_count: Series::from_array(PlSmallStr::EMPTY, null_count).into_column(),
242
}
243
}
244
245