Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/ndjson/mod.rs
8479 views
1
pub mod builder;
2
3
use std::cmp::Reverse;
4
use std::num::NonZeroUsize;
5
use std::ops::Range;
6
use std::sync::Arc;
7
8
use async_trait::async_trait;
9
use chunk_data_fetch::ChunkDataFetcher;
10
use line_batch_processor::{LineBatchProcessor, LineBatchProcessorOutputPort};
11
use negative_slice_pass::MorselStreamReverser;
12
use polars_error::{PolarsResult, polars_bail, polars_err};
13
use polars_io::cloud::CloudOptions;
14
use polars_io::metrics::OptIOMetrics;
15
use polars_io::pl_async;
16
use polars_io::utils::byte_source::{ByteSource, DynByteSource, DynByteSourceBuilder};
17
use polars_io::utils::compression::{ByteSourceReader, SupportedCompression};
18
use polars_io::utils::stream_buf_reader::{ReaderSource, StreamBufReader};
19
use polars_plan::dsl::ScanSource;
20
use polars_utils::IdxSize;
21
use polars_utils::mem::prefetch::get_memory_prefetch_func;
22
use polars_utils::priority::Priority;
23
use polars_utils::slice_enum::Slice;
24
use row_index_limit_pass::ApplyRowIndexOrLimit;
25
26
use super::multi_scan::reader_interface::output::FileReaderOutputRecv;
27
use super::multi_scan::reader_interface::{BeginReadArgs, FileReader, FileReaderCallbacks};
28
use crate::async_executor::{AbortOnDropHandle, spawn};
29
use crate::async_primitives::distributor_channel::distributor_channel;
30
use crate::async_primitives::linearizer::Linearizer;
31
use crate::async_primitives::oneshot_channel;
32
use crate::async_primitives::wait_group::{WaitGroup, WaitToken};
33
use crate::morsel::SourceToken;
34
use crate::nodes::compute_node_prelude::*;
35
use crate::nodes::io_sources::multi_scan::reader_interface::Projection;
36
use crate::nodes::io_sources::multi_scan::reader_interface::output::FileReaderOutputSend;
37
use crate::nodes::io_sources::ndjson::chunk_reader::ChunkReaderBuilder;
38
use crate::nodes::io_sources::ndjson::line_batch_distributor::RowSkipper;
39
use crate::nodes::{MorselSeq, TaskPriority};
40
use crate::utils::tokio_handle_ext;
41
pub mod chunk_data_fetch;
42
pub(super) mod chunk_reader;
43
mod line_batch_distributor;
44
mod line_batch_processor;
45
mod negative_slice_pass;
46
mod row_index_limit_pass;
47
48
/// Streaming NDJSON file reader implementing the multi-scan [`FileReader`] interface.
///
/// `initialize` resolves the byte source, file size and compression once (cached in
/// `init_data`); `begin_read` then wires up the line-batch distributor / processor
/// pipeline and returns the output port plus a finishing handle.
pub struct NDJsonFileReader {
    /// Where the bytes come from (used both to build the async byte source and,
    /// for uncompressed local buffers, to obtain a memory slice directly).
    pub scan_source: ScanSource,
    /// Cloud configuration forwarded to `to_dyn_byte_source`.
    pub cloud_options: Option<Arc<CloudOptions>>,
    /// Builds the per-worker chunk reader from the projected schema in `begin_read`.
    pub chunk_reader_builder: ChunkReaderBuilder,
    /// Counts the number of rows (lines) in a raw byte chunk; handed to every
    /// `LineBatchProcessor` worker.
    pub count_rows_fn: fn(&[u8]) -> usize,
    /// Enables `eprintln!` trace logging throughout the read.
    pub verbose: bool,
    /// Used once in `initialize` to construct the `DynByteSource`.
    pub byte_source_builder: DynByteSourceBuilder,
    /// Cross-reader coordination state for chunk prefetching (see `ChunkPrefetchSync`).
    pub chunk_prefetch_sync: ChunkPrefetchSync,
    /// Populated by `initialize`; `None` until then. `begin_read` unwraps this.
    pub io_metrics: OptIOMetrics,
    pub init_data: Option<InitializedState>,
}
59
60
/// Coordinates chunk prefetching across consecutive readers.
///
/// The semaphore caps the number of in-flight chunk prefetches globally, while the
/// wait-group slot chains readers: each reader installs its own `WaitGroup` into the
/// shared slot (in `prepare_read`) and takes over the previous one, so a reader's
/// prefetch task can wait until the previous reader finished spawning its prefetches.
pub(crate) struct ChunkPrefetchSync {
    /// Upper bound on concurrently prefetched chunks (further clamped per file in
    /// `begin_read` based on file size / chunk size).
    pub(crate) prefetch_limit: usize,
    /// Shared semaphore limiting in-flight prefetches.
    pub(crate) prefetch_semaphore: Arc<tokio::sync::Semaphore>,
    /// Slot holding the most recent reader's wait group; swapped in `prepare_read`.
    pub(crate) shared_prefetch_wait_group_slot: Arc<std::sync::Mutex<Option<WaitGroup>>>,

