Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs
8512 views
1
use std::sync::Arc;
2
3
use arrow::datatypes::ArrowSchemaRef;
4
use async_trait::async_trait;
5
use polars_core::prelude::ArrowSchema;
6
use polars_core::schema::{Schema, SchemaExt, SchemaRef};
7
use polars_error::{PolarsResult, polars_err};
8
use polars_io::cloud::CloudOptions;
9
use polars_io::predicates::ScanIOPredicate;
10
use polars_io::prelude::{FileMetadata, ParquetOptions};
11
use polars_io::utils::byte_source::{BufferByteSource, DynByteSource, DynByteSourceBuilder};
12
use polars_io::{RowIndex, pl_async};
13
use polars_parquet::read::schema::infer_schema_with_options;
14
use polars_plan::dsl::ScanSource;
15
use polars_utils::IdxSize;
16
use polars_utils::mem::prefetch::get_memory_prefetch_func;
17
use polars_utils::slice_enum::Slice;
18
19
use super::multi_scan::reader_interface::output::{FileReaderOutputRecv, FileReaderOutputSend};
20
use super::multi_scan::reader_interface::{
21
BeginReadArgs, FileReader, FileReaderCallbacks, calc_row_position_after_slice,
22
};
23
use crate::async_executor::{self};
24
use crate::async_primitives::wait_group::{WaitGroup, WaitToken};
25
use crate::metrics::OptIOMetrics;
26
use crate::morsel::SourceToken;
27
use crate::nodes::compute_node_prelude::*;
28
use crate::nodes::io_sources::parquet::projection::{
29
ArrowFieldProjection, resolve_arrow_field_projections,
30
};
31
use crate::nodes::{TaskPriority, io_sources};
32
use crate::utils::tokio_handle_ext;
33
34
// Submodules of the parquet IO source node. `builder` and `init` are declared
// `pub`; the remaining modules are implementation details of this node.
pub mod builder;
pub mod init;
mod metadata_utils;
mod projection;
mod row_group_data_fetch;
mod row_group_decode;
mod statistics;
41
42
/// `FileReader` implementation for a single parquet scan source.
///
/// Construction happens in the `builder` module; `initialize()` must run
/// before the schema / row-count accessors or `begin_read()` are used.
pub struct ParquetFileReader {
    /// The file (or in-memory buffer) this reader scans.
    scan_source: ScanSource,
    /// Cloud configuration forwarded when resolving the byte source.
    cloud_options: Option<Arc<CloudOptions>>,
    /// Parquet scan options. NOTE(review): cloned out of the Arc in
    /// `begin_read` (see the TODO there).
    config: Arc<ParquetOptions>,
    /// Set by the builder if we have metadata left over from DSL conversion.
    metadata: Option<Arc<FileMetadata>>,
    /// Strategy used to turn `scan_source` into a `DynByteSource`.
    byte_source_builder: DynByteSourceBuilder,
    /// Cross-reader synchronization for row-group prefetching.
    row_group_prefetch_sync: RowGroupPrefetchSync,
    /// Optional IO metrics sink passed through to the byte source.
    io_metrics: OptIOMetrics,
    // Enables eprintln! diagnostics throughout this reader.
    verbose: bool,

    /// Set during initialize()
    init_data: Option<InitializedState>,
}
56
57
/// Coordinates row-group prefetching across consecutive readers.
///
/// The semaphore bounds concurrent prefetches; the shared mutex slot passes a
/// `WaitGroup` from one reader to the next so each reader can wait for its
/// predecessor to finish spawning prefetches (see `prepare_read`).
struct RowGroupPrefetchSync {
    /// Upper bound on concurrent row-group prefetches (clamped per file in
    /// `begin_read`).
    prefetch_limit: usize,
    /// Shared across readers; limits in-flight prefetches globally.
    prefetch_semaphore: Arc<tokio::sync::Semaphore>,
    /// Hand-off slot: `prepare_read` swaps this reader's wait group in and
    /// takes the previous reader's out.
    shared_prefetch_wait_group_slot: Arc<std::sync::Mutex<Option<WaitGroup>>>,

