Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/ipc.rs
6939 views
1
use std::cmp::Reverse;
2
use std::io::Cursor;
3
use std::ops::Range;
4
use std::sync::Arc;
5
6
use arrow::array::TryExtend;
7
use async_trait::async_trait;
8
use polars_core::frame::DataFrame;
9
use polars_core::prelude::DataType;
10
use polars_core::schema::{Schema, SchemaExt};
11
use polars_core::utils::arrow::io::ipc::read::{
12
FileMetadata, ProjectionInfo, get_row_count_from_blocks, prepare_projection, read_file_metadata,
13
};
14
use polars_error::{ErrString, PolarsError, PolarsResult, polars_err};
15
use polars_io::RowIndex;
16
use polars_io::cloud::CloudOptions;
17
use polars_plan::dsl::{ScanSource, ScanSourceRef};
18
use polars_utils::IdxSize;
19
use polars_utils::mmap::MemSlice;
20
use polars_utils::priority::Priority;
21
use polars_utils::slice_enum::Slice;
22
23
use super::multi_scan::reader_interface::output::FileReaderOutputRecv;
24
use super::multi_scan::reader_interface::{BeginReadArgs, calc_row_position_after_slice};
25
use crate::async_executor::{AbortOnDropHandle, JoinHandle, TaskPriority, spawn};
26
use crate::async_primitives::distributor_channel::distributor_channel;
27
use crate::async_primitives::linearizer::Linearizer;
28
use crate::morsel::{Morsel, MorselSeq, SourceToken, get_ideal_morsel_size};
29
use crate::nodes::io_sources::multi_scan::reader_interface::output::FileReaderOutputSend;
30
use crate::nodes::io_sources::multi_scan::reader_interface::{
31
FileReader, FileReaderCallbacks, Projection,
32
};
33
use crate::{DEFAULT_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_LINEARIZER_BUFFER_SIZE};
34
35
pub mod builder {
36
use std::sync::Arc;
37
38
use arrow::io::ipc::read::FileMetadata;
39
use polars_core::config;
40
use polars_io::cloud::CloudOptions;
41
use polars_plan::dsl::ScanSource;
42
43
use super::IpcFileReader;
44
use crate::nodes::io_sources::multi_scan::reader_interface::FileReader;
45
use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
46
use crate::nodes::io_sources::multi_scan::reader_interface::capabilities::ReaderCapabilities;
47
48
#[derive(Debug)]
49
pub struct IpcReaderBuilder {
50
#[expect(unused)]
51
pub first_metadata: Option<Arc<FileMetadata>>,
52
}
53
54
#[cfg(feature = "ipc")]
55
impl FileReaderBuilder for IpcReaderBuilder {
56
fn reader_name(&self) -> &str {
57
"ipc"
58
}
59
60
fn reader_capabilities(&self) -> ReaderCapabilities {
61
use ReaderCapabilities as RC;
62
63
RC::NEEDS_FILE_CACHE_INIT | RC::ROW_INDEX | RC::PRE_SLICE | RC::NEGATIVE_PRE_SLICE
64
}
65
66
fn build_file_reader(
67
&self,
68
source: ScanSource,
69
cloud_options: Option<Arc<CloudOptions>>,
70
#[expect(unused)] scan_source_idx: usize,
71
) -> Box<dyn FileReader> {
72
let scan_source = source;
73
let verbose = config::verbose();
74
75
// FIXME: For some reason the metadata does not match on idx == 0, and we end up with
76
// * ComputeError: out-of-spec: InvalidBuffersLength { buffers_size: 1508, file_size: 763 }
77
//
78
// let metadata: Option<Arc<FileMetadata>> = if scan_source_idx == 0 {
79
// self.first_metadata.clone()
80
// } else {
81
// None
82
// };
83
let metadata = None;
84
85
let reader = IpcFileReader {
86
scan_source,
87
cloud_options,
88
metadata,
89
verbose,
90
init_data: None,
91
};
92
93
Box::new(reader) as Box<dyn FileReader>
94
}
95
}
96
}
97
98
/// Error raised when a file's row count exceeds what `IdxSize` can represent
/// (2^32 rows on default builds); the bigidx feature widens the index type.
const ROW_COUNT_OVERFLOW_ERR: PolarsError = PolarsError::ComputeError(ErrString::new_static(
    "\
IPC file produces more than 2^32 rows; \
consider compiling with polars-bigidx feature (polars-u64-idx package on python)",
));
103
104
/// Streaming-engine file reader for Arrow IPC sources.
struct IpcFileReader {
    // Where the bytes come from (path, buffer, …).
    scan_source: ScanSource,
    // Cloud configuration/credentials for remote sources, if any.
    cloud_options: Option<Arc<CloudOptions>>,
    // Pre-parsed IPC metadata if supplied up front; otherwise parsed from the
    // file bytes during `initialize()`.
    metadata: Option<Arc<FileMetadata>>,
    verbose: bool,