    /// Waits for the previous reader to finish spawning prefetches.
    pub(crate) prev_all_spawned: Option<WaitGroup>,
    /// Dropped once the current reader has finished spawning prefetches.
    pub(crate) current_all_spawned: Option<WaitToken>,
}
70
71
/// Per-file state resolved once in `FileReader::initialize` and cached on the reader.
#[derive(Clone)]
pub struct InitializedState {
    // Total size of the (possibly compressed) file in bytes.
    file_size: usize,
    // Detected from the first 4 magic bytes; `None` for uncompressed input.
    compression: Option<SupportedCompression>,
    // Shared handle to the byte source backing all chunk fetches.
    byte_source: Arc<DynByteSource>,
}
77
78
#[async_trait]
impl FileReader for NDJsonFileReader {
    /// Resolves the byte source, file size and compression type, caching the result
    /// in `self.init_data`. Idempotent: a second call returns immediately.
    async fn initialize(&mut self) -> PolarsResult<()> {
        if self.init_data.is_some() {
            return Ok(());
        }

        let scan_source = self.scan_source.clone();
        let byte_source_builder = self.byte_source_builder.clone();
        let cloud_options = self.cloud_options.clone();
        let io_metrics = self.io_metrics.clone();

        // Byte source construction may perform I/O (e.g. cloud HEAD requests), so it
        // runs on the shared I/O runtime. `.unwrap()` propagates task panics only;
        // real errors come out through the inner `PolarsResult` (`?`).
        let byte_source = pl_async::get_runtime()
            .spawn(async move {
                scan_source
                    .as_scan_source_ref()
                    .to_dyn_byte_source(
                        &byte_source_builder,
                        cloud_options.as_deref(),
                        io_metrics.0,
                    )
                    .await
            })
            .await
            .unwrap()?;
        let byte_source = Arc::new(byte_source);

        // @TODO: Refactor FileInfo so we can re-use the file_size value from the planning stage.
        let file_size = {
            let byte_source = byte_source.clone();
            pl_async::get_runtime()
                .spawn(async move { byte_source.get_size().await })
                .await
                .unwrap()?
        };

        // Sniff compression from the first 4 magic bytes; files shorter than 4 bytes
        // cannot carry a recognizable magic header and are treated as uncompressed.
        let compression = if file_size >= 4 {
            let byte_source = byte_source.clone();
            let magic_range = 0..4;
            let magic_bytes = pl_async::get_runtime()
                .spawn(async move { byte_source.get_range(magic_range).await })
                .await
                .unwrap()?;
            SupportedCompression::check(&magic_bytes)
        } else {
            None
        };

        self.init_data = Some(InitializedState {
            file_size,
            compression,
            byte_source,
        });

        Ok(())
    }