    /// Waits for the previous reader to finish spawning prefetches.
    prev_all_spawned: Option<WaitGroup>,
    /// Dropped once the current reader has finished spawning prefetches.
    current_all_spawned: Option<WaitToken>,
}
67
68
/// State produced by `ParquetFileReader::initialize()`.
#[derive(Clone)]
struct InitializedState {
    // Deserialized parquet footer metadata.
    file_metadata: Arc<FileMetadata>,
    // Arrow schema inferred from the footer metadata.
    file_schema: Arc<ArrowSchema>,
    // Polars schema, converted lazily from `file_schema` on first access
    // (see `_file_schema`).
    file_schema_pl: Option<SchemaRef>,
    // Resolved byte source; may be swapped for an in-memory buffer if the
    // metadata fetch ended up reading the whole file.
    byte_source: Arc<DynByteSource>,
}
75
76
#[async_trait]
impl FileReader for ParquetFileReader {
    /// Resolve the byte source and parquet footer metadata, populating
    /// `self.init_data`. Idempotent: returns immediately if already initialized.
    async fn initialize(&mut self) -> PolarsResult<()> {
        let verbose = self.verbose;

        // Already initialized — nothing to do.
        if self.init_data.is_some() {
            return Ok(());
        }

        // Clone everything the spawned task needs ('static + Send closure).
        let scan_source = self.scan_source.clone();
        let byte_source_builder = self.byte_source_builder.clone();
        let cloud_options = self.cloud_options.clone();
        let io_metrics = self.io_metrics.clone();

        // Resolve the byte source on the shared async runtime.
        // `.unwrap()` propagates a task panic; `?` propagates the IO error.
        let byte_source = pl_async::get_runtime()
            .spawn(async move {
                scan_source
                    .as_scan_source_ref()
                    .to_dyn_byte_source(
                        &byte_source_builder,
                        cloud_options.as_deref(),
                        io_metrics.0,
                    )
                    .await
            })
            .await
            .unwrap()?;

        let mut byte_source = Arc::new(byte_source);

        // Reuse metadata carried over from DSL conversion when available,
        // otherwise fetch and deserialize the footer.
        let file_metadata = if let Some(v) = self.metadata.clone() {
            v
        } else {
            let (metadata_bytes, opt_full_bytes) = {
                let byte_source = byte_source.clone();

                pl_async::get_runtime()
                    .spawn(async move {
                        metadata_utils::read_parquet_metadata_bytes(&byte_source, verbose).await
                    })
                    .await
                    .unwrap()?
            };

            // If the metadata fetch pulled in the entire file, serve all
            // subsequent reads from that in-memory buffer.
            if let Some(full_bytes) = opt_full_bytes {
                byte_source = Arc::new(DynByteSource::Buffer(BufferByteSource(full_bytes)));
            }

            // NOTE(review): the second argument looks like a deserialization
            // size limit derived from the footer length — confirm against
            // `deserialize_metadata`'s documentation.
            Arc::new(polars_parquet::parquet::read::deserialize_metadata(
                metadata_bytes.as_ref(),
                metadata_bytes.len() * 2 + 1024,
            )?)
        };

        let file_schema = Arc::new(infer_schema_with_options(&file_metadata, &None)?);

        self.init_data = Some(InitializedState {
            file_metadata,
            file_schema,
            // Converted lazily by `_file_schema`.
            file_schema_pl: None,
            byte_source,
        });

        Ok(())
    }

    /// Install this reader's prefetch wait group into the shared hand-off slot,
    /// taking out the previous reader's wait group (if any) so `begin_read` can
    /// sequence prefetch spawning across readers.
    fn prepare_read(&mut self) -> PolarsResult<()> {
        let wait_group_this_reader = WaitGroup::default();
        let prefetch_all_spawned_token = wait_group_this_reader.token();

        // try_lock().unwrap(): contention here would indicate a bug.
        // NOTE(review): assumes no other code holds this mutex concurrently —
        // confirm against the callers of prepare_read.
        let prev_wait_group: Option<WaitGroup> = self
            .row_group_prefetch_sync
            .shared_prefetch_wait_group_slot
            .try_lock()
            .unwrap()
            .replace(wait_group_this_reader);

        self.row_group_prefetch_sync.prev_all_spawned = prev_wait_group;
        self.row_group_prefetch_sync.current_all_spawned = Some(prefetch_all_spawned_token);

        Ok(())
    }