    // Populated by `initialize()`; `None` before initialization.
    init_data: Option<InitializedState>,
}
112
113
/// State produced by `IpcFileReader::initialize()`: the loaded file bytes plus
/// the parsed IPC metadata.
#[derive(Clone)]
struct InitializedState {
    // Full contents of the IPC file (possibly memory-mapped).
    memslice: MemSlice,
    file_metadata: Arc<FileMetadata>,
    // Lazily initialized - getting this involves iterating record batches.
    n_rows_in_file: Option<IdxSize>,
}
120
121
/// Consume up to position `n` from `slice`, returning the consumed sub-range.
///
/// The remainder left in `slice` is re-based to start at 0 (its start becomes
/// relative to the data that follows the consumed portion).
///
/// # Panics
/// Panics if `slice.start > n`.
fn slice_take(slice: &mut Range<usize>, n: usize) -> Range<usize> {
    let start = slice.start;
    let len = slice.len();

    assert!(start <= n);

    // How much of the slice falls within the first `n` positions.
    let taken = usize::min(n - start, len);
    let consumed = start..start + taken;

    // Remainder is expressed relative to whatever comes after `n`.
    *slice = 0..(len - taken);

    consumed
}
134
135
fn get_max_morsel_size() -> usize {
136
std::env::var("POLARS_STREAMING_IPC_SOURCE_MAX_MORSEL_SIZE")
137
.map_or_else(
138
|_| get_ideal_morsel_size(),
139
|v| {
140
v.parse::<usize>().expect(
141
"POLARS_STREAMING_IPC_SOURCE_MAX_MORSEL_SIZE does not contain valid size",
142
)
143
},
144
)
145
.max(1)
146
}
147
148
#[async_trait]
impl FileReader for IpcFileReader {
    /// Load the file bytes and parse (or reuse) the IPC metadata, caching the
    /// result in `self.init_data`. Idempotent: subsequent calls are no-ops.
    async fn initialize(&mut self) -> PolarsResult<()> {
        if self.init_data.is_some() {
            return Ok(());
        }

        // check_latest: IR resolution does not download IPC.
        // TODO: Streaming reads
        if let ScanSourceRef::Path(addr) = self.scan_source.as_scan_source_ref() {
            polars_io::file_cache::init_entries_from_uri_list(
                [Arc::from(addr.to_str())].into_iter(),
                self.cloud_options.as_deref(),
            )?;
        }

        let memslice = self
            .scan_source
            .as_scan_source_ref()
            .to_memslice_async_check_latest(self.scan_source.run_async())?;

        // Prefer metadata handed to us by the builder; otherwise parse the
        // footer from the loaded bytes.
        let file_metadata = if let Some(v) = self.metadata.clone() {
            v
        } else {
            Arc::new(read_file_metadata(&mut std::io::Cursor::new(
                memslice.as_ref(),
            ))?)
        };

        self.init_data = Some(InitializedState {
            memslice,
            file_metadata,
            n_rows_in_file: None,
        });

        Ok(())
    }

