GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/ipc/mod.rs
use std::io::Cursor;
use std::ops::Range;
use std::sync::Arc;

use arrow::io::ipc::read::{Dictionaries, read_dictionary_block};
use async_trait::async_trait;
use polars_core::prelude::DataType;
use polars_core::schema::{Schema, SchemaExt};
use polars_core::utils::arrow::io::ipc::read::{
    BlockReader, FileMetadata, ProjectionInfo, prepare_projection, read_file_metadata,
};
use polars_error::{ErrString, PolarsError, PolarsResult};
use polars_io::cloud::CloudOptions;
use polars_io::ipc::IpcScanOptions;
use polars_io::pl_async;
use polars_io::utils::byte_source::{
    BufferByteSource, ByteSource, DynByteSource, DynByteSourceBuilder,
};
use polars_io::utils::slice::SplitSlicePosition;
use polars_plan::dsl::ScanSource;
use polars_utils::IdxSize;
use polars_utils::bool::UnsafeBool;
use polars_utils::mem::prefetch::get_memory_prefetch_func;
use polars_utils::slice_enum::Slice;
use record_batch_data_fetch::RecordBatchDataFetcher;
use record_batch_decode::RecordBatchDecoder;

use super::multi_scan::reader_interface::BeginReadArgs;
use super::multi_scan::reader_interface::output::FileReaderOutputRecv;
use crate::async_executor::{self, JoinHandle, TaskPriority};
use crate::async_primitives::wait_group::{WaitGroup, WaitToken};
use crate::metrics::OptIOMetrics;
use crate::morsel::{Morsel, MorselSeq, SourceToken, get_ideal_morsel_size};
use crate::nodes::io_sources::ipc::metadata::read_ipc_metadata_bytes;
use crate::nodes::io_sources::multi_scan::reader_interface::output::FileReaderOutputSend;
use crate::nodes::io_sources::multi_scan::reader_interface::{
    FileReader, FileReaderCallbacks, Projection,
};
use crate::nodes::io_sources::parquet::init::split_to_morsels;
use crate::utils::tokio_handle_ext::AbortOnDropHandle;

pub mod builder;
mod metadata;
mod record_batch_data_fetch;
mod record_batch_decode;

const ROW_COUNT_OVERFLOW_ERR: PolarsError = PolarsError::ComputeError(ErrString::new_static(
    "\
IPC file produces more than 2^32 rows; \
consider compiling with polars-bigidx feature (pip install polars[rt64])",
));
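
// `IdxSize` is `u32` unless Polars is compiled with the `bigidx` feature, so a
// row count that does not fit in 32 bits is unrepresentable; the decode task
// below raises this error when `IdxSize::try_from(num_rows)` fails.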

struct IpcFileReader {
    scan_source: ScanSource,
    cloud_options: Option<Arc<CloudOptions>>,
    config: Arc<IpcScanOptions>,
    metadata: Option<Arc<FileMetadata>>,
    byte_source_builder: DynByteSourceBuilder,
    record_batch_prefetch_sync: RecordBatchPrefetchSync,
    io_metrics: OptIOMetrics,
    verbose: bool,
    init_data: Option<InitializedState>,
    checked: UnsafeBool,
}

struct RecordBatchPrefetchSync {
    prefetch_limit: usize,
    prefetch_semaphore: Arc<tokio::sync::Semaphore>,
    shared_prefetch_wait_group_slot: Arc<std::sync::Mutex<Option<WaitGroup>>>,

    /// Waits for the previous reader to finish spawning prefetches.
    prev_all_spawned: Option<WaitGroup>,
    /// Dropped once the current reader has finished spawning prefetches.
    current_all_spawned: Option<WaitToken>,
}

#[derive(Clone)]
struct InitializedState {
    file_metadata: Arc<FileMetadata>,
    byte_source: Arc<DynByteSource>,
    dictionaries: Arc<Option<Dictionaries>>,
}

#[async_trait]
impl FileReader for IpcFileReader {
    async fn initialize(&mut self) -> PolarsResult<()> {
        if self.init_data.is_some() {
            return Ok(());
        }

        let verbose = self.verbose;
        let scan_source = self.scan_source.clone();
        let byte_source_builder = self.byte_source_builder.clone();
        let cloud_options = self.cloud_options.clone();
        let io_metrics = self.io_metrics.clone();

        let byte_source = pl_async::get_runtime()
            .spawn(async move {
                scan_source
                    .as_scan_source_ref()
                    .to_dyn_byte_source(
                        &byte_source_builder,
                        cloud_options.as_deref(),
                        io_metrics.0,
                    )
                    .await
            })
            .await
            .unwrap()?;

        let mut byte_source = Arc::new(byte_source);

        let file_metadata = if let Some(v) = self.metadata.clone() {
            v
        } else {
            let (metadata_bytes, opt_full_bytes) = {
                let byte_source = byte_source.clone();

                pl_async::get_runtime()
                    .spawn(async move { read_ipc_metadata_bytes(&byte_source, verbose).await })
                    .await
                    .unwrap()?
            };

            if let Some(full_bytes) = opt_full_bytes {
                byte_source = Arc::new(DynByteSource::Buffer(BufferByteSource(full_bytes)));
            }

            Arc::new(read_file_metadata(&mut std::io::Cursor::new(
                metadata_bytes,
            ))?)
        };

        let dictionaries = {
            let byte_source_async = byte_source.clone();
            let metadata_async = file_metadata.clone();
            let checked = self.checked;
            let dictionaries = pl_async::get_runtime()
                .spawn(async move {
                    read_dictionaries(&byte_source_async, metadata_async, verbose, checked).await
                })
                .await
                .unwrap()?;
            Arc::new(Some(dictionaries))
        };

        self.init_data = Some(InitializedState {
            file_metadata,
            byte_source,
            dictionaries,
        });

        Ok(())
    }
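
    // Overview: `initialize` resolves the byte source on the shared async
    // runtime. If fetching the footer metadata ended up downloading the entire
    // file (`opt_full_bytes`), the reader swaps to an in-memory
    // `BufferByteSource` so record batches are later served from that buffer
    // instead of being re-fetched. Dictionaries are read eagerly once and
    // shared across all record batch decodes through an `Arc`.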

    fn prepare_read(&mut self) -> PolarsResult<()> {
        let wait_group_this_reader = WaitGroup::default();
        let prefetch_all_spawned_token = wait_group_this_reader.token();

        let prev_wait_group: Option<WaitGroup> = self
            .record_batch_prefetch_sync
            .shared_prefetch_wait_group_slot
            .try_lock()
            .unwrap()
            .replace(wait_group_this_reader);

        self.record_batch_prefetch_sync.prev_all_spawned = prev_wait_group;
        self.record_batch_prefetch_sync.current_all_spawned = Some(prefetch_all_spawned_token);

        Ok(())
    }
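
    // Illustrative sequence (not part of the original source): if readers R1
    // and R2 are prepared in that order against the same shared slot, R2 takes
    // R1's `WaitGroup` out of the slot and stores its own. R2's prefetch task
    // then runs
    //
    //     if let Some(prev) = rb_prefetch_prev_all_spawned {
    //         prev.wait().await; // resolves once R1 drops its WaitToken
    //     }
    //
    // before spawning prefetches, so prefetches are issued in file order even
    // though readers run concurrently.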

    fn begin_read(
        &mut self,
        args: BeginReadArgs,
    ) -> PolarsResult<(FileReaderOutputRecv, JoinHandle<PolarsResult<()>>)> {
        let verbose = self.verbose;

        // Initialize.
        let InitializedState {
            file_metadata,
            byte_source,
            dictionaries,
        } = self.init_data.clone().unwrap();

        let BeginReadArgs {
            projection: Projection::Plain(projected_schema),
            row_index,
            pre_slice: pre_slice_arg,
            predicate: None,
            cast_columns_policy: _,
            num_pipelines,
            disable_morsel_split,
            callbacks:
                FileReaderCallbacks {
                    file_schema_tx,
                    n_rows_in_file_tx,
                    row_position_on_end_tx,
                },
        } = args
        else {
            panic!("unsupported args: {:?}", &args)
        };

        debug_assert!(!matches!(pre_slice_arg, Some(Slice::Negative { .. })));

        let file_schema_pl = std::cell::LazyCell::new(|| {
            Arc::new(Schema::from_arrow_schema(file_metadata.schema.as_ref()))
        });

        // Handle callbacks that are ready now.
        if let Some(file_schema_tx) = file_schema_tx {
            _ = file_schema_tx.send(file_schema_pl.clone());
        }

        // @NOTE: Negative slicing takes a 2-pass approach. The first pass gets the total row
        // count by setting slice.len to 0, and uses this to convert the negative slice into a
        // positive slice. The second pass uses the positive slice to fetch and slice the data.
        let fetch_metadata_only = pre_slice_arg.as_ref().is_some_and(|x| x.len() == 0);

        // Always create a slice. If no slice was given, just make the biggest slice possible.
        let slice_range: Range<usize> = pre_slice_arg
            .clone()
            .map_or(0..usize::MAX, Range::<usize>::from);
        let n_rows_limit = if pre_slice_arg.is_some() {
            Some(slice_range.end)
        } else {
            None
        };
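
        // Worked example (illustrative): for a 100-row file and a negative
        // slice covering the last 5 rows, the first pass runs with a
        // zero-length slice (`fetch_metadata_only == true`) just to learn the
        // row count, and the read is then re-issued with the positive slice
        // 95..100, so `slice_range == 95..100` and `n_rows_limit == Some(100)`.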

        // Avoid materializing projection info if we are projecting all the columns of this file.
        let projection_indices: Option<Vec<usize>> = if let Some(first_mismatch_idx) =
            (0..file_metadata.schema.len().min(projected_schema.len())).find(|&i| {
                file_metadata.schema.get_at_index(i).unwrap().0
                    != projected_schema.get_at_index(i).unwrap().0
            }) {
            let mut out = Vec::with_capacity(file_metadata.schema.len());

            out.extend(0..first_mismatch_idx);

            out.extend(
                (first_mismatch_idx..projected_schema.len()).filter_map(|i| {
                    file_metadata
                        .schema
                        .index_of(projected_schema.get_at_index(i).unwrap().0)
                }),
            );

            Some(out)
        } else if file_metadata.schema.len() > projected_schema.len() {
            // Names match up to projected schema len.
            Some((0..projected_schema.len()).collect::<Vec<_>>())
        } else {
            // Name order matches up to `file_metadata.schema.len()`, we are projecting all
            // columns in this file.
            None
        };
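
        // Worked example (illustrative): with file schema [a, b, c, d] and
        // projected schema [a, c], the first name mismatch is at index 1 and
        // `projection_indices == Some([0, 2])`. With projected schema [a, b],
        // names match but the file has extra columns, giving Some([0, 1]). With
        // identical schemas the result is None and no projection is
        // materialized.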

        // Unstable.
        let read_statistics_flags = self.config.record_batch_statistics;

        if verbose {
            eprintln!(
                "[IpcFileReader]: \
                project: {} / {}, \
                pre_slice: {:?}, \
                read_record_batch_statistics_flags: {}\
                ",
                projection_indices
                    .as_ref()
                    .map_or(file_metadata.schema.len(), |x| x.len()),
                file_metadata.schema.len(),
                pre_slice_arg,
                read_statistics_flags
            )
        }

        let projection_info: Option<ProjectionInfo> =
            projection_indices.map(|indices| prepare_projection(&file_metadata.schema, indices));
        let projection_info = Arc::new(projection_info);

        let schema = projection_info.as_ref().as_ref().map_or(
            file_metadata.schema.as_ref(),
            |ProjectionInfo { schema, .. }| schema,
        );
        let pl_schema = Arc::new(
            schema
                .iter()
                .map(|(n, f)| (n.clone(), DataType::from_arrow_field(f)))
                .collect::<Schema>(),
        );

        // Prepare parameters for Prefetch
        let memory_prefetch_func = get_memory_prefetch_func(verbose);

        let record_batch_prefetch_size = self
            .record_batch_prefetch_sync
            .prefetch_limit
            .min(file_metadata.blocks.len())
            .max(1);

        let io_runtime = polars_io::pl_async::get_runtime();
        let ideal_morsel_size = get_ideal_morsel_size();

        if verbose {
            eprintln!(
                "[IpcFileReader]: num_pipelines: {num_pipelines}, record_batch_prefetch_size: {record_batch_prefetch_size}, ideal_morsel_size: {ideal_morsel_size}"
            );
            eprintln!(
                "[IpcFileReader]: record batch count: {:?}",
                file_metadata.blocks.len()
            );
        }

        let record_batch_decoder = Arc::new(RecordBatchDecoder {
            file_metadata: file_metadata.clone(),
            pl_schema,
            projection_info,
            dictionaries: dictionaries.clone(),
            row_index,
            read_statistics_flags,
            checked: self.checked,
        });

        // Set up channels.
        let (prefetch_send, mut prefetch_recv) =
            tokio::sync::mpsc::channel(record_batch_prefetch_size);
        let (decode_send, mut decode_recv) = tokio::sync::mpsc::channel(num_pipelines);
        let (mut morsel_send, morsel_recv) = FileReaderOutputSend::new_serial();

        let rb_prefetch_semaphore = Arc::clone(&self.record_batch_prefetch_sync.prefetch_semaphore);
        let rb_prefetch_prev_all_spawned =
            Option::take(&mut self.record_batch_prefetch_sync.prev_all_spawned);
        let rb_prefetch_current_all_spawned =
            Option::take(&mut self.record_batch_prefetch_sync.current_all_spawned);

        // Task: Prefetch.
        let byte_source = byte_source.clone();
        let metadata = file_metadata.clone();
        let prefetch_task = AbortOnDropHandle(io_runtime.spawn(async move {
            let mut record_batch_data_fetcher = RecordBatchDataFetcher {
                memory_prefetch_func,
                metadata,
                byte_source,
                record_batch_idx: 0,
                fetch_metadata_only,
                n_rows_limit,
                n_rows_in_file_tx,
                row_position_on_end_tx,
                prefetch_send,
                rb_prefetch_semaphore,
                rb_prefetch_current_all_spawned,
            };

            if let Some(rb_prefetch_prev_all_spawned) = rb_prefetch_prev_all_spawned {
                rb_prefetch_prev_all_spawned.wait().await;
            }

            record_batch_data_fetcher.run().await?;

            PolarsResult::Ok(())
        }));

        // Task: Decode.
        let decode_task = AbortOnDropHandle(io_runtime.spawn(async move {
            let mut current_row_offset: IdxSize = 0;

            while let Some((prefetch_task, permit)) = prefetch_recv.recv().await {
                let mut record_batch_data = prefetch_task.await.unwrap()?;
                record_batch_data.row_offset = Some(current_row_offset);

                // Fetch every record batch so we can track the total row count.
                let rb_num_rows = record_batch_data.num_rows;
                let rb_num_rows =
                    IdxSize::try_from(rb_num_rows).map_err(|_| ROW_COUNT_OVERFLOW_ERR)?;

                // Only pass to decoder if we need the data.
                let record_batch_position = SplitSlicePosition::split_slice_at_file(
                    current_row_offset as usize,
                    rb_num_rows as usize,
                    slice_range.clone(),
                );
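
                // Worked example (illustrative): with `current_row_offset == 100`,
                // a 50-row batch, and `slice_range == 120..140`, this returns
                // Overlapping(20, 20): skip the batch's first 20 rows, keep 20.
                // Batches entirely before the slice yield Before; a batch at or
                // past the slice end yields After, which stops the loop.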

                current_row_offset = current_row_offset
                    .checked_add(rb_num_rows)
                    .ok_or(ROW_COUNT_OVERFLOW_ERR)?;

                match record_batch_position {
                    SplitSlicePosition::Before => continue,
                    SplitSlicePosition::Overlapping(rows_offset, rows_len) => {
                        let record_batch_decoder = record_batch_decoder.clone();
                        let decode_fut = async_executor::spawn(TaskPriority::High, async move {
                            record_batch_decoder
                                .record_batch_data_to_df(record_batch_data, rows_offset, rows_len)
                                .await
                        });
                        if decode_send.send((decode_fut, permit)).await.is_err() {
                            break;
                        }
                    },
                    SplitSlicePosition::After => break,
                };
            }

            PolarsResult::Ok(())
        }));

        // Task: Distributor.
        // Distributes morsels across pipelines. This does not perform any CPU or I/O bound work -
        // it is purely a dispatch loop. Run on the computational executor to reduce context
        // switches.
        let last_morsel_min_split = num_pipelines;
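
        // Illustrative numbers: a 1_000_000-row decoded DataFrame with an ideal
        // morsel size of 100_000 is re-split into roughly 10 morsels, and the
        // last DataFrame of the file is split into at least `num_pipelines`
        // pieces so every pipeline still receives work at the tail of the file.
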
        let distribute_task = async_executor::spawn(TaskPriority::High, async move {
            let mut morsel_seq = MorselSeq::default();
            // Note: We don't use this (it is handled by the bridge). But morsels require a source token.
            let source_token = SourceToken::new();

            // Decode first non-empty morsel.
            let mut next = None;
            loop {
                let Some((decode_fut, permit)) = decode_recv.recv().await else {
                    break;
                };
                let df = decode_fut.await?;
                if df.height() == 0 {
                    continue;
                }

                if disable_morsel_split {
                    if morsel_send
                        .send_morsel(Morsel::new(df, morsel_seq, source_token.clone()))
                        .await
                        .is_err()
                    {
                        return Ok(());
                    }
                    drop(permit);
                    morsel_seq = morsel_seq.successor();
                    continue;
                }

                next = Some((df, permit));
                break;
            }

            while let Some((df, permit)) = next.take() {
                // Try to decode the next non-empty morsel first, so we know
                // whether the df is the last morsel.

                // Important: Drop this before awaiting the next one, or we could
                // deadlock if the permit limit is 1.
                drop(permit);
                loop {
                    let Some((decode_fut, permit)) = decode_recv.recv().await else {
                        break;
                    };
                    let next_df = decode_fut.await?;
                    if next_df.height() == 0 {
                        continue;
                    }
                    next = Some((next_df, permit));
                    break;
                }

                for df in split_to_morsels(
                    &df,
                    ideal_morsel_size,
                    next.is_none(),
                    last_morsel_min_split,
                ) {
                    if morsel_send
                        .send_morsel(Morsel::new(df, morsel_seq, source_token.clone()))
                        .await
                        .is_err()
                    {
                        return Ok(());
                    }
                    morsel_seq = morsel_seq.successor();
                }
            }
            PolarsResult::Ok(())
        });

        // Orchestration.
        let join_task = io_runtime.spawn(async move {
            prefetch_task.await.unwrap()?;
            decode_task.await.unwrap()?;
            distribute_task.await?;
            Ok(())
        });

        let handle = AbortOnDropHandle(join_task);

        Ok((
            morsel_recv,
            async_executor::spawn(TaskPriority::Low, async move { handle.await.unwrap() }),
        ))
    }
}

async fn read_dictionaries(
    byte_source: &DynByteSource,
    file_metadata: Arc<FileMetadata>,
    verbose: bool,
    checked: UnsafeBool,
) -> PolarsResult<Dictionaries> {
    let blocks = if let Some(blocks) = &file_metadata.dictionaries {
        blocks
    } else {
        return Ok(Dictionaries::default());
    };

    if verbose {
        eprintln!("[IpcFileReader]: reading dictionaries ({:?})", blocks.len());
    }

    let mut dictionaries = Dictionaries::default();

    let mut message_scratch = Vec::new();
    let mut dictionary_scratch = Vec::new();

    for block in blocks {
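        // Each dictionary block is fetched as a single contiguous byte range
        // covering its message header and body. Illustrative numbers: offset ==
        // 1024, meta_data_length == 184, body_length == 4096 gives the range
        // 1024..5304.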
        let range = block.offset as usize
            ..block.offset as usize + block.meta_data_length as usize + block.body_length as usize;
        let bytes = byte_source.get_range(range).await?;

        let mut reader = BlockReader::new(Cursor::new(bytes.as_ref()));

        read_dictionary_block(
            &mut reader.reader,
            file_metadata.as_ref(),
            block,
            true,
            &mut dictionaries,
            &mut message_scratch,
            &mut dictionary_scratch,
            checked,
        )?;
    }

    Ok(dictionaries)
}