    /// Start reading the file, returning the morsel output receiver and a
    /// handle for the read task. Sends all `FileReaderCallbacks` upfront
    /// (the total row count is known from the footer), then short-circuits
    /// for empty slices and empty projections before dispatching the full
    /// `ParquetReadImpl` pipeline.
    fn begin_read(
        &mut self,
        args: BeginReadArgs,
    ) -> PolarsResult<(FileReaderOutputRecv, JoinHandle<PolarsResult<()>>)> {
        let verbose = self.verbose;

        let InitializedState {
            file_metadata,
            file_schema: file_arrow_schema,
            file_schema_pl: _,
            byte_source,
        } = self.init_data.clone().unwrap();

        let n_rows_in_file = self._n_rows_in_file()?;

        // Fast path detection: an empty projection with no row index and no
        // predicate only needs a single empty DataFrame of the right height.
        // The let-chain pattern requires `disable_morsel_split: true`.
        let single_morsel_height: Option<usize> = if let BeginReadArgs {
            projection,
            row_index: None,
            pre_slice,
            predicate: None,
            cast_columns_policy: _,
            num_pipelines: _,
            disable_morsel_split: true,
            callbacks:
                FileReaderCallbacks {
                    file_schema_tx: _,
                    n_rows_in_file_tx: _,
                    row_position_on_end_tx: _,
                },
        } = &args
            && projection.is_empty()
        {
            let mut h: usize = n_rows_in_file as _;

            // Clamp the height to the (bounds-restricted) slice length.
            if let Some(pre_slice) = pre_slice.clone() {
                h = usize::min(h, pre_slice.restrict_to_bounds(h).len());
            }

            Some(h)
        } else {
            None
        };

        let BeginReadArgs {
            projection,
            row_index,
            pre_slice: pre_slice_arg,
            predicate,
            cast_columns_policy,
            num_pipelines,
            disable_morsel_split,
            callbacks:
                FileReaderCallbacks {
                    file_schema_tx,
                    n_rows_in_file_tx,
                    row_position_on_end_tx,
                },
        } = args;

        let file_schema = self._file_schema().clone();

        // Resolve negative/oversized slices against the actual row count.
        let normalized_pre_slice = pre_slice_arg
            .clone()
            .map(|x| x.restrict_to_bounds(usize::try_from(n_rows_in_file).unwrap()));

        // Send all callbacks to unblock the next reader. We can do this immediately as we know
        // the total row count upfront.

        if let Some(n_rows_in_file_tx) = n_rows_in_file_tx {
            _ = n_rows_in_file_tx.send(n_rows_in_file);
        }

        // We are allowed to send this value immediately, even though we haven't "ended" yet
        // (see its definition under FileReaderCallbacks).
        if let Some(row_position_on_end_tx) = row_position_on_end_tx {
            _ = row_position_on_end_tx
                .send(self._row_position_after_slice(normalized_pre_slice.clone())?);
        }

        if let Some(file_schema_tx) = file_schema_tx {
            _ = file_schema_tx.send(file_schema.clone());
        }

        // Empty slice: nothing to read — return a closed channel and a
        // finished task.
        if normalized_pre_slice.as_ref().is_some_and(|x| x.len() == 0) {
            let (_, rx) = FileReaderOutputSend::new_serial();

            if verbose {
                eprintln!(
                    "[ParquetFileReader]: early return: \
                    n_rows_in_file: {n_rows_in_file}, \
                    pre_slice: {pre_slice_arg:?}, \
                    resolved_pre_slice: {normalized_pre_slice:?}"
                )
            }

            return Ok((
                rx,
                async_executor::spawn(TaskPriority::Low, std::future::ready(Ok(()))),
            ));
        }

        // Lazily-resolved projection: avoids resolving when the
        // single-morsel fast path is taken and the value is never needed.
        let mut _projected_arrow_fields: Option<Arc<[ArrowFieldProjection]>> = None;
        let mut projected_arrow_fields = || {
            if _projected_arrow_fields.is_none() {
                _projected_arrow_fields = Some(resolve_arrow_field_projections(
                    &file_arrow_schema,
                    &file_schema,
                    projection.clone(),
                    cast_columns_policy.clone(),
                )?);
            }
            PolarsResult::Ok(_projected_arrow_fields.as_ref().unwrap().clone())
        };

        if verbose {
            eprintln!(
                "[ParquetFileReader]: \
                project: {} / {}, \
                pre_slice: {:?}, \
                resolved_pre_slice: {:?}, \
                row_index: {:?}, \
                predicate: {:?}",
                projected_arrow_fields()?.len(),
                file_schema.len(),
                pre_slice_arg,
                normalized_pre_slice,
                &row_index,
                predicate.as_ref().map(|_| "<predicate>"),
            )
        }

        // Empty-projection fast path: emit one empty DataFrame with the
        // computed height and finish.
        if let Some(single_morsel_height) = single_morsel_height {
            let (mut tx, rx) = FileReaderOutputSend::new_serial();

            let handle = async_executor::spawn(TaskPriority::Low, async move {
                let _ = tx
                    .send_morsel(Morsel::new(
                        DataFrame::empty_with_height(single_morsel_height),
                        MorselSeq::default(),
                        SourceToken::default(),
                    ))
                    .await;
                Ok(())
            });

            return Ok((rx, handle));
        }

        // Prepare parameters for dispatch
        let projected_arrow_fields = projected_arrow_fields()?.clone();
        let memory_prefetch_func = get_memory_prefetch_func(verbose);
        // Clamp the prefetch count to the file's row-group count, min 1.
        let row_group_prefetch_size = self
            .row_group_prefetch_sync
            .prefetch_limit
            .min(file_metadata.row_groups.len())
            .max(1);

        // This can be set to 1 to force column-per-thread parallelism, e.g. for bug reproduction.
        let target_values_per_thread =
            std::env::var("POLARS_PARQUET_DECODE_TARGET_VALUES_PER_THREAD")
                .map(|x| x.parse::<usize>().expect("integer").max(1))
                .unwrap_or(16_777_216);

        let is_full_projection = projected_arrow_fields.len() == file_schema.len();

        let (output_recv, handle) = ParquetReadImpl {
            projected_arrow_fields,
            is_full_projection,
            predicate,
            // TODO: Refactor to avoid full clone
            options: Arc::unwrap_or_clone(self.config.clone()),
            byte_source,
            // Negative slices were resolved by restrict_to_bounds above, so
            // only the Positive variant can occur here.
            normalized_pre_slice: normalized_pre_slice.map(|x| match x {
                Slice::Positive { offset, len } => (offset, len),
                Slice::Negative { .. } => unreachable!(),
            }),
            metadata: file_metadata,
            config: io_sources::parquet::Config {
                num_pipelines,
                row_group_prefetch_size,
                target_values_per_thread,
            },
            verbose,
            memory_prefetch_func,
            row_index,
            rg_prefetch_semaphore: Arc::clone(&self.row_group_prefetch_sync.prefetch_semaphore),
            // Hand the prefetch-sequencing state (set up in prepare_read)
            // over to the read task.
            rg_prefetch_prev_all_spawned: Option::take(
                &mut self.row_group_prefetch_sync.prev_all_spawned,
            ),
            rg_prefetch_current_all_spawned: Option::take(
                &mut self.row_group_prefetch_sync.current_all_spawned,
            ),
            disable_morsel_split,
        }
        .run();

        Ok((
            output_recv,
            async_executor::spawn(TaskPriority::Low, async move { handle.await.unwrap() }),
        ))
    }