    /// Start reading the file, returning the morsel receiver plus a join
    /// handle for the background tasks.
    ///
    /// Spawns a three-stage pipeline:
    ///   1. a single *walker* task that scans record-batch blocks and sends
    ///      block ranges (`BatchMessage`) to the decoders,
    ///   2. `num_pipelines` *decoder* tasks that decode blocks into DataFrames
    ///      and split oversized ones into morsels,
    ///   3. a *distributor* task that linearizes morsels by sequence number
    ///      into the output channel.
    ///
    /// # Panics
    /// Panics if `initialize()` has not been called, or if `args` carries a
    /// predicate or a non-plain projection (unsupported by this reader).
    fn begin_read(
        &mut self,
        args: BeginReadArgs,
    ) -> PolarsResult<(FileReaderOutputRecv, JoinHandle<PolarsResult<()>>)> {
        let verbose = self.verbose;

        let InitializedState {
            memslice,
            file_metadata,
            n_rows_in_file: _,
        } = self.init_data.clone().unwrap();

        // This reader only supports plain projections and no predicate;
        // anything else is a caller bug.
        let BeginReadArgs {
            projection: Projection::Plain(projected_schema),
            row_index,
            pre_slice: pre_slice_arg,
            predicate: None,
            cast_columns_policy: _,
            num_pipelines,
            callbacks:
                FileReaderCallbacks {
                    file_schema_tx,
                    n_rows_in_file_tx,
                    row_position_on_end_tx,
                },
        } = args
        else {
            panic!("unsupported args: {:?}", &args)
        };

        // Built lazily: only materialized if the schema callback is present.
        let file_schema_pl = std::cell::LazyCell::new(|| {
            Arc::new(Schema::from_arrow_schema(file_metadata.schema.as_ref()))
        });

        // Clamp the requested slice to the actual row count of the file.
        let normalized_pre_slice = if let Some(pre_slice) = pre_slice_arg.clone() {
            Some(pre_slice.restrict_to_bounds(usize::try_from(self._n_rows_in_file()?).unwrap()))
        } else {
            None
        };

        // Service the optional callbacks; send failures are deliberately
        // ignored (the receiver may have gone away).
        if let Some(mut n_rows_in_file_tx) = n_rows_in_file_tx {
            _ = n_rows_in_file_tx.try_send(self._n_rows_in_file()?);
        }

        if let Some(mut row_position_on_end_tx) = row_position_on_end_tx {
            _ = row_position_on_end_tx
                .try_send(self._row_position_after_slice(normalized_pre_slice.clone())?);
        }

        if let Some(mut file_schema_tx) = file_schema_tx {
            _ = file_schema_tx.try_send(file_schema_pl.clone());
        }

        // Empty slice after normalization: nothing to read, return an
        // already-closed channel and a finished handle.
        if normalized_pre_slice.as_ref().is_some_and(|x| x.len() == 0) {
            let (_, rx) = FileReaderOutputSend::new_serial();

            if verbose {
                eprintln!(
                    "[IpcFileReader]: early return: \
                    n_rows_in_file: {}, \
                    pre_slice: {:?}, \
                    resolved_pre_slice: {:?} \
                    ",
                    self._n_rows_in_file()?,
                    pre_slice_arg,
                    normalized_pre_slice
                )
            }

            return Ok((rx, spawn(TaskPriority::Low, std::future::ready(Ok(())))));
        }

        // Prepare parameters for tasks

        // Always create a slice. If no slice was given, just make the biggest slice possible.
        let slice: Range<usize> = normalized_pre_slice
            .clone()
            .map_or(0..usize::MAX, Range::<usize>::from);

        // Avoid materializing projection info if we are projecting all the columns of this file.
        let projection_indices: Option<Vec<usize>> = if let Some(first_mismatch_idx) =
            (0..file_metadata.schema.len().min(projected_schema.len())).find(|&i| {
                file_metadata.schema.get_at_index(i).unwrap().0
                    != projected_schema.get_at_index(i).unwrap().0
            }) {
            // Mixed case: leading columns line up, the rest are resolved by
            // name (columns missing from the file are silently dropped).
            let mut out = Vec::with_capacity(file_metadata.schema.len());

            out.extend(0..first_mismatch_idx);

            out.extend(
                (first_mismatch_idx..projected_schema.len()).filter_map(|i| {
                    file_metadata
                        .schema
                        .index_of(projected_schema.get_at_index(i).unwrap().0)
                }),
            );

            Some(out)
        } else if file_metadata.schema.len() > projected_schema.len() {
            // Names match up to projected schema len.
            Some((0..projected_schema.len()).collect::<Vec<_>>())
        } else {
            // Name order matches up to `file_metadata.schema.len()`, we are projecting all columns
            // in this file.
            None
        };

        if verbose {
            eprintln!(
                "[IpcFileReader]: \
                project: {} / {}, \
                pre_slice: {:?}, \
                resolved_pre_slice: {:?} \
                ",
                projection_indices
                    .as_ref()
                    .map_or(file_metadata.schema.len(), |x| x.len()),
                file_metadata.schema.len(),
                pre_slice_arg,
                normalized_pre_slice
            )
        }

        let projection_info: Option<ProjectionInfo> =
            projection_indices.map(|indices| prepare_projection(&file_metadata.schema, indices));

        // Split size for morsels.
        let max_morsel_size = get_max_morsel_size();

        let metadata = file_metadata;

        /// Messages sent from Walker task to Decoder tasks.
        struct BatchMessage {
            // Row-index value for the first row of this batch.
            row_idx_offset: IdxSize,
            // Portion of the decoded batch to keep (relative to batch start).
            slice: Range<usize>,
            // Record-batch blocks that make up this batch.
            block_range: Range<usize>,
            // First morsel sequence number this batch may emit.
            morsel_seq_base: u64,
        }

        let (mut morsel_sender, morsel_rx) = FileReaderOutputSend::new_serial();

        // Walker task -> Decoder tasks.
        let (mut batch_tx, batch_rxs) =
            distributor_channel::<BatchMessage>(num_pipelines, *DEFAULT_DISTRIBUTOR_BUFFER_SIZE);
        // Decoder tasks -> Distributor task.
        let (mut decoded_rx, decoded_tx) =
            Linearizer::<Priority<Reverse<MorselSeq>, DataFrame>>::new(
                num_pipelines,
                *DEFAULT_LINEARIZER_BUFFER_SIZE,
            );

        // Explicitly linearize here to redistribute morsels from large record batches.
        //
        // If record batches in the source IPC file are large, one decoder might produce many
        // morsels at the same time. At the same time, other decoders might not produce anything.
        // Therefore, we would like to distribute the output of a single decoder task over the
        // available output pipelines.
        //
        // Note, we can theoretically use `FileReaderOutputSend::parallel()` as it also linearizes
        // internally, but this behavior is an implementation detail rather than a guarantee.
        let distributor_handle = AbortOnDropHandle::new(spawn(TaskPriority::High, async move {
            // Note: We don't use this (it is handled by the bridge). But morsels require a source token.
            let source_token = SourceToken::new();

            while let Some(Priority(Reverse(seq), df)) = decoded_rx.get().await {
                let morsel = Morsel::new(df, seq, source_token.clone());

                if morsel_sender.send_morsel(morsel).await.is_err() {
                    break;
                }
            }

            PolarsResult::Ok(())
        }));

        // Decoder tasks.
        //
        // Tasks a IPC file and certain number of blocks and decodes each block as a record batch.
        // Then, all record batches are concatenated into a DataFrame. If the resulting DataFrame
        // is too large, which happens when we have one very large block, the DataFrame is split
        // into smaller pieces an spread among the pipelines.
        let decoder_handles = decoded_tx
            .into_iter()
            .zip(batch_rxs)
            .map(|(mut send, mut rx)| {
                let memslice = memslice.clone();
                let metadata = metadata.clone();
                let row_index = row_index.clone();
                let projection_info = projection_info.clone();
                AbortOnDropHandle::new(spawn(TaskPriority::Low, async move {
                    // Amortize allocations.
                    let mut data_scratch = Vec::new();
                    let mut message_scratch = Vec::new();

                    // Use the projected schema if one exists, else the full file schema.
                    let schema = projection_info.as_ref().map_or(
                        metadata.schema.as_ref(),
                        |ProjectionInfo { schema, .. }| schema,
                    );
                    let pl_schema = schema
                        .iter()
                        .map(|(n, f)| (n.clone(), DataType::from_arrow_field(f)))
                        .collect::<Schema>();

                    while let Ok(m) = rx.recv().await {
                        let BatchMessage {
                            row_idx_offset,
                            slice,
                            morsel_seq_base,
                            block_range,
                        } = m;

                        // If we don't project any columns we cannot read properly from the file,
                        // so we just create an empty frame with the proper height.
                        let mut df = if pl_schema.is_empty() {
                            DataFrame::empty_with_height(slice.len())
                        } else {
                            use polars_core::utils::arrow::io::ipc;

                            let mut reader = ipc::read::FileReader::new_with_projection_info(
                                Cursor::new(memslice.as_ref()),
                                metadata.as_ref().clone(),
                                projection_info.clone(),
                                None,
                            );

                            reader.set_current_block(block_range.start);
                            // Hand our scratch buffers to the reader; reclaimed below.
                            reader.set_scratches((
                                std::mem::take(&mut data_scratch),
                                std::mem::take(&mut message_scratch),
                            ));

                            // Create the DataFrame with the appropriate schema and append all the record
                            // batches to it. This will perform schema validation as well.
                            let mut df = DataFrame::empty_with_schema(&pl_schema);
                            df.try_extend(reader.by_ref().take(block_range.len()))?;

                            (data_scratch, message_scratch) = reader.take_scratches();
                            df = df.slice(slice.start as i64, slice.len());

                            df
                        };

                        if let Some(RowIndex { name, offset: _ }) = &row_index {
                            // `row_idx_offset` already includes the user offset;
                            // shift further by the rows cut off the batch front.
                            let offset = row_idx_offset + slice.start as IdxSize;
                            df = df.with_row_index(name.clone(), Some(offset))?;
                        }

                        // If the block is very large, we want to split the block amongst the
                        // pipelines. That will at least allow some parallelism.
                        if df.height() > max_morsel_size && verbose {
                            eprintln!(
                                "IpcFileReader encountered a (too) large record batch \
                                of {} rows. Splitting and continuing.",
                                df.height()
                            );
                        }

                        // Emit `df` as one or more morsels of at most `max_morsel_size` rows.
                        for i in 0..df.height().div_ceil(max_morsel_size) {
                            let morsel_df = df.slice((i * max_morsel_size) as i64, max_morsel_size);
                            let seq = MorselSeq::new(morsel_seq_base + i as u64);
                            if send
                                .insert(Priority(Reverse(seq), morsel_df))
                                .await
                                .is_err()
                            {
                                break;
                            }
                        }
                    }

                    PolarsResult::Ok(())
                }))
            })
            .collect::<Vec<_>>();

        // Rebind what remains for the walker task below.
        let memslice = memslice;
        let metadata = metadata;
        let row_index = row_index;
        let projection_info = projection_info;

        // Walker task.
        //
        // Walks all the sources and supplies block ranges to the decoder tasks.
        let walker_handle = AbortOnDropHandle::new(spawn(TaskPriority::Low, async move {
            let mut morsel_seq: u64 = 0;
            let mut row_idx_offset: IdxSize = row_index.as_ref().map_or(0, |ri| ri.offset);
            let mut slice: Range<usize> = slice;

            // Accumulator for the batch currently being assembled.
            struct Batch {
                row_idx_offset: IdxSize,
                block_start: usize,
                num_rows: usize,
            }

            // Batch completion parameters
            let batch_size_limit = get_ideal_morsel_size();
            let sliced_batch_size_limit = slice.len().div_ceil(num_pipelines);
            let batch_block_limit = metadata.blocks.len().div_ceil(num_pipelines);

            use polars_core::utils::arrow::io::ipc;

            let mut reader = ipc::read::FileReader::new_with_projection_info(
                Cursor::new(memslice.as_ref()),
                metadata.as_ref().clone(),
                projection_info.clone(),
                None,
            );

            if slice.start > 0 {
                // Skip over all blocks that the slice would skip anyway.
                let new_offset = reader.skip_blocks_till_limit(slice.start as u64)?;

                row_idx_offset += (slice.start as u64 - new_offset) as IdxSize;
                slice = new_offset as usize..new_offset as usize + slice.len();
            }

            'read: {
                // If we skip the entire file. Don't even try to read from it.
                if reader.get_current_block() == reader.metadata().blocks.len() {
                    break 'read;
                }

                let mut batch = Batch {
                    row_idx_offset,
                    block_start: reader.get_current_block(),
                    num_rows: 0,
                };

                // We don't yet want to commit these values to the state in case this batch gets
                // cancelled.
                let mut uncommitted_slice = slice.clone();
                let mut uncommitted_row_idx_offset = row_idx_offset;
                while !slice.is_empty() {
                    let mut is_batch_complete = false;

                    match reader.next_record_batch() {
                        None if batch.num_rows == 0 => break,

                        // If we have no more record batches available, we want to send what is
                        // left.
                        None => is_batch_complete = true,
                        Some(record_batch) => {
                            let rb_num_rows = record_batch?.length()? as usize;
                            batch.num_rows += rb_num_rows;

                            // We need to ensure that we are not overflowing the IdxSize maximum
                            // capacity.
                            let rb_num_rows = IdxSize::try_from(rb_num_rows)
                                .map_err(|_| ROW_COUNT_OVERFLOW_ERR)?;
                            uncommitted_row_idx_offset = uncommitted_row_idx_offset
                                .checked_add(rb_num_rows)
                                .ok_or(ROW_COUNT_OVERFLOW_ERR)?;
                        },
                    }

                    let current_block = reader.get_current_block();

                    // Subdivide into batches for large files.
                    is_batch_complete |= batch.num_rows >= batch_size_limit;
                    // Subdivide into batches if the file is sliced.
                    is_batch_complete |= batch.num_rows >= sliced_batch_size_limit;
                    // Subdivide into batches for small files.
                    is_batch_complete |= current_block - batch.block_start >= batch_block_limit;

                    // Batch blocks such that we send appropriately sized morsels. We guarantee a
                    // lower bound here, but not an upper bound.
                    if is_batch_complete {
                        let batch_slice = slice_take(&mut uncommitted_slice, batch.num_rows);
                        let batch_slice_len = batch_slice.len();
                        let block_range = batch.block_start..current_block;

                        let message = BatchMessage {
                            row_idx_offset: batch.row_idx_offset,
                            slice: batch_slice,
                            morsel_seq_base: morsel_seq,
                            block_range,
                        };

                        if batch_tx.send(message).await.is_err() {
                            // This should only happen if the receiver of the decoder
                            // has broken off, meaning no further input will be needed.
                            break 'read;
                        }

                        // Commit the changes to the state.
                        // Now, we know that the a decoder will process it.
                        //
                        // This might generate several morsels if the record batch is very large.
                        morsel_seq += batch_slice_len.div_ceil(max_morsel_size) as u64;
                        slice = uncommitted_slice.clone();
                        row_idx_offset = uncommitted_row_idx_offset;

                        batch = Batch {
                            row_idx_offset,
                            block_start: current_block,
                            num_rows: 0,
                        };
                    }
                }
            } // 'read

            PolarsResult::Ok(())
        }));

