GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/ndjson/mod.rs
pub mod builder;

use std::cmp::Reverse;
use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
use chunk_reader::ChunkReader;
use line_batch_processor::{LineBatchProcessor, LineBatchProcessorOutputPort};
use negative_slice_pass::MorselStreamReverser;
use polars_core::schema::SchemaRef;
use polars_error::{PolarsResult, polars_bail, polars_err};
use polars_io::cloud::CloudOptions;
use polars_io::prelude::estimate_n_lines_in_file;
use polars_io::utils::compression::maybe_decompress_bytes;
use polars_plan::dsl::{NDJsonReadOptions, ScanSource};
use polars_utils::IdxSize;
use polars_utils::mem::prefetch::get_memory_prefetch_func;
use polars_utils::mmap::MemSlice;
use polars_utils::priority::Priority;
use polars_utils::slice_enum::Slice;
use row_index_limit_pass::ApplyRowIndexOrLimit;

use super::multi_scan::reader_interface::output::FileReaderOutputRecv;
use super::multi_scan::reader_interface::{BeginReadArgs, FileReader, FileReaderCallbacks};
use crate::async_executor::{AbortOnDropHandle, spawn};
use crate::async_primitives::distributor_channel::distributor_channel;
use crate::async_primitives::linearizer::Linearizer;
use crate::morsel::SourceToken;
use crate::nodes::compute_node_prelude::*;
use crate::nodes::io_sources::multi_scan::reader_interface::Projection;
use crate::nodes::io_sources::multi_scan::reader_interface::output::FileReaderOutputSend;
use crate::nodes::{MorselSeq, TaskPriority};
mod chunk_reader;
mod line_batch_distributor;
mod line_batch_processor;
mod negative_slice_pass;
mod row_index_limit_pass;

#[derive(Clone)]
pub struct NDJsonFileReader {
    scan_source: ScanSource,
    #[expect(unused)] // Will be used when implementing cloud streaming.
    cloud_options: Option<Arc<CloudOptions>>,
    options: Arc<NDJsonReadOptions>,
    verbose: bool,
    // Cached on first access - we may be called multiple times e.g. on negative slice.
    cached_bytes: Option<MemSlice>,
}

#[async_trait]
impl FileReader for NDJsonFileReader {
    async fn initialize(&mut self) -> PolarsResult<()> {
        Ok(())
    }

    fn begin_read(
        &mut self,
        args: BeginReadArgs,
    ) -> PolarsResult<(FileReaderOutputRecv, JoinHandle<PolarsResult<()>>)> {
        let verbose = self.verbose;

        let BeginReadArgs {
            projection: Projection::Plain(projected_schema),
            mut row_index,
            pre_slice,

            num_pipelines,
            callbacks:
                FileReaderCallbacks {
                    file_schema_tx,
                    n_rows_in_file_tx,
                    row_position_on_end_tx,
                },

            predicate: None,
            cast_columns_policy: _,
        } = args
        else {
            panic!("unsupported args: {:?}", &args)
        };
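        // Note: only a plain projection without a predicate is accepted here; any other
        // `BeginReadArgs` combination fails the `let ... else` destructuring above and
        // hits the panic.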

        // TODO: This currently downloads and decompresses everything upfront in a blocking manner.
        // Ideally we would have streaming download / decompression.
        let global_bytes = self.get_bytes_maybe_decompress()?;

        // NDJSON: We just use the projected schema - the parser will automatically append NULL if
        // the field is not found.
        //
        // TODO
        // We currently always use the projected dtype, but this may cause
        // issues e.g. with temporal types. This can be improved to better choose
        // between the two dtypes.
        let schema = projected_schema;

        if let Some(mut tx) = file_schema_tx {
            _ = tx.try_send(schema.clone())
        }

        let is_negative_slice = matches!(pre_slice, Some(Slice::Negative { .. }));

        // Convert (offset, len) to Range.
        // Note: For a negative slice this is converted to right-to-left (i.e. range.start is the
        // position from the end).
        let global_slice: Option<Range<usize>> = if let Some(slice) = pre_slice.clone() {
            match slice {
                Slice::Positive { offset, len } => Some(offset..offset.saturating_add(len)),
                Slice::Negative {
                    offset_from_end,
                    len,
                } => {
                    // array: [_ _ _ _ _]
                    // slice:     [_ _]
                    // in: offset_from_end: 3, len: 2
                    // out: 1..3 (right-to-left)
                    Some(offset_from_end.saturating_sub(len)..offset_from_end)
                },
            }
        } else {
            None
        };
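        // For example, `Slice::Positive { offset: 2, len: 3 }` maps to the left-to-right
        // range 2..5, while the negative variant above is interpreted right-to-left.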

        let (total_row_count_tx, total_row_count_rx) = if is_negative_slice && row_index.is_some() {
            let (tx, rx) = tokio::sync::oneshot::channel();
            (Some(tx), Some(rx))
        } else {
            (None, None)
        };

        let needs_total_row_count = total_row_count_tx.is_some()
            || n_rows_in_file_tx.is_some()
            || (row_position_on_end_tx.is_some()
                && matches!(pre_slice, Some(Slice::Negative { .. })));
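        // The exact total row count is only needed to resolve a row index offset under a
        // negative slice, to answer `n_rows_in_file_tx`, or to report the end row position
        // for a negative slice; in all other cases reading may stop early.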

        let chunk_size: usize = {
            let n_bytes_to_split = if let Some(x) = global_slice.as_ref() {
                if needs_total_row_count {
                    global_bytes.len()
                } else {
                    // There may be early stopping; heuristically use a smaller chunk size so we
                    // can stop sooner.
                    let n_rows_to_sample = 8;
                    let n_lines_estimate =
                        estimate_n_lines_in_file(global_bytes.as_ref(), n_rows_to_sample);
                    let line_length_estimate = global_bytes.len().div_ceil(n_lines_estimate);

                    if verbose {
                        eprintln!(
                            "[NDJsonFileReader]: n_lines_estimate: {n_lines_estimate}, line_length_estimate: {line_length_estimate}"
                        );
                    }

                    // Estimated stopping point in the file.
                    x.end.saturating_mul(line_length_estimate)
                }
            } else {
                global_bytes.len()
            };

            let chunk_size = n_bytes_to_split.div_ceil(16 * num_pipelines);

            let max_chunk_size = 16 * 1024 * 1024;
            // Use a small min chunk size to catch failures in tests.
            #[cfg(debug_assertions)]
            let min_chunk_size = 64;
            #[cfg(not(debug_assertions))]
            let min_chunk_size = 1024 * 4;

            let chunk_size = chunk_size.clamp(min_chunk_size, max_chunk_size);

            std::env::var("POLARS_FORCE_NDJSON_CHUNK_SIZE").map_or(chunk_size, |x| {
                x.parse::<usize>()
                    .expect("expected `POLARS_FORCE_NDJSON_CHUNK_SIZE` to be an integer")
            })
        };
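        // Illustrative example (hypothetical numbers): with no slice, a 64 MiB file and
        // num_pipelines = 8 gives div_ceil(64 MiB, 16 * 8) = 512 KiB per chunk, which
        // already lies inside the release clamp of [4 KiB, 16 MiB]. Setting
        // POLARS_FORCE_NDJSON_CHUNK_SIZE overrides the computed value entirely.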

        if verbose {
            eprintln!(
                "[NDJsonFileReader]: \
                project: {}, \
                global_slice: {:?}, \
                row_index: {:?}, \
                chunk_size: {}, \
                n_chunks: {}, \
                is_negative_slice: {}",
                schema.len(),
                &global_slice,
                &row_index,
                chunk_size,
                global_bytes.len().div_ceil(chunk_size),
                is_negative_slice,
            );
        }

        // Note: This counts from the end of file for negative slice.
        let n_rows_to_skip = global_slice.as_ref().map_or(0, |x| x.start);

        let (opt_linearizer, mut linearizer_inserters) =
            if global_slice.is_some() || row_index.is_some() {
                let (a, b) =
                    Linearizer::<Priority<Reverse<MorselSeq>, DataFrame>>::new(num_pipelines, 1);
                (Some(a), b)
            } else {
                (None, vec![])
            };
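        // The linearizer restores MorselSeq order across the parallel line batch processors;
        // it is only needed when a slice or row index pass must observe morsels sequentially.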

        let output_to_linearizer = opt_linearizer.is_some();

        let mut output_port = None;

        let opt_post_process_handle = if is_negative_slice {
            // Note: This is right-to-left.
            let negative_slice = global_slice.unwrap();

            if verbose {
                eprintln!("[NDJsonFileReader]: Initialize morsel stream reverser");
            }

            let (morsel_senders, rx) = FileReaderOutputSend::new_parallel(num_pipelines);
            output_port = Some(rx);

            Some(AbortOnDropHandle::new(spawn(
                TaskPriority::High,
                MorselStreamReverser {
                    morsel_receiver: opt_linearizer.unwrap(),
                    morsel_senders,
                    offset_len_rtl: (
                        negative_slice.start,
                        negative_slice.end - negative_slice.start,
                    ),
                    // The correct row index offset can only be known after the total row count
                    // is available. This is handled by the MorselStreamReverser.
                    row_index: row_index.take().map(|x| (x, total_row_count_rx.unwrap())),
                    verbose,
                }
                .run(),
            )))
        } else if global_slice.is_some() || row_index.is_some() {
            let mut row_index = row_index.take();

            if verbose {
                eprintln!("[NDJsonFileReader]: Initialize ApplyRowIndexOrLimit");
            }

            if let Some(ri) = row_index.as_mut() {
                // Update the row index offset according to the slice start.
                let Some(v) = ri.offset.checked_add(n_rows_to_skip as IdxSize) else {
                    let offset = ri.offset;

                    polars_bail!(
                        ComputeError:
                        "row_index with offset {} overflows at {} rows",
                        offset, n_rows_to_skip
                    )
                };
                ri.offset = v;
            }

            let (morsel_tx, rx) = FileReaderOutputSend::new_serial();
            output_port = Some(rx);

            let limit = global_slice.as_ref().map(|x| x.len());

            let task = ApplyRowIndexOrLimit {
                morsel_receiver: opt_linearizer.unwrap(),
                morsel_tx,
                // Note: The line batch distributor handles skipping lines until the offset;
                // we only need to handle the limit here.
                limit,
                row_index,
                verbose,
            };

            if limit == Some(0) {
                None
            } else {
                Some(AbortOnDropHandle::new(spawn(
                    TaskPriority::High,
                    task.run(),
                )))
            }
        } else {
            None
        };
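        // Recap of the three branches above: a negative slice is handled by reversing the
        // morsel stream, a positive slice / row index by a serial limit + row-index pass,
        // and a plain read later connects the processors directly to the output port.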

        let schema = Arc::new(schema);
        let chunk_reader = Arc::new(self.try_init_chunk_reader(&schema)?);

        if !is_negative_slice {
            get_memory_prefetch_func(verbose)(global_bytes.as_ref());
        }
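        // Prefetching is only done for forward reads; for a negative slice the line batch
        // distributor below walks the file in reverse (see `reverse: is_negative_slice`).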

        let (line_batch_distribute_tx, line_batch_distribute_receivers) =
            distributor_channel(num_pipelines, 1);

        let mut morsel_senders = if !output_to_linearizer {
            let (senders, outp) = FileReaderOutputSend::new_parallel(num_pipelines);
            assert!(output_port.is_none());
            output_port = Some(outp);
            senders
        } else {
            vec![]
        };

        // Initialize in reverse as we want to manually pop from either the linearizer or the
        // phase receivers depending on whether we have a negative slice.
        let line_batch_processor_handles = line_batch_distribute_receivers
            .into_iter()
            .enumerate()
            .rev()
            .map(|(worker_idx, line_batch_rx)| {
                let global_bytes = global_bytes.clone();
                let chunk_reader = chunk_reader.clone();
                // Note: We don't use this (it is handled by the bridge), but morsels require a
                // source token.
                let source_token = SourceToken::new();

                AbortOnDropHandle::new(spawn(
                    TaskPriority::Low,
                    LineBatchProcessor {
                        worker_idx,

                        global_bytes,
                        chunk_reader,

                        line_batch_rx,
                        output_port: if output_to_linearizer {
                            LineBatchProcessorOutputPort::Linearize {
                                tx: linearizer_inserters.pop().unwrap(),
                            }
                        } else {
                            LineBatchProcessorOutputPort::Direct {
                                tx: morsel_senders.pop().unwrap(),
                                source_token,
                            }
                        },
                        needs_total_row_count,

                        // Only log from the last worker to prevent flooding the output.
                        verbose: verbose && worker_idx == num_pipelines - 1,
                    }
                    .run(),
                ))
            })
            .collect::<Vec<_>>();

        let line_batch_distributor_task_handle = AbortOnDropHandle::new(spawn(
            TaskPriority::Low,
            line_batch_distributor::LineBatchDistributor {
                global_bytes,
                chunk_size,
                n_rows_to_skip,
                reverse: is_negative_slice,
                line_batch_distribute_tx,
            }
            .run(),
        ));

        let finishing_handle = spawn(TaskPriority::Low, async move {
            // Number of rows skipped by the line batch distributor.
            let n_rows_skipped: usize = line_batch_distributor_task_handle.await?;
            // Number of rows processed by the line batch processors.
            let mut n_rows_processed: usize = 0;

            if verbose {
                eprintln!("[NDJsonFileReader]: line batch distributor handle returned");
            }

            for handle in line_batch_processor_handles {
                n_rows_processed = n_rows_processed.saturating_add(handle.await?);
            }

            let total_row_count =
                needs_total_row_count.then_some(n_rows_skipped.saturating_add(n_rows_processed));

            if verbose {
                eprintln!("[NDJsonFileReader]: line batch processor handles returned");
            }

            if let Some(mut row_position_on_end_tx) = row_position_on_end_tx {
                let n = match pre_slice {
                    None => n_rows_skipped.saturating_add(n_rows_processed),

                    Some(Slice::Positive { offset, len }) => n_rows_skipped
                        .saturating_add(n_rows_processed)
                        .min(offset.saturating_add(len)),

                    Some(Slice::Negative { .. }) => {
                        total_row_count.unwrap().saturating_sub(n_rows_skipped)
                    },
                };

                let n = IdxSize::try_from(n)
                    .map_err(|_| polars_err!(bigidx, ctx = "ndjson file", size = n))?;

                _ = row_position_on_end_tx.try_send(n);
            }

            if let Some(tx) = total_row_count_tx {
                let total_row_count = total_row_count.unwrap();

                if verbose {
                    eprintln!(
                        "[NDJsonFileReader]: \
                        send total row count: {total_row_count}"
                    )
                }
                _ = tx.send(total_row_count);
            }

            if let Some(mut n_rows_in_file_tx) = n_rows_in_file_tx {
                let total_row_count = total_row_count.unwrap();

                if verbose {
                    eprintln!("[NDJsonFileReader]: send n_rows_in_file: {total_row_count}");
                }

                let num_rows = total_row_count;
                let num_rows = IdxSize::try_from(num_rows)
                    .map_err(|_| polars_err!(bigidx, ctx = "ndjson file", size = num_rows))?;
                _ = n_rows_in_file_tx.try_send(num_rows);
            }

            if let Some(handle) = opt_post_process_handle {
                handle.await?;
            }

            if verbose {
                eprintln!("[NDJsonFileReader]: returning");
            }

            Ok(())
        });
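        // Row accounting in the finishing task above: `n_rows_skipped` is what the
        // distributor dropped before the slice offset, `n_rows_processed` is what the
        // workers parsed; their sum is the total row count whenever a full count was needed.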

        Ok((output_port.unwrap(), finishing_handle))
    }
}

impl NDJsonFileReader {
    fn try_init_chunk_reader(&self, schema: &SchemaRef) -> PolarsResult<ChunkReader> {
        ChunkReader::try_new(&self.options, schema)
    }

    fn get_bytes_maybe_decompress(&mut self) -> PolarsResult<MemSlice> {
        if self.cached_bytes.is_none() {
            let run_async = self.scan_source.run_async();
            let source = self
                .scan_source
                .as_scan_source_ref()
                .to_memslice_async_assume_latest(run_async)?;

            let memslice = {
                let mut out = vec![];
                maybe_decompress_bytes(&source, &mut out)?;

                // `out` is only populated when the source was compressed; otherwise the
                // original memory slice is reused as-is.
                if out.is_empty() {
                    source
                } else {
                    MemSlice::from_vec(out)
                }
            };

            self.cached_bytes = Some(memslice);
        }

        Ok(self.cached_bytes.clone().unwrap())
    }
}