GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs
use std::sync::Arc;

use arrow::datatypes::ArrowSchemaRef;
use async_trait::async_trait;
use polars_core::prelude::ArrowSchema;
use polars_core::schema::{Schema, SchemaExt, SchemaRef};
use polars_error::{PolarsResult, polars_err};
use polars_io::cloud::CloudOptions;
use polars_io::predicates::ScanIOPredicate;
use polars_io::prelude::{FileMetadata, ParquetOptions};
use polars_io::utils::byte_source::{DynByteSource, DynByteSourceBuilder, MemSliceByteSource};
use polars_io::{RowIndex, pl_async};
use polars_parquet::read::schema::infer_schema_with_options;
use polars_plan::dsl::ScanSource;
use polars_utils::IdxSize;
use polars_utils::mem::prefetch::get_memory_prefetch_func;
use polars_utils::slice_enum::Slice;

use super::multi_scan::reader_interface::output::{FileReaderOutputRecv, FileReaderOutputSend};
use super::multi_scan::reader_interface::{
    BeginReadArgs, FileReader, FileReaderCallbacks, calc_row_position_after_slice,
};
use crate::async_executor::{self};
use crate::nodes::compute_node_prelude::*;
use crate::nodes::io_sources::parquet::projection::{
    ArrowFieldProjection, resolve_arrow_field_projections,
};
use crate::nodes::{TaskPriority, io_sources};
use crate::utils::task_handles_ext;

pub mod builder;
mod init;
mod metadata_utils;
mod projection;
mod row_group_data_fetch;
mod row_group_decode;
mod statistics;

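/// Streaming-engine Parquet reader for a single scan source. It implements the
/// multi-scan `FileReader` trait: `initialize()` opens the byte source and loads
/// the footer metadata, and `begin_read()` starts the row-group decode pipeline.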
pub struct ParquetFileReader {
    scan_source: ScanSource,
    cloud_options: Option<Arc<CloudOptions>>,
    config: Arc<ParquetOptions>,
    /// Set by the builder if we have metadata left over from DSL conversion.
    metadata: Option<Arc<FileMetadata>>,
    byte_source_builder: DynByteSourceBuilder,
    verbose: bool,

    /// Set during initialize()
    init_data: Option<InitializedState>,
}

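/// State built once by `initialize()`: the parsed footer metadata, the file's
/// Arrow schema, a lazily derived Polars schema, and the byte source used for
/// all subsequent reads.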
#[derive(Clone)]
struct InitializedState {
    file_metadata: Arc<FileMetadata>,
    file_schema: Arc<ArrowSchema>,
    file_schema_pl: Option<SchemaRef>,
    byte_source: Arc<DynByteSource>,
}

#[async_trait]
impl FileReader for ParquetFileReader {
    async fn initialize(&mut self) -> PolarsResult<()> {
        let verbose = self.verbose;

        if self.init_data.is_some() {
            return Ok(());
        }

        let scan_source = self.scan_source.clone();
        let byte_source_builder = self.byte_source_builder.clone();
        let cloud_options = self.cloud_options.clone();

        let byte_source = pl_async::get_runtime()
            .spawn(async move {
                scan_source
                    .as_scan_source_ref()
                    .to_dyn_byte_source(&byte_source_builder, cloud_options.as_deref())
                    .await
            })
            .await
            .unwrap()?;

        let mut byte_source = Arc::new(byte_source);

        let file_metadata = if let Some(v) = self.metadata.clone() {
            v
        } else {
            let (metadata_bytes, opt_full_bytes) = {
                let byte_source = byte_source.clone();

                pl_async::get_runtime()
                    .spawn(async move {
                        metadata_utils::read_parquet_metadata_bytes(&byte_source, verbose).await
                    })
                    .await
                    .unwrap()?
            };

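            // The metadata fetch may have ended up reading the whole file; if so,
            // serve all further reads from those in-memory bytes.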
            if let Some(full_bytes) = opt_full_bytes {
                byte_source = Arc::new(DynByteSource::MemSlice(MemSliceByteSource(full_bytes)));
            }

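            // Parse the raw footer bytes into `FileMetadata`.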
            Arc::new(polars_parquet::parquet::read::deserialize_metadata(
                metadata_bytes.as_ref(),
                metadata_bytes.len() * 2 + 1024,
            )?)
        };

        let file_schema = Arc::new(infer_schema_with_options(&file_metadata, &None)?);

        self.init_data = Some(InitializedState {
            file_metadata,
            file_schema,
            file_schema_pl: None,
            byte_source,
        });

        Ok(())
    }

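    /// Resolves the projection, clamps the slice to the file's row count, fires
    /// the metadata callbacks, and then spawns the row-group fetch/decode pipeline.
    /// Panics if `initialize()` has not been called.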
    fn begin_read(
        &mut self,
        args: BeginReadArgs,
    ) -> PolarsResult<(FileReaderOutputRecv, JoinHandle<PolarsResult<()>>)> {
        let verbose = self.verbose;

        let InitializedState {
            file_metadata,
            file_schema: file_arrow_schema,
            file_schema_pl: _,
            byte_source,
        } = self.init_data.clone().unwrap();

        let BeginReadArgs {
            projection,
            row_index,
            pre_slice: pre_slice_arg,
            predicate,
            cast_columns_policy,
            num_pipelines,
            callbacks:
                FileReaderCallbacks {
                    file_schema_tx,
                    n_rows_in_file_tx,
                    row_position_on_end_tx,
                },
        } = args;

        let file_schema = self._file_schema().clone();

        let projected_arrow_fields = resolve_arrow_field_projections(
            &file_arrow_schema,
            &file_schema,
            projection,
            cast_columns_policy,
        )?;

        let n_rows_in_file = self._n_rows_in_file()?;

        let normalized_pre_slice = pre_slice_arg
            .clone()
            .map(|x| x.restrict_to_bounds(usize::try_from(n_rows_in_file).unwrap()));

        // Send all callbacks to unblock the next reader. We can do this immediately as we know
        // the total row count upfront.

        if let Some(mut n_rows_in_file_tx) = n_rows_in_file_tx {
            _ = n_rows_in_file_tx.try_send(n_rows_in_file);
        }

        // We are allowed to send this value immediately, even though we haven't "ended" yet
        // (see its definition under FileReaderCallbacks).
        if let Some(mut row_position_on_end_tx) = row_position_on_end_tx {
            _ = row_position_on_end_tx
                .try_send(self._row_position_after_slice(normalized_pre_slice.clone())?);
        }

        if let Some(mut file_schema_tx) = file_schema_tx {
            _ = file_schema_tx.try_send(file_schema.clone());
        }

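        // The requested slice is empty after clamping: hand back a channel whose
        // sender is already dropped and a task that completes immediately.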
        if normalized_pre_slice.as_ref().is_some_and(|x| x.len() == 0) {
            let (_, rx) = FileReaderOutputSend::new_serial();

            if verbose {
                eprintln!(
                    "[ParquetFileReader]: early return: \
                    n_rows_in_file: {n_rows_in_file}, \
                    pre_slice: {pre_slice_arg:?}, \
                    resolved_pre_slice: {normalized_pre_slice:?} \
                    "
                )
            }

            return Ok((
                rx,
                async_executor::spawn(TaskPriority::Low, std::future::ready(Ok(()))),
            ));
        }

        // Prepare parameters for dispatch

        let memory_prefetch_func = get_memory_prefetch_func(verbose);
        let row_group_prefetch_size = polars_core::config::get_rg_prefetch_size();

        // This can be set to 1 to force column-per-thread parallelism, e.g. for bug reproduction.
        let target_values_per_thread =
            std::env::var("POLARS_PARQUET_DECODE_TARGET_VALUES_PER_THREAD")
                .map(|x| x.parse::<usize>().expect("integer").max(1))
                .unwrap_or(16_777_216);

        let is_full_projection = projected_arrow_fields.len() == file_schema.len();

        if verbose {
            eprintln!(
                "[ParquetFileReader]: \
                project: {} / {}, \
                pre_slice: {:?}, \
                resolved_pre_slice: {:?}, \
                row_index: {:?}, \
                predicate: {:?} \
                ",
                projected_arrow_fields.len(),
                file_schema.len(),
                pre_slice_arg,
                normalized_pre_slice,
                &row_index,
                predicate.as_ref().map(|_| "<predicate>"),
            )
        }

        let (output_recv, handle) = ParquetReadImpl {
            projected_arrow_fields,
            is_full_projection,
            predicate,
            // TODO: Refactor to avoid full clone
            options: Arc::unwrap_or_clone(self.config.clone()),
            byte_source,
            normalized_pre_slice: normalized_pre_slice.map(|x| match x {
                Slice::Positive { offset, len } => (offset, len),
                Slice::Negative { .. } => unreachable!(),
            }),
            metadata: file_metadata,
            config: io_sources::parquet::Config {
                num_pipelines,
                row_group_prefetch_size,
                target_values_per_thread,
            },
            verbose,
            memory_prefetch_func,
            row_index,
        }
        .run();

        Ok((
            output_recv,
            async_executor::spawn(TaskPriority::Low, async move { handle.await.unwrap() }),
        ))
    }

    async fn file_schema(&mut self) -> PolarsResult<SchemaRef> {
        Ok(self._file_schema().clone())
    }

    async fn file_arrow_schema(&mut self) -> PolarsResult<Option<ArrowSchemaRef>> {
        Ok(Some(self._file_arrow_schema().clone()))
    }

    async fn n_rows_in_file(&mut self) -> PolarsResult<IdxSize> {
        self._n_rows_in_file()
    }

    async fn fast_n_rows_in_file(&mut self) -> PolarsResult<Option<IdxSize>> {
        self._n_rows_in_file().map(Some)
    }

    async fn row_position_after_slice(
        &mut self,
        pre_slice: Option<Slice>,
    ) -> PolarsResult<IdxSize> {
        self._row_position_after_slice(pre_slice)
    }
}

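// Synchronous helpers backing the trait methods above. All of them assume that
// `initialize()` has populated `init_data` and will panic otherwise.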
impl ParquetFileReader {
    fn _file_schema(&mut self) -> &SchemaRef {
        let InitializedState {
            file_schema,
            file_schema_pl,
            ..
        } = self.init_data.as_mut().unwrap();

        if file_schema_pl.is_none() {
            *file_schema_pl = Some(Arc::new(Schema::from_arrow_schema(file_schema.as_ref())))
        }

        file_schema_pl.as_ref().unwrap()
    }

    fn _file_arrow_schema(&mut self) -> &ArrowSchemaRef {
        let InitializedState { file_schema, .. } = self.init_data.as_mut().unwrap();
        file_schema
    }

    fn _n_rows_in_file(&self) -> PolarsResult<IdxSize> {
        let n = self.init_data.as_ref().unwrap().file_metadata.num_rows;
        IdxSize::try_from(n).map_err(|_| polars_err!(bigidx, ctx = "parquet file", size = n))
    }

    fn _row_position_after_slice(&self, pre_slice: Option<Slice>) -> PolarsResult<IdxSize> {
        Ok(calc_row_position_after_slice(
            self._n_rows_in_file()?,
            pre_slice,
        ))
    }
}

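/// The receiving end of the reader output together with the task driving it;
/// dropping the handle aborts the underlying task.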
type AsyncTaskData = (
    FileReaderOutputRecv,
    task_handles_ext::AbortOnDropHandle<PolarsResult<()>>,
);

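/// Fully resolved parameters for reading one Parquet file. `run()` hands these
/// to the morsel distributor (defined in a submodule) and returns the output
/// channel together with the driving task.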
struct ParquetReadImpl {
    projected_arrow_fields: Arc<[ArrowFieldProjection]>,
    is_full_projection: bool,
    predicate: Option<ScanIOPredicate>,
    options: ParquetOptions,
    byte_source: Arc<DynByteSource>,
    normalized_pre_slice: Option<(usize, usize)>,
    metadata: Arc<FileMetadata>,
    // Run-time vars
    config: Config,
    verbose: bool,
    memory_prefetch_func: fn(&[u8]) -> (),
    row_index: Option<RowIndex>,
}

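/// Tuning parameters resolved in `begin_read()`.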
#[derive(Debug)]
struct Config {
    num_pipelines: usize,
    /// Number of row groups to pre-fetch concurrently; this can be across files.
    row_group_prefetch_size: usize,
    /// Minimum number of values for a parallel spawned task to process to amortize
    /// parallelism overhead.
    target_values_per_thread: usize,
}

impl ParquetReadImpl {
    fn run(mut self) -> AsyncTaskData {
        if self.verbose {
            eprintln!("[ParquetFileReader]: {:?}", &self.config);
        }

        self.init_morsel_distributor()
    }
}