GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/parquet/read/read_impl.rs
use std::borrow::Cow;

use arrow::bitmap::Bitmap;
use arrow::datatypes::ArrowSchemaRef;
use polars_core::chunked_array::builder::NullChunkedBuilder;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::{POOL, config};
use polars_parquet::read::{self, ColumnChunkMetadata, FileMetadata, Filter, RowGroupMetadata};
use rayon::prelude::*;

use super::mmap::mmap_columns;
use super::utils::materialize_empty_df;
use super::{ParallelStrategy, mmap};
use crate::RowIndex;
use crate::hive::materialize_hive_partitions;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::parquet::metadata::FileMetadataRef;
use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
use crate::utils::slice::split_slice_at_file;

#[cfg(debug_assertions)]
// Ensure we get the proper polars types from schema inference.
// This saves unneeded casts.
fn assert_dtypes(dtype: &ArrowDataType) {
    use ArrowDataType as D;

    match dtype {
        // These should all be cast to the BinaryView / Utf8View variants
        D::Utf8 | D::Binary | D::LargeUtf8 | D::LargeBinary => unreachable!(),

        // These should be cast to Float32
        D::Float16 => unreachable!(),

        // This should have been converted to a LargeList
        D::List(_) => unreachable!(),

        // This should have been converted to a LargeList(Struct(_))
        D::Map(_, _) => unreachable!(),

        // Recursive checks
        D::Dictionary(_, dtype, _) => assert_dtypes(dtype),
        D::Extension(ext) => assert_dtypes(&ext.inner),
        D::LargeList(inner) => assert_dtypes(&inner.dtype),
        D::FixedSizeList(inner, _) => assert_dtypes(&inner.dtype),
        D::Struct(fields) => fields.iter().for_each(|f| assert_dtypes(f.dtype())),

        _ => {},
    }
}

fn should_copy_sortedness(dtype: &DataType) -> bool {
    // @NOTE: For now, we are a bit conservative with this.
    use DataType as D;

    matches!(
        dtype,
        D::Int8 | D::Int16 | D::Int32 | D::Int64 | D::UInt8 | D::UInt16 | D::UInt32 | D::UInt64
    )
}

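/// Propagate a Parquet `SortingColumn` from the row-group metadata onto `series`.
///
/// The flag is only copied when `col_idx` is the leading sorting column and the dtype is one for
/// which `should_copy_sortedness` allows it (currently only primitive integers).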
pub fn try_set_sorted_flag(series: &mut Series, col_idx: usize, sorting_map: &[(usize, IsSorted)]) {
    let Some((sorted_col, is_sorted)) = sorting_map.first() else {
        return;
    };
    if *sorted_col != col_idx || !should_copy_sortedness(series.dtype()) {
        return;
    }
    if config::verbose() {
        eprintln!(
            "Parquet conserved SortingColumn for column chunk of '{}' to {is_sorted:?}",
            series.name()
        );
    }

    series.set_sorted_flag(*is_sorted);
}

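/// Build a `(column index, sort order)` map from the `SortingColumn` entries of a row group's
/// metadata. Returns an empty map when the row group declares no sorting columns.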
pub fn create_sorting_map(md: &RowGroupMetadata) -> Vec<(usize, IsSorted)> {
    let capacity = md.sorting_columns().map_or(0, |s| s.len());
    let mut sorting_map = Vec::with_capacity(capacity);

    if let Some(sorting_columns) = md.sorting_columns() {
        for sorting in sorting_columns {
            sorting_map.push((
                sorting.column_idx as usize,
                if sorting.descending {
                    IsSorted::Descending
                } else {
                    IsSorted::Ascending
                },
            ))
        }
    }

    sorting_map
}

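/// Deserialize a single projected column (index `column_i` in `file_schema`) from the
/// memory-mapped column chunks in `store`, optionally applying `filter`. Also returns the
/// bitmap of rows for which the predicate held true.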
fn column_idx_to_series(
    column_i: usize,
    // The metadata belonging to this column
    field_md: &[&ColumnChunkMetadata],
    filter: Option<Filter>,
    file_schema: &ArrowSchema,
    store: &mmap::ColumnStore,
) -> PolarsResult<(Series, Bitmap)> {
    let field = file_schema.get_at_index(column_i).unwrap().1;

    #[cfg(debug_assertions)]
    {
        assert_dtypes(field.dtype())
    }
    let columns = mmap_columns(store, field_md);
    let (array, pred_true_mask) = mmap::to_deserializer(columns, field.clone(), filter)?;
    let series = Series::try_from((field, array))?;

    Ok((series, pred_true_mask))
}

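/// Convert the row groups in `row_group_start..row_group_end` into `DataFrame`s, dispatching to
/// a column-parallel or row-group-parallel implementation depending on `parallel`.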
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    pre_slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    if config::verbose() {
        eprintln!("parquet scan with parallel = {parallel:?}");
    }

    // If we are only interested in the row_index, we take a little special path here.
    if projection.is_empty() {
        if let Some(row_index) = row_index {
            let placeholder =
                NullChunkedBuilder::new(PlSmallStr::from_static("__PL_TMP"), pre_slice.1).finish();
            return Ok(vec![
                DataFrame::new(vec![placeholder.into_series().into_column()])?
                    .with_row_index(
                        row_index.name.clone(),
                        Some(row_index.offset + IdxSize::try_from(pre_slice.0).unwrap()),
                    )?
                    .select(std::iter::once(row_index.name))?,
            ]);
        }
    }

    use ParallelStrategy as S;

    match parallel {
        S::Columns | S::None => rg_to_dfs_optionally_par_over_columns(
            store,
            previous_row_count,
            row_group_start,
            row_group_end,
            pre_slice,
            file_metadata,
            schema,
            row_index,
            parallel,
            projection,
            hive_partition_columns,
        ),
        _ => rg_to_dfs_par_over_rg(
            store,
            row_group_start,
            row_group_end,
            previous_row_count,
            pre_slice,
            file_metadata,
            schema,
            row_index,
            projection,
            hive_partition_columns,
        ),
    }
}

#[allow(clippy::too_many_arguments)]
// might parallelize over columns
fn rg_to_dfs_optionally_par_over_columns(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    let mut dfs = Vec::with_capacity(row_group_end - row_group_start);

    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;

    for rg_idx in row_group_start..row_group_end {
        let md = &file_metadata.row_groups[rg_idx];

        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, md.num_rows(), slice.0, slice_end);
        let current_row_count = md.num_rows() as IdxSize;

        let sorting_map = create_sorting_map(md);

        let f = |column_i: &usize| {
            let (name, field) = schema.get_at_index(*column_i).unwrap();

            let Some(iter) = md.columns_under_root_iter(name) else {
                return Ok(Column::full_null(
                    name.clone(),
                    rg_slice.1,
                    &DataType::from_arrow_field(field),
                ));
            };

            let part = iter.collect::<Vec<_>>();

            let (mut series, _) = column_idx_to_series(
                *column_i,
                part.as_slice(),
                Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
                schema,
                store,
            )?;

            try_set_sorted_flag(&mut series, *column_i, &sorting_map);
            Ok(series.into_column())
        };

        let columns = if let ParallelStrategy::Columns = parallel {
            POOL.install(|| {
                projection
                    .par_iter()
                    .map(f)
                    .collect::<PolarsResult<Vec<_>>>()
            })?
        } else {
            projection.iter().map(f).collect::<PolarsResult<Vec<_>>>()?
        };

        let mut df = unsafe { DataFrame::new_no_checks(rg_slice.1, columns) };
        if let Some(rc) = &row_index {
            unsafe {
                df.with_row_index_mut(
                    rc.name.clone(),
                    Some(*previous_row_count + rc.offset + rg_slice.0 as IdxSize),
                )
            };
        }

        materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

        *previous_row_count = previous_row_count.checked_add(current_row_count).ok_or_else(||
            polars_err!(
                ComputeError: "Parquet file produces more than pow(2, 32) rows; \
                consider compiling with polars-bigidx feature (polars-u64-idx package on python), \
                or set 'streaming'"
            ),
        )?;
        dfs.push(df);

        if *previous_row_count as usize >= slice_end {
            break;
        }
    }

    Ok(dfs)
}

#[allow(clippy::too_many_arguments)]
// parallelizes over row groups
fn rg_to_dfs_par_over_rg(
    store: &mmap::ColumnStore,
    row_group_start: usize,
    row_group_end: usize,
    rows_read: &mut IdxSize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    // compute the limits per row group and the row count offsets
    let mut row_groups = Vec::with_capacity(row_group_end - row_group_start);

    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;

    // rows_scanned is the number of rows that have been scanned so far when checking for overlap with the slice.
    // rows_read is the number of rows found to overlap with the slice, and thus the number of rows that will be
    // read into a dataframe.
    let mut rows_scanned: IdxSize;

    if row_group_start > 0 {
        // In the case of async reads, we need to account for the fact that row_group_start may be greater than
        // zero due to earlier processing.
        // For details, see: https://github.com/pola-rs/polars/pull/20508#discussion_r1900165649
        rows_scanned = (0..row_group_start)
            .map(|i| file_metadata.row_groups[i].num_rows() as IdxSize)
            .sum();
    } else {
        rows_scanned = 0;
    }

    for i in row_group_start..row_group_end {
        let row_count_start = rows_scanned;
        let rg_md = &file_metadata.row_groups[i];
        let n_rows_this_file = rg_md.num_rows();
        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, n_rows_this_file, slice.0, slice_end);
        rows_scanned = rows_scanned
            .checked_add(n_rows_this_file as IdxSize)
            .ok_or(ROW_COUNT_OVERFLOW_ERR)?;

        *rows_read += rg_slice.1 as IdxSize;

        if rg_slice.1 == 0 {
            continue;
        }

        row_groups.push((rg_md, rg_slice, row_count_start));
    }

    let dfs = POOL.install(|| {
        // Set partitioned fields to prevent quadratic behavior.
        // Ensure all row groups are partitioned.
        row_groups
            .into_par_iter()
            .map(|(md, slice, row_count_start)| {
                if slice.1 == 0 {
                    return Ok(None);
                }
                // test we don't read the parquet file if this env var is set
                #[cfg(debug_assertions)]
                {
                    assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
                }

                let sorting_map = create_sorting_map(md);

                let columns = projection
                    .iter()
                    .map(|column_i| {
                        let (name, field) = schema.get_at_index(*column_i).unwrap();

                        let Some(iter) = md.columns_under_root_iter(name) else {
                            return Ok(Column::full_null(
                                name.clone(),
                                md.num_rows(),
                                &DataType::from_arrow_field(field),
                            ));
                        };

                        let part = iter.collect::<Vec<_>>();

                        let (mut series, _) = column_idx_to_series(
                            *column_i,
                            part.as_slice(),
                            Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
                            schema,
                            store,
                        )?;

                        try_set_sorted_flag(&mut series, *column_i, &sorting_map);
                        Ok(series.into_column())
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                let mut df = unsafe { DataFrame::new_no_checks(slice.1, columns) };

                if let Some(rc) = &row_index {
                    unsafe {
                        df.with_row_index_mut(
                            rc.name.clone(),
                            Some(row_count_start as IdxSize + rc.offset + slice.0 as IdxSize),
                        )
                    };
                }

                materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

                Ok(Some(df))
            })
            .collect::<PolarsResult<Vec<_>>>()
    })?;
    Ok(dfs.into_iter().flatten().collect())
}

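/// Read a Parquet file into a `DataFrame`.
///
/// `pre_slice` is an `(offset, length)` slice over the whole file, `projection` selects column
/// indices in `reader_schema` (all columns when `None`), and `metadata` can be supplied to avoid
/// re-reading the file footer. With `ParallelStrategy::Auto`, the strategy is chosen from the
/// number of row groups, the projection width, and the thread pool size.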
#[allow(clippy::too_many_arguments)]
pub fn read_parquet<R: MmapBytesReader>(
    mut reader: R,
    pre_slice: (usize, usize),
    projection: Option<&[usize]>,
    reader_schema: &ArrowSchemaRef,
    metadata: Option<FileMetadataRef>,
    mut parallel: ParallelStrategy,
    row_index: Option<RowIndex>,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<DataFrame> {
    // Fast path.
    if pre_slice.1 == 0 {
        return Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ));
    }

    let file_metadata = metadata
        .map(Ok)
        .unwrap_or_else(|| read::read_metadata(&mut reader).map(Arc::new))?;
    let n_row_groups = file_metadata.row_groups.len();

    let materialized_projection = projection
        .map(Cow::Borrowed)
        .unwrap_or_else(|| Cow::Owned((0usize..reader_schema.len()).collect::<Vec<_>>()));

    if ParallelStrategy::Auto == parallel {
        if n_row_groups > materialized_projection.len() || n_row_groups > POOL.current_num_threads()
        {
            parallel = ParallelStrategy::RowGroups;
        } else {
            parallel = ParallelStrategy::Columns;
        }
    }

    if let (ParallelStrategy::Columns, true) = (parallel, materialized_projection.len() == 1) {
        parallel = ParallelStrategy::None;
    }

    let reader = ReaderBytes::from(&mut reader);
    let store = mmap::ColumnStore::Local(unsafe {
        std::mem::transmute::<ReaderBytes<'_>, ReaderBytes<'static>>(reader).to_memslice()
    });

    let dfs = rg_to_dfs(
        &store,
        &mut 0,
        0,
        n_row_groups,
        pre_slice,
        &file_metadata,
        reader_schema,
        row_index.clone(),
        parallel,
        &materialized_projection,
        hive_partition_columns,
    )?;

    if dfs.is_empty() {
        Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ))
    } else {
        accumulate_dataframes_vertical(dfs)
    }
}

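/// Heuristic cost of applying the filter mask while reading ("pre-filtering") versus filtering
/// after materialization, based on how often the mask switches between runs of kept and skipped
/// rows relative to the row-group length.
///
/// For example, a mask like `0011100` has 2 edges over 7 rows, giving a cost of roughly 0.29; a
/// highly scattered mask approaches 1.0, where post-filtering tends to win.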
pub fn calc_prefilter_cost(mask: &arrow::bitmap::Bitmap) -> f64 {
    let num_edges = mask.num_edges() as f64;
    let rg_len = mask.len() as f64;

    // @GB: I did quite some analysis on this.
    //
    // Pre-filtered and Post-filtered can both be faster in certain scenarios.
    //
    // - Pre-filtered is faster when there is some amount of clustering or
    //   sorting involved or if the number of values selected is small.
    // - Post-filtering is faster when the predicate selects somewhat random
    //   elements throughout the row group.
    //
    // The following is a heuristic value to try and estimate which one is
    // faster. Essentially, it sees how many times it needs to switch between
    // skipping items and collecting items and compares it against the number
    // of values that it will collect.
    //
    // Closer to 0: pre-filtering is probably better.
    // Closer to 1: post-filtering is probably better.
    (num_edges / rg_len).clamp(0.0, 1.0)
}

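/// Controls whether the predicate mask is applied while decoding ("pre") or after
/// materialization ("post"). `Auto` decides from `calc_prefilter_cost` and the column dtype,
/// and the setting can be forced through the `POLARS_PQ_PREFILTERED_MASK` environment variable.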
#[derive(Clone, Copy)]
pub enum PrefilterMaskSetting {
    Auto,
    Pre,
    Post,
}

impl PrefilterMaskSetting {
    pub fn init_from_env() -> Self {
        std::env::var("POLARS_PQ_PREFILTERED_MASK").map_or(Self::Auto, |v| match &v[..] {
            "auto" => Self::Auto,
            "pre" => Self::Pre,
            "post" => Self::Post,
            _ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."),
        })
    }

    pub fn should_prefilter(&self, prefilter_cost: f64, dtype: &ArrowDataType) -> bool {
        match self {
            Self::Auto => {
                // Prefiltering is only expensive for nested types so we make the cut-off quite
                // high.
                let is_nested = dtype.is_nested();

                // We empirically selected these numbers.
                !is_nested && prefilter_cost <= 0.01
            },
            Self::Pre => true,
            Self::Post => false,
        }
    }
}