        // Return the morsel receiver plus a handle that joins all three stages.
        Ok((
            morsel_rx,
            spawn(TaskPriority::Low, async move {
                distributor_handle.await?;

                for handle in decoder_handles {
                    handle.await?;
                }

                walker_handle.await?;
                Ok(())
            }),
        ))
    }

    /// Trait entry point; delegates to the cached/synchronous helper.
    async fn n_rows_in_file(&mut self) -> PolarsResult<IdxSize> {
        self._n_rows_in_file()
    }

    /// Trait entry point; delegates to the synchronous helper.
    async fn row_position_after_slice(
        &mut self,
        pre_slice: Option<Slice>,
    ) -> PolarsResult<IdxSize> {
        self._row_position_after_slice(pre_slice)
    }
}
616
617
impl IpcFileReader {
618
fn _n_rows_in_file(&mut self) -> PolarsResult<IdxSize> {
619
let InitializedState {
620
memslice,
621
file_metadata,
622
n_rows_in_file,
623
} = self.init_data.as_mut().unwrap();
624
625
if n_rows_in_file.is_none() {
626
let n_rows: i64 = get_row_count_from_blocks(
627
&mut std::io::Cursor::new(memslice.as_ref()),
628
&file_metadata.blocks,
629
)?;
630
631
let n_rows = IdxSize::try_from(n_rows)
632
.map_err(|_| polars_err!(bigidx, ctx = "ipc file", size = n_rows))?;
633
634
*n_rows_in_file = Some(n_rows);
635
}
636
637
Ok(n_rows_in_file.unwrap())
638
}
639
640
fn _row_position_after_slice(&mut self, pre_slice: Option<Slice>) -> PolarsResult<IdxSize> {
641
Ok(calc_row_position_after_slice(
642
self._n_rows_in_file()?,
643
pre_slice,
644
))
645
}
646
}
647
648