GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/ndjson/core.rs

use std::fs::File;
use std::io::Cursor;
use std::num::NonZeroUsize;
use std::path::PathBuf;

pub use arrow::array::StructArray;
use num_traits::pow::Pow;
use polars_core::POOL;
use polars_core::prelude::*;
use polars_core::utils::accumulate_dataframes_vertical;
use rayon::prelude::*;

use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::ndjson::buffer::*;
use crate::predicates::PhysicalIoExpr;
use crate::prelude::*;
use crate::{RowIndex, SerReader};
const NEWLINE: u8 = b'\n';
const CLOSING_BRACKET: u8 = b'}';
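
/// Reads newline-delimited JSON (NDJSON) into a `DataFrame`.
///
/// A minimal usage sketch (illustrative, not a doctest; assumes a local
/// `data.ndjson` file and the `SerReader` trait in scope):
///
/// ```ignore
/// let df = JsonLineReader::from_path("data.ndjson")?
///     .with_n_rows(Some(1_000))
///     .infer_schema_len(NonZeroUsize::new(100))
///     .finish()?;
/// ```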
#[must_use]
pub struct JsonLineReader<'a, R>
where
    R: MmapBytesReader,
{
    reader: R,
    rechunk: bool,
    n_rows: Option<usize>,
    n_threads: Option<usize>,
    infer_schema_len: Option<NonZeroUsize>,
    chunk_size: NonZeroUsize,
    schema: Option<SchemaRef>,
    schema_overwrite: Option<&'a Schema>,
    path: Option<PathBuf>,
    low_memory: bool,
    ignore_errors: bool,
    row_index: Option<&'a mut RowIndex>,
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
    projection: Option<Arc<[PlSmallStr]>>,
}

impl<'a, R> JsonLineReader<'a, R>
where
    R: 'a + MmapBytesReader,
{
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.n_rows = num_rows;
        self
    }
    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
        self.schema = Some(schema);
        self
    }

    pub fn with_schema_overwrite(mut self, schema: &'a Schema) -> Self {
        self.schema_overwrite = Some(schema);
        self
    }

    pub fn with_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    pub fn with_predicate(mut self, predicate: Option<Arc<dyn PhysicalIoExpr>>) -> Self {
        self.predicate = predicate;
        self
    }

    pub fn with_projection(mut self, projection: Option<Arc<[PlSmallStr]>>) -> Self {
        self.projection = projection;
        self
    }

    pub fn with_row_index(mut self, row_index: Option<&'a mut RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }

    pub fn infer_schema_len(mut self, infer_schema_len: Option<NonZeroUsize>) -> Self {
        self.infer_schema_len = infer_schema_len;
        self
    }

    pub fn with_n_threads(mut self, n: Option<usize>) -> Self {
        self.n_threads = n;
        self
    }

    pub fn with_path<P: Into<PathBuf>>(mut self, path: Option<P>) -> Self {
        self.path = path.map(|p| p.into());
        self
    }
    /// Sets the chunk size used by the parser. This influences performance.
    pub fn with_chunk_size(mut self, chunk_size: Option<NonZeroUsize>) -> Self {
        if let Some(chunk_size) = chunk_size {
            self.chunk_size = chunk_size;
        };

        self
    }
    /// Reduce memory consumption at the expense of performance.
    pub fn low_memory(mut self, toggle: bool) -> Self {
        self.low_memory = toggle;
        self
    }

    /// Set values as `Null` if parsing fails because of schema mismatches.
    pub fn with_ignore_errors(mut self, ignore_errors: bool) -> Self {
        self.ignore_errors = ignore_errors;
        self
    }

    pub fn count(mut self) -> PolarsResult<usize> {
        let reader_bytes = get_reader_bytes(&mut self.reader)?;
        let json_reader = CoreJsonReader::new(
            reader_bytes,
            self.n_rows,
            self.schema,
            self.schema_overwrite,
            self.n_threads,
            1024, // sample size
            self.chunk_size,
            self.low_memory,
            self.infer_schema_len,
            self.ignore_errors,
            self.row_index,
            self.predicate,
            self.projection,
        )?;

        json_reader.count()
    }
}

impl JsonLineReader<'_, File> {
    /// This is the recommended way to create a JSON line reader, as it allows for the fastest parsing.
    pub fn from_path<P: Into<PathBuf>>(path: P) -> PolarsResult<Self> {
        let path = crate::resolve_homedir(&path.into());
        let f = polars_utils::open_file(&path)?;
        Ok(Self::new(f).with_path(Some(path)))
    }
}
impl<R> SerReader<R> for JsonLineReader<'_, R>
where
    R: MmapBytesReader,
{
    /// Create a new JsonLineReader from a file or stream.
    fn new(reader: R) -> Self {
        JsonLineReader {
            reader,
            rechunk: true,
            n_rows: None,
            n_threads: None,
            infer_schema_len: Some(NonZeroUsize::new(100).unwrap()),
            schema: None,
            schema_overwrite: None,
            path: None,
            chunk_size: NonZeroUsize::new(1 << 18).unwrap(),
            low_memory: false,
            ignore_errors: false,
            row_index: None,
            predicate: None,
            projection: None,
        }
    }
    fn finish(mut self) -> PolarsResult<DataFrame> {
        let rechunk = self.rechunk;
        let reader_bytes = get_reader_bytes(&mut self.reader)?;
        let mut json_reader = CoreJsonReader::new(
            reader_bytes,
            self.n_rows,
            self.schema,
            self.schema_overwrite,
            self.n_threads,
            1024, // sample size
            self.chunk_size,
            self.low_memory,
            self.infer_schema_len,
            self.ignore_errors,
            self.row_index,
            self.predicate,
            self.projection,
        )?;

        let mut df: DataFrame = json_reader.as_df()?;
        if rechunk && df.first_col_n_chunks() > 1 {
            df.as_single_chunk_par();
        }
        Ok(df)
    }
}

pub(crate) struct CoreJsonReader<'a> {
    reader_bytes: Option<ReaderBytes<'a>>,
    n_rows: Option<usize>,
    schema: SchemaRef,
    n_threads: Option<usize>,
    sample_size: usize,
    chunk_size: NonZeroUsize,
    low_memory: bool,
    ignore_errors: bool,
    row_index: Option<&'a mut RowIndex>,
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
    projection: Option<Arc<[PlSmallStr]>>,
}
impl<'a> CoreJsonReader<'a> {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        reader_bytes: ReaderBytes<'a>,
        n_rows: Option<usize>,
        schema: Option<SchemaRef>,
        schema_overwrite: Option<&Schema>,
        n_threads: Option<usize>,
        sample_size: usize,
        chunk_size: NonZeroUsize,
        low_memory: bool,
        infer_schema_len: Option<NonZeroUsize>,
        ignore_errors: bool,
        row_index: Option<&'a mut RowIndex>,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
        projection: Option<Arc<[PlSmallStr]>>,
    ) -> PolarsResult<CoreJsonReader<'a>> {
        let mut schema = match schema {
            Some(schema) => schema,
            None => {
                let bytes: &[u8] = &reader_bytes;
                let mut cursor = Cursor::new(bytes);
                Arc::new(crate::ndjson::infer_schema(&mut cursor, infer_schema_len)?)
            },
        };
        if let Some(overwriting_schema) = schema_overwrite {
            let schema = Arc::make_mut(&mut schema);
            overwrite_schema(schema, overwriting_schema)?;
        }

        Ok(CoreJsonReader {
            reader_bytes: Some(reader_bytes),
            schema,
            sample_size,
            n_rows,
            n_threads,
            chunk_size,
            low_memory,
            ignore_errors,
            row_index,
            predicate,
            projection,
        })
    }

    fn count(mut self) -> PolarsResult<usize> {
        let bytes = self.reader_bytes.take().unwrap();
        Ok(super::count_rows_par(&bytes, self.n_threads))
    }
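
    /// Parses `bytes` as NDJSON into a single `DataFrame`.
    ///
    /// Estimates the total row count from sampled line statistics, splits the
    /// input into roughly equal chunks on `}\n` boundaries, and parses the
    /// chunks in parallel on the rayon thread pool, applying projection, row
    /// index, and predicate per chunk before stitching the results back
    /// together vertically.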
    fn parse_json(&mut self, mut n_threads: usize, bytes: &[u8]) -> PolarsResult<DataFrame> {
        let mut bytes = bytes;
        let mut total_rows = 128;

        if let Some((mean, std)) = get_line_stats_json(bytes, self.sample_size) {
            let line_length_upper_bound = mean + 1.1 * std;

            total_rows = (bytes.len() as f32 / (mean - 0.01 * std)) as usize;
            if let Some(n_rows) = self.n_rows {
                total_rows = std::cmp::min(n_rows, total_rows);
                // the guessed upper bound of the number of bytes in the file
                let n_bytes = (line_length_upper_bound * (n_rows as f32)) as usize;

                if n_bytes < bytes.len() {
                    if let Some(pos) = next_line_position_naive_json(&bytes[n_bytes..]) {
                        bytes = &bytes[..n_bytes + pos]
                    }
                }
            }
        }

        if total_rows <= 128 {
            n_threads = 1;
        }

        let rows_per_thread = total_rows / n_threads;

        let max_proxy = bytes.len() / n_threads / 2;
        let capacity = if self.low_memory {
            usize::from(self.chunk_size)
        } else {
            std::cmp::min(rows_per_thread, max_proxy)
        };
        let file_chunks = get_file_chunks_json(bytes, n_threads);

        let row_index = self.row_index.as_ref().map(|ri| ri as &RowIndex);
        let (mut dfs, prepredicate_heights) = POOL.install(|| {
            file_chunks
                .into_par_iter()
                .map(|(start_pos, stop_at_nbytes)| {
                    let mut local_df = parse_ndjson(
                        &bytes[start_pos..stop_at_nbytes],
                        Some(capacity),
                        &self.schema,
                        self.ignore_errors,
                    )?;

                    let prepredicate_height = local_df.height() as IdxSize;
                    if let Some(projection) = self.projection.as_deref() {
                        local_df = local_df.select(projection.iter().cloned())?;
                    }

                    if let Some(row_index) = row_index {
                        local_df = local_df
                            .with_row_index(row_index.name.clone(), Some(row_index.offset))?;
                    }

                    if let Some(predicate) = &self.predicate {
                        let s = predicate.evaluate_io(&local_df)?;
                        let mask = s.bool()?;
                        local_df = local_df.filter(mask)?;
                    }

                    Ok((local_df, prepredicate_height))
                })
                .collect::<PolarsResult<(Vec<_>, Vec<_>)>>()
        })?;

        if let Some(ref mut row_index) = self.row_index {
            update_row_counts3(&mut dfs, &prepredicate_heights, 0);
            row_index.offset += prepredicate_heights.iter().copied().sum::<IdxSize>();
        }

        accumulate_dataframes_vertical(dfs)
    }

    pub fn as_df(&mut self) -> PolarsResult<DataFrame> {
        let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());

        let reader_bytes = self.reader_bytes.take().unwrap();

        let mut df = self.parse_json(n_threads, &reader_bytes)?;

        // If multi-threaded, n_rows was determined probabilistically.
        // Slice to the correct number of rows if possible.
        if let Some(n_rows) = self.n_rows {
            if n_rows < df.height() {
                df = df.slice(0, n_rows)
            }
        }
        Ok(df)
    }
}
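
/// Parses a single NDJSON line into the per-column `buffers`, returning the
/// number of bytes in the line. Lines that are not JSON objects push a null
/// into every buffer; fields missing from an object are likewise filled with
/// nulls.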
#[inline(always)]
fn parse_impl(
    bytes: &[u8],
    buffers: &mut PlIndexMap<BufferKey, Buffer>,
    scratch: &mut Scratch,
) -> PolarsResult<usize> {
    scratch.json.clear();
    scratch.json.extend_from_slice(bytes);
    let n = scratch.json.len();
    let value = simd_json::to_borrowed_value_with_buffers(&mut scratch.json, &mut scratch.buffers)
        .map_err(|e| polars_err!(ComputeError: "error parsing line: {}", e))?;
    match value {
        simd_json::BorrowedValue::Object(value) => {
            buffers.iter_mut().try_for_each(|(s, inner)| {
                match s.0.map_lookup(&value) {
                    Some(v) => inner.add(v)?,
                    None => inner.add_null(),
                }
                PolarsResult::Ok(())
            })?;
        },
        _ => {
            buffers.iter_mut().for_each(|(_, inner)| inner.add_null());
        },
    };
    Ok(n)
}
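
/// Reusable scratch space for `simd_json`, so per-line parsing does not
/// reallocate its working buffers on every call.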
#[derive(Default)]
struct Scratch {
    json: Vec<u8>,
    buffers: simd_json::Buffers,
}
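
/// Splits `bytes` on newlines, skipping chunks that are empty or contain only
/// whitespace.
///
/// A minimal sketch of the expected behavior (illustrative, not a doctest):
///
/// ```ignore
/// let input = b"{\"a\": 1}\n\n  \n{\"a\": 2}\n";
/// assert_eq!(json_lines(input).count(), 2); // blank lines are filtered out
/// ```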
pub fn json_lines(bytes: &[u8]) -> impl Iterator<Item = &[u8]> {
    // This previously used `serde_json`'s `RawValue` to deserialize chunks without really deserializing them.
    // However, this convenience comes at a cost. serde_json allocates and parses and does UTF-8 validation, all
    // things we don't need since we use simd_json for them. Also, `serde_json::StreamDeserializer` has a more
    // ambitious goal: it wants to parse potentially *non-delimited* sequences of JSON values, while we know
    // our values are line-delimited. Turns out, custom splitting is very easy, and gives a very nice performance boost.
    bytes.split(|&byte| byte == b'\n').filter(|&bytes| {
        bytes
            .iter()
            .any(|&byte| !matches!(byte, b' ' | b'\t' | b'\r'))
    })
}

fn parse_lines(bytes: &[u8], buffers: &mut PlIndexMap<BufferKey, Buffer>) -> PolarsResult<()> {
    let mut scratch = Scratch::default();

    let iter = json_lines(bytes);
    for bytes in iter {
        parse_impl(bytes, buffers, &mut scratch)?;
    }
    Ok(())
}
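
/// Parses a chunk of NDJSON bytes into a `DataFrame` with the columns of
/// `schema`.
///
/// Hypothetical usage sketch (not a doctest; the `Field`/`Schema`
/// construction shown here is illustrative):
///
/// ```ignore
/// let schema = Schema::from_iter([Field::new("a".into(), DataType::Int64)]);
/// let df = parse_ndjson(b"{\"a\": 1}\n{\"a\": 2}\n", None, &schema, false)?;
/// assert_eq!(df.height(), 2);
/// ```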
pub fn parse_ndjson(
    bytes: &[u8],
    n_rows_hint: Option<usize>,
    schema: &Schema,
    ignore_errors: bool,
) -> PolarsResult<DataFrame> {
    let capacity = n_rows_hint.unwrap_or_else(|| estimate_n_lines_in_chunk(bytes));

    let mut buffers = init_buffers(schema, capacity, ignore_errors)?;
    parse_lines(bytes, &mut buffers)?;

    DataFrame::new(
        buffers
            .into_values()
            .map(|buf| Ok(buf.into_series()?.into_column()))
            .collect::<PolarsResult<_>>()
            .map_err(|e| match e {
                // Nested types raise SchemaMismatch instead of ComputeError; map it back
                // here to be consistent.
                PolarsError::ComputeError(..) => e,
                PolarsError::SchemaMismatch(e) => PolarsError::ComputeError(e),
                e => e,
            })?,
    )
}
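
/// Estimates the number of lines in `file_bytes` from sampled line-length
/// statistics, falling back to `estimate_n_lines_in_chunk` when no statistics
/// can be gathered.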
pub fn estimate_n_lines_in_file(file_bytes: &[u8], sample_size: usize) -> usize {
    if let Some((mean, std)) = get_line_stats_json(file_bytes, sample_size) {
        (file_bytes.len() as f32 / (mean - 0.01 * std)) as usize
    } else {
        estimate_n_lines_in_chunk(file_bytes)
    }
}

/// Total len divided by max len of first and last non-empty lines. This is intended to be cheaper
/// than `estimate_n_lines_in_file`.
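///
/// Rough illustration (hypothetical input, not a doctest): for
/// `b"{\"a\":1}\n{\"a\":22}\n"` the first non-empty line is 7 bytes and the
/// last is 8, so the estimate is `17usize.div_ceil(8) == 3`.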
pub fn estimate_n_lines_in_chunk(chunk: &[u8]) -> usize {
    chunk
        .split(|&c| c == b'\n')
        .find(|x| !x.is_empty())
        .map_or(1, |x| {
            chunk.len().div_ceil(
                x.len().max(
                    chunk
                        .rsplit(|&c| c == b'\n')
                        .find(|x| !x.is_empty())
                        .unwrap()
                        .len(),
                ),
            )
        })
}

/// Find the nearest next line position.
/// Does not check for newline characters embedded in String fields.
/// This just looks for `}\n`.
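///
/// Sketch of the expected behavior (illustrative, not a doctest):
///
/// ```ignore
/// assert_eq!(next_line_position_naive_json(b"{\"a\":1}\n{\"b\":2}"), Some(8));
/// assert_eq!(next_line_position_naive_json(b"no json here\nmore"), None);
/// ```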
pub(crate) fn next_line_position_naive_json(input: &[u8]) -> Option<usize> {
    let pos = memchr::memchr(NEWLINE, input)?;
    if pos == 0 {
        return Some(1);
    }

    let is_closing_bracket = input.get(pos - 1) == Some(&CLOSING_BRACKET);
    if is_closing_bracket {
        Some(pos + 1)
    } else {
        None
    }
}

/// Get the mean and standard deviation of line lengths, in bytes.
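///
/// Samples up to `n_lines / 2` lines from the start of the input and another
/// `n_lines / 2` from 75% of the way in; returns `None` if the input is too
/// short to sample.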
pub(crate) fn get_line_stats_json(bytes: &[u8], n_lines: usize) -> Option<(f32, f32)> {
    let mut lengths = Vec::with_capacity(n_lines);

    let mut bytes_trunc;
    let n_lines_per_iter = n_lines / 2;

    let mut n_read = 0;

    let bytes_len = bytes.len();

    // sample from the start and from 75% of the way into the file
    for offset in [0, (bytes_len as f32 * 0.75) as usize] {
        bytes_trunc = &bytes[offset..];
        let pos = next_line_position_naive_json(bytes_trunc)?;
        if pos >= bytes_len {
            return None;
        }
        bytes_trunc = &bytes_trunc[pos + 1..];

        for _ in offset..(offset + n_lines_per_iter) {
            let pos = next_line_position_naive_json(bytes_trunc);
            if let Some(pos) = pos {
                lengths.push(pos);
                let next_bytes = &bytes_trunc[pos..];
                if next_bytes.is_empty() {
                    return None;
                }
                bytes_trunc = next_bytes;
                n_read += pos;
            } else {
                break;
            }
        }
    }

    let n_samples = lengths.len();
    let mean = (n_read as f32) / (n_samples as f32);
    let mut std = 0.0;
    for &len in lengths.iter() {
        std += (len as f32 - mean).pow(2.0)
    }
    std = (std / n_samples as f32).sqrt();
    Some((mean, std))
}
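
/// Splits `bytes` into roughly `n_threads` chunks for parallel parsing,
/// snapping every boundary to the next `}\n` sequence (naively, per
/// `next_line_position_naive_json`) so that no chunk starts mid-line. The
/// final chunk always runs to the end of the input.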
pub(crate) fn get_file_chunks_json(bytes: &[u8], n_threads: usize) -> Vec<(usize, usize)> {
    let mut last_pos = 0;
    let total_len = bytes.len();
    let chunk_size = total_len / n_threads;
    let mut offsets = Vec::with_capacity(n_threads);
    for _ in 0..n_threads {
        let search_pos = last_pos + chunk_size;

        if search_pos >= bytes.len() {
            break;
        }

        let end_pos = match next_line_position_naive_json(&bytes[search_pos..]) {
            Some(pos) => search_pos + pos,
            None => {
                break;
            },
        };
        offsets.push((last_pos, end_pos));
        last_pos = end_pos;
    }
    offsets.push((last_pos, total_len));
    offsets
}