    /// Polars schema of the file (converted from arrow, cached).
    async fn file_schema(&mut self) -> PolarsResult<SchemaRef> {
        Ok(self._file_schema().clone())
    }

    /// Arrow schema inferred during `initialize()`; always `Some` for parquet.
    async fn file_arrow_schema(&mut self) -> PolarsResult<Option<ArrowSchemaRef>> {
        Ok(Some(self._file_arrow_schema().clone()))
    }

    /// Exact row count from the footer metadata.
    async fn n_rows_in_file(&mut self) -> PolarsResult<IdxSize> {
        self._n_rows_in_file()
    }

    /// Parquet always knows its row count upfront, so this is never `None`.
    async fn fast_n_rows_in_file(&mut self) -> PolarsResult<Option<IdxSize>> {
        self._n_rows_in_file().map(Some)
    }

    /// Row position after applying `pre_slice`; computable without reading data.
    async fn row_position_after_slice(
        &mut self,
        pre_slice: Option<Slice>,
    ) -> PolarsResult<IdxSize> {
        self._row_position_after_slice(pre_slice)
    }
}
384
385
impl ParquetFileReader {
386
fn _file_schema(&mut self) -> &SchemaRef {
387
let InitializedState {
388
file_schema,
389
file_schema_pl,
390
..
391
} = self.init_data.as_mut().unwrap();
392
393
if file_schema_pl.is_none() {
394
*file_schema_pl = Some(Arc::new(Schema::from_arrow_schema(file_schema.as_ref())))
395
}
396
397
file_schema_pl.as_ref().unwrap()
398
}
399
400
fn _file_arrow_schema(&mut self) -> &ArrowSchemaRef {
401
let InitializedState { file_schema, .. } = self.init_data.as_mut().unwrap();
402
file_schema
403
}
404
405
fn _n_rows_in_file(&self) -> PolarsResult<IdxSize> {
406
let n = self.init_data.as_ref().unwrap().file_metadata.num_rows;
407
IdxSize::try_from(n).map_err(|_| polars_err!(bigidx, ctx = "parquet file", size = n))
408
}
409
410
fn _row_position_after_slice(&self, pre_slice: Option<Slice>) -> PolarsResult<IdxSize> {
411
Ok(calc_row_position_after_slice(
412
self._n_rows_in_file()?,
413
pre_slice,
414
))
415
}
416
}
417
418
/// Output of [`ParquetReadImpl::run`]: the morsel receiver plus the handle
/// driving the read (aborts the task on drop, per the handle type).
type AsyncTaskData = (
    FileReaderOutputRecv,
    tokio_handle_ext::AbortOnDropHandle<PolarsResult<()>>,
);
422
423
/// Parameter bundle for one dispatched parquet read; built in
/// `ParquetFileReader::begin_read` and consumed by `run()`.
struct ParquetReadImpl {
    /// Projected output columns together with their field-level transforms.
    projected_arrow_fields: Arc<[ArrowFieldProjection]>,
    /// True when the projection covers every column of the file schema.
    is_full_projection: bool,
    predicate: Option<ScanIOPredicate>,
    options: ParquetOptions,
    byte_source: Arc<DynByteSource>,
    /// `(offset, len)` already restricted to the file's bounds; negative
    /// slices are resolved before this struct is constructed.
    normalized_pre_slice: Option<(usize, usize)>,
    metadata: Arc<FileMetadata>,
    // Run-time vars
    config: Config,
    verbose: bool,
    // NOTE(review): presumably hints the OS to prefetch a byte range — see
    // `get_memory_prefetch_func`.
    memory_prefetch_func: fn(&[u8]) -> (),
    row_index: Option<RowIndex>,

    /// Shared limit on in-flight row-group prefetches (may span files).
    rg_prefetch_semaphore: Arc<tokio::sync::Semaphore>,
    /// Waits for the previous reader to finish spawning prefetches.
    rg_prefetch_prev_all_spawned: Option<WaitGroup>,
    /// Dropped once this reader has finished spawning prefetches.
    rg_prefetch_current_all_spawned: Option<WaitToken>,
    disable_morsel_split: bool,
}
442
443
/// Run-time tuning parameters for a parquet read (logged when verbose).
#[derive(Debug)]
struct Config {
    /// Pipeline parallelism for this read, taken from `BeginReadArgs`.
    num_pipelines: usize,
    /// Number of row groups to pre-fetch concurrently, this can be across files
    row_group_prefetch_size: usize,
    /// Minimum number of values for a parallel spawned task to process to amortize
    /// parallelism overhead.
    target_values_per_thread: usize,
}
452
453
impl ParquetReadImpl {
454
fn run(mut self) -> AsyncTaskData {
455
if self.verbose {
456
eprintln!("[ParquetFileReader]: {:?}", &self.config);
457
}
458
459
self.init_morsel_distributor()
460
}
461
}
462
463