    /// Chains this reader into the shared prefetch sequence: installs a fresh
    /// `WaitGroup` into the shared slot and keeps (a) the previous reader's wait
    /// group, to wait on before prefetching, and (b) a token whose drop signals
    /// that this reader finished spawning its prefetches.
    fn prepare_read(&mut self) -> PolarsResult<()> {
        let wait_group_this_reader = WaitGroup::default();
        let prefetch_all_spawned_token = wait_group_this_reader.token();

        // try_lock().unwrap(): the slot is only touched briefly during
        // prepare_read/begin_read, so contention here would indicate a bug.
        let prev_wait_group: Option<WaitGroup> = self
            .chunk_prefetch_sync
            .shared_prefetch_wait_group_slot
            .try_lock()
            .unwrap()
            .replace(wait_group_this_reader);

        self.chunk_prefetch_sync.prev_all_spawned = prev_wait_group;
        self.chunk_prefetch_sync.current_all_spawned = Some(prefetch_all_spawned_token);

        Ok(())
    }

    /// Spawns the full read pipeline and returns the morsel output port together
    /// with a finishing handle that joins all tasks and services the callbacks
    /// (`file_schema_tx`, `n_rows_in_file_tx`, `row_position_on_end_tx`).
    ///
    /// Pipeline shape: prefetch task (optional) -> `LineBatchDistributor` ->
    /// N `LineBatchProcessor` workers -> either direct parallel output, or a
    /// `Linearizer` feeding `ApplyRowIndexOrLimit` / `MorselStreamReverser` when a
    /// slice or row index is requested.
    ///
    /// Panics if `initialize` was not called first, or if `args` carries a
    /// projection/predicate variant this reader does not support.
    fn begin_read(
        &mut self,
        args: BeginReadArgs,
    ) -> PolarsResult<(FileReaderOutputRecv, JoinHandle<PolarsResult<()>>)> {
        let verbose = self.verbose;

        // Initialize.
        let InitializedState {
            file_size,
            compression,
            byte_source,
        } = self.init_data.clone().unwrap();

        // Destructure the supported argument shape; anything else (e.g. a predicate
        // or a non-plain projection) is unsupported here and panics.
        let BeginReadArgs {
            projection: Projection::Plain(projected_schema),
            mut row_index,
            pre_slice,

            num_pipelines,
            disable_morsel_split: _,
            callbacks:
                FileReaderCallbacks {
                    file_schema_tx,
                    n_rows_in_file_tx,
                    row_position_on_end_tx,
                },

            predicate: None,
            cast_columns_policy: _,
        } = args
        else {
            panic!("unsupported args: {:?}", &args)
        };

        let is_empty_slice = pre_slice.as_ref().is_some_and(|x| x.len() == 0);
        let is_negative_slice = matches!(pre_slice, Some(Slice::Negative { .. }));

        // There are two byte sourcing strategies `ReaderSource`: (a) async parallel prefetch using a
        // streaming pipeline, or (b) memory-mapped, only to be used for uncompressed local files.
        // The `compressed_reader` (of type `ByteSourceReader`) abstracts these source types.
        // The `use_async_prefetch` flag controls the optional pipeline startup behavior.
        let use_async_prefetch =
            !(matches!(byte_source.as_ref(), &DynByteSource::Buffer(_)) && compression.is_none());

        // NDJSON: We just use the projected schema - the parser will automatically append NULL if
        // the field is not found.
        //
        // TODO
        // We currently always use the projected dtype, but this may cause
        // issues e.g. with temporal types. This can be improved to better choose
        // between the 2 dtypes.
        let schema = projected_schema;

        if let Some(tx) = file_schema_tx {
            // Receiver may already be gone; sending the schema is best-effort.
            _ = tx.send(schema.clone())
        }

        // Convert (offset, len) to Range
        // Note: This is converted to right-to-left for negative slice (i.e. range.start is position
        // from end).
        let global_slice: Option<Range<usize>> = if let Some(slice) = pre_slice.clone() {
            match slice {
                Slice::Positive { offset, len } => Some(offset..offset.saturating_add(len)),
                Slice::Negative {
                    offset_from_end,
                    len,
                } => {
                    // array: [_ _ _ _ _]
                    // slice: [ _ _ ]
                    // in: offset_from_end: 3, len: 2
                    // out: 1..3 (right-to-left)
                    Some(offset_from_end.saturating_sub(len)..offset_from_end)
                },
            }
        } else {
            None
        };

        // Channel through which the finishing task sends the total row count to the
        // MorselStreamReverser (only needed for negative slice + row index, where the
        // row index offset depends on the total count).
        let (total_row_count_tx, total_row_count_rx) = if is_negative_slice && row_index.is_some() {
            let (tx, rx) = oneshot_channel::channel();
            (Some(tx), Some(rx))
        } else {
            (None, None)
        };

        let needs_total_row_count = total_row_count_tx.is_some()
            || n_rows_in_file_tx.is_some()
            || (row_position_on_end_tx.is_some()
                && matches!(pre_slice, Some(Slice::Negative { .. })));

        if verbose {
            eprintln!(
                "[NDJsonFileReader]: \
                project: {}, \
                global_slice: {:?}, \
                row_index: {:?}, \
                is_negative_slice: {}, \
                use_async_prefetch: {}",
                schema.len(),
                &global_slice,
                &row_index,
                is_negative_slice,
                use_async_prefetch
            );
        }

        // Note: This counts from the end of file for negative slice.
        let n_rows_to_skip = global_slice.as_ref().map_or(0, |x| x.start);

        // A linearizer is only needed when morsel order matters, i.e. when a slice or
        // row index must be applied over the globally ordered stream.
        let (opt_linearizer, mut linearizer_inserters) =
            if global_slice.is_some() || row_index.is_some() {
                let (a, b) =
                    Linearizer::<Priority<Reverse<MorselSeq>, DataFrame>>::new(num_pipelines, 1);
                (Some(a), b)
            } else {
                (None, vec![])
            };

        let output_to_linearizer = opt_linearizer.is_some();
        let mut output_port = None;

        // Optional post-processing stage consuming the linearized stream:
        // either reversing (negative slice) or applying row index / limit.
        let opt_post_process_handle = if is_negative_slice {
            // Note: This is right-to-left
            let negative_slice = global_slice.unwrap();

            if verbose {
                eprintln!("[NDJsonFileReader]: Initialize morsel stream reverser");
            }

            let (morsel_senders, rx) = FileReaderOutputSend::new_parallel(num_pipelines);
            output_port = Some(rx);

            Some(AbortOnDropHandle::new(spawn(
                TaskPriority::High,
                MorselStreamReverser {
                    morsel_receiver: opt_linearizer.unwrap(),
                    morsel_senders,
                    offset_len_rtl: (
                        negative_slice.start,
                        negative_slice.end - negative_slice.start,
                    ),
                    // The correct row index offset can only be known after total row count is
                    // available. This is handled by the MorselStreamReverser.
                    row_index: row_index.take().map(|x| (x, total_row_count_rx.unwrap())),
                    verbose,
                }
                .run(),
            )))
        } else if global_slice.is_some() || row_index.is_some() {
            let mut row_index = row_index.take();

            if verbose {
                eprintln!("[NDJsonFileReader]: Initialize ApplyRowIndexOrLimit");
            }

            if let Some(ri) = row_index.as_mut() {
                // Update the row index offset according to the slice start.
                let Some(v) = ri.offset.checked_add(n_rows_to_skip as IdxSize) else {
                    let offset = ri.offset;

                    polars_bail!(
                        ComputeError:
                        "row_index with offset {} overflows at {} rows",
                        offset, n_rows_to_skip
                    )
                };
                ri.offset = v;
            }

            let (morsel_tx, rx) = FileReaderOutputSend::new_serial();
            output_port = Some(rx);

            let limit = global_slice.as_ref().map(|x| x.len());

            let task = ApplyRowIndexOrLimit {
                morsel_receiver: opt_linearizer.unwrap(),
                morsel_tx,
                // Note: The line batch distributor handles skipping lines until the offset,
                // we only need to handle the limit here.
                limit,
                row_index,
                verbose,
            };

            // An empty slice produces no output, so the post-process task is skipped
            // (the output port above still exists but will simply be closed).
            if is_empty_slice {
                None
            } else {
                Some(AbortOnDropHandle::new(spawn(
                    TaskPriority::High,
                    task.run(),
                )))
            }
        } else {
            None
        };

        let chunk_reader = self.chunk_reader_builder.build(schema);

        let (line_batch_distribute_tx, line_batch_distribute_receivers) =
            distributor_channel(num_pipelines, 1);

        // With no linearizer, workers write straight to parallel output senders.
        let mut morsel_senders = if !output_to_linearizer {
            let (senders, outp) = FileReaderOutputSend::new_parallel(num_pipelines);
            assert!(output_port.is_none());
            output_port = Some(outp);
            senders
        } else {
            vec![]
        };

        // Initialize in reverse as we want to manually pop from either the linearizer or the phase receivers depending
        // on if we have negative slice.
        let line_batch_processor_handles = line_batch_distribute_receivers
            .into_iter()
            .enumerate()
            .rev()
            .map(|(worker_idx, line_batch_rx)| {
                let chunk_reader = chunk_reader.clone();
                let count_rows_fn = self.count_rows_fn;
                // Note: We don't use this (it is handled by the bridge). But morsels require a source token.
                let source_token = SourceToken::new();

                AbortOnDropHandle::new(spawn(
                    TaskPriority::Low,
                    LineBatchProcessor {
                        worker_idx,

                        chunk_reader,
                        count_rows_fn,

                        line_batch_rx,
                        output_port: if is_empty_slice {
                            LineBatchProcessorOutputPort::Closed
                        } else if output_to_linearizer {
                            LineBatchProcessorOutputPort::Linearize {
                                tx: linearizer_inserters.pop().unwrap(),
                            }
                        } else {
                            LineBatchProcessorOutputPort::Direct {
                                tx: morsel_senders.pop().unwrap(),
                                source_token,
                            }
                        },
                        needs_total_row_count,

                        // Only log from the last worker to prevent flooding output.
                        verbose: verbose && worker_idx == num_pipelines - 1,
                    }
                    .run(),
                ))
            })
            .collect::<Vec<_>>();

        // Skips the first `n_rows_to_skip` rows at the distributor level (counted
        // from the end of the file when reversed).
        let row_skipper = RowSkipper {
            cfg_n_rows_to_skip: n_rows_to_skip,
            n_rows_skipped: 0,
            is_line: self.chunk_reader_builder.is_line_fn(),
            reverse: is_negative_slice,
        };

        // Unify the two source options (uncompressed local file mmapp'ed, or streaming async with transparent
        // decompression), into one unified reader object.
        let byte_source_reader: ByteSourceReader<ReaderSource> = if use_async_prefetch {
            // Prepare parameters for Prefetch task.
            const DEFAULT_NDJSON_CHUNK_SIZE: usize = 32 * 1024 * 1024;
            let memory_prefetch_func = get_memory_prefetch_func(verbose);
            let chunk_size = std::env::var("POLARS_NDJSON_CHUNK_SIZE")
                .map(|x| {
                    x.parse::<NonZeroUsize>()
                        .unwrap_or_else(|_| {
                            panic!("invalid value for POLARS_NDJSON_CHUNK_SIZE: {x}")
                        })
                        .get()
                })
                .unwrap_or(DEFAULT_NDJSON_CHUNK_SIZE);

            // Never prefetch more chunks than the file contains, but at least 1.
            let prefetch_limit = self
                .chunk_prefetch_sync
                .prefetch_limit
                .min(file_size.div_ceil(chunk_size))
                .max(1);

            let (prefetch_send, prefetch_recv) = tokio::sync::mpsc::channel(prefetch_limit);

            // Task: Prefetch.
            // Initiate parallel downloads of raw data chunks.
            let byte_source = byte_source.clone();
            let prefetch_task = {
                let io_runtime = polars_io::pl_async::get_runtime();

                let prefetch_semaphore = Arc::clone(&self.chunk_prefetch_sync.prefetch_semaphore);
                let prefetch_prev_all_spawned =
                    Option::take(&mut self.chunk_prefetch_sync.prev_all_spawned);
                let prefetch_current_all_spawned =
                    Option::take(&mut self.chunk_prefetch_sync.current_all_spawned);

                tokio_handle_ext::AbortOnDropHandle(io_runtime.spawn(async move {
                    let mut chunk_data_fetcher = ChunkDataFetcher {
                        memory_prefetch_func,
                        byte_source,
                        file_size,
                        chunk_size,
                        prefetch_send,
                        prefetch_semaphore,
                        prefetch_current_all_spawned,
                    };

                    // Do not start fetching until the previous reader has finished
                    // spawning its prefetches (keeps prefetch order across files).
                    if let Some(prefetch_prev_all_spawned) = prefetch_prev_all_spawned {
                        prefetch_prev_all_spawned.wait().await;
                    }

                    chunk_data_fetcher.run().await?;

                    Ok(())
                }))
            };

            // Wrap into ByteSourceReader to enable sync `BufRead` access.
            let stream_buf_reader = StreamBufReader::new(prefetch_recv, prefetch_task);
            ByteSourceReader::try_new(ReaderSource::Streaming(stream_buf_reader), compression)?
        } else {
            // Uncompressed local buffer: read directly from memory.
            let memslice = self
                .scan_source
                .as_scan_source_ref()
                .to_buffer_async_assume_latest(self.scan_source.run_async())?;

            ByteSourceReader::from_memory(memslice, compression)?
        };

        // Heuristic size hint for the decompressed stream; only a hint, not a bound.
        const ASSUMED_COMPRESSION_RATIO: usize = 4;
        let uncompressed_file_size_hint = Some(match compression {
            Some(_) => file_size * ASSUMED_COMPRESSION_RATIO,
            None => file_size,
        });

        let line_batch_distributor_task_handle = AbortOnDropHandle::new(spawn(
            TaskPriority::Low,
            line_batch_distributor::LineBatchDistributor {
                reader: byte_source_reader,
                reverse: is_negative_slice,
                row_skipper,
                line_batch_distribute_tx,
                uncompressed_file_size_hint,
            }
            .run(),
        ));

        // Task. Finishing handle.
        let finishing_handle = spawn(TaskPriority::Low, async move {
            // Number of rows skipped by the line batch distributor.
            let n_rows_skipped: usize = line_batch_distributor_task_handle.await?;

            // Number of rows processed by the line batch processors.
            let mut n_rows_processed: usize = 0;

            if verbose {
                eprintln!("[NDJsonFileReader]: line batch distributor handle returned");
            }

            for handle in line_batch_processor_handles {
                n_rows_processed = n_rows_processed.saturating_add(handle.await?);
            }

            let total_row_count =
                needs_total_row_count.then_some(n_rows_skipped.saturating_add(n_rows_processed));

            if verbose {
                eprintln!("[NDJsonFileReader]: line batch processor handles returned");
            }

            if let Some(row_position_on_end_tx) = row_position_on_end_tx {
                let n = match pre_slice {
                    None => n_rows_skipped.saturating_add(n_rows_processed),

                    // Positive slice: position is capped at the slice end.
                    Some(Slice::Positive { offset, len }) => n_rows_skipped
                        .saturating_add(n_rows_processed)
                        .min(offset.saturating_add(len)),

                    // Negative slice: n_rows_skipped counts from the end of file.
                    Some(Slice::Negative { .. }) => {
                        total_row_count.unwrap().saturating_sub(n_rows_skipped)
                    },
                };

                let n = IdxSize::try_from(n)
                    .map_err(|_| polars_err!(bigidx, ctx = "ndjson file", size = n))?;

                _ = row_position_on_end_tx.send(n);
            }

            if let Some(tx) = total_row_count_tx {
                let total_row_count = total_row_count.unwrap();

                if verbose {
                    eprintln!(
                        "[NDJsonFileReader]: \
                        send total row count: {total_row_count}"
                    )
                }
                _ = tx.send(total_row_count);
            }

            if let Some(n_rows_in_file_tx) = n_rows_in_file_tx {
                let total_row_count = total_row_count.unwrap();

                if verbose {
                    eprintln!("[NDJsonFileReader]: send n_rows_in_file: {total_row_count}");
                }

                let num_rows = total_row_count;
                let num_rows = IdxSize::try_from(num_rows)
                    .map_err(|_| polars_err!(bigidx, ctx = "ndjson file", size = num_rows))?;
                _ = n_rows_in_file_tx.send(num_rows);
            }

            // Join the optional reverser / row-index-limit stage last.
            if let Some(handle) = opt_post_process_handle {
                handle.await?;
            }

            if verbose {
                eprintln!("[NDJsonFileReader]: returning");
            }

            Ok(())
        });

        Ok((output_port.unwrap(), finishing_handle))
    }
}
580
581