use std::io::Cursor;
use std::num::NonZeroUsize;

pub use arrow::array::StructArray;
use num_traits::pow::Pow;
use polars_core::POOL;
use polars_core::prelude::*;
use polars_core::utils::accumulate_dataframes_vertical;
use rayon::prelude::*;

use crate::RowIndex;
use crate::mmap::ReaderBytes;
use crate::ndjson::buffer::*;
use crate::predicates::PhysicalIoExpr;
use crate::prelude::*;

const NEWLINE: u8 = b'\n';
const CLOSING_BRACKET: u8 = b'}';

/// Core NDJSON reader: parses a raw byte buffer into a `DataFrame` according
/// to a (possibly inferred) schema.
pub(crate) struct CoreJsonReader<'a> {
    reader_bytes: Option<ReaderBytes<'a>>,
    n_rows: Option<usize>,
    schema: SchemaRef,
    n_threads: Option<usize>,
    sample_size: usize,
    chunk_size: NonZeroUsize,
    low_memory: bool,
    ignore_errors: bool,
    row_index: Option<&'a mut RowIndex>,
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
    projection: Option<Arc<[PlSmallStr]>>,
}
impl<'a> CoreJsonReader<'a> {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        reader_bytes: ReaderBytes<'a>,
        n_rows: Option<usize>,
        schema: Option<SchemaRef>,
        schema_overwrite: Option<&Schema>,
        n_threads: Option<usize>,
        sample_size: usize,
        chunk_size: NonZeroUsize,
        low_memory: bool,
        infer_schema_len: Option<NonZeroUsize>,
        ignore_errors: bool,
        row_index: Option<&'a mut RowIndex>,
        predicate: Option<Arc<dyn PhysicalIoExpr>>,
        projection: Option<Arc<[PlSmallStr]>>,
    ) -> PolarsResult<CoreJsonReader<'a>> {
        // Use the given schema, or infer one from the data.
        let mut schema = match schema {
            Some(schema) => schema,
            None => {
                let bytes: &[u8] = &reader_bytes;
                let mut cursor = Cursor::new(bytes);
                Arc::new(crate::ndjson::infer_schema(&mut cursor, infer_schema_len)?)
            },
        };
        if let Some(overwriting_schema) = schema_overwrite {
            let schema = Arc::make_mut(&mut schema);
            overwrite_schema(schema, overwriting_schema)?;
        }

        Ok(CoreJsonReader {
            reader_bytes: Some(reader_bytes),
            schema,
            sample_size,
            n_rows,
            n_threads,
            chunk_size,
            low_memory,
            ignore_errors,
            row_index,
            predicate,
            projection,
        })
    }

    fn parse_json(&mut self, mut n_threads: usize, bytes: &[u8]) -> PolarsResult<DataFrame> {
        let mut bytes = bytes;
        let mut total_rows = 128;

        // Estimate the total number of rows from sampled line-length statistics,
        // and limit the byte range if we only need `n_rows` rows.
        if let Some((mean, std)) = get_line_stats_json(bytes, self.sample_size) {
            let line_length_upper_bound = mean + 1.1 * std;

            total_rows = (bytes.len() as f32 / (mean - 0.01 * std)) as usize;
            if let Some(n_rows) = self.n_rows {
                total_rows = std::cmp::min(n_rows, total_rows);
                // The guessed upper bound of the number of bytes covering `n_rows` lines.
                let n_bytes = (line_length_upper_bound * (n_rows as f32)) as usize;

                if n_bytes < bytes.len() {
                    if let Some(pos) = next_line_position_naive_json(&bytes[n_bytes..]) {
                        bytes = &bytes[..n_bytes + pos]
                    }
                }
            }
        }

        if total_rows <= 128 {
            n_threads = 1;
        }

        let rows_per_thread = total_rows / n_threads;

        let max_proxy = bytes.len() / n_threads / 2;
        let capacity = if self.low_memory {
            usize::from(self.chunk_size)
        } else {
            std::cmp::min(rows_per_thread, max_proxy)
        };
        let file_chunks = get_file_chunks_json(bytes, n_threads);

        // Parse each chunk in parallel, then apply projection, row index and
        // predicate per chunk before vertically concatenating the results.
        let row_index = self.row_index.as_ref().map(|ri| ri as &RowIndex);
        let (mut dfs, prepredicate_heights) = POOL.install(|| {
            file_chunks
                .into_par_iter()
                .map(|(start_pos, stop_at_nbytes)| {
                    let mut local_df = parse_ndjson(
                        &bytes[start_pos..stop_at_nbytes],
                        Some(capacity),
                        &self.schema,
                        self.ignore_errors,
                    )?;

                    let prepredicate_height = local_df.height() as IdxSize;
                    if let Some(projection) = self.projection.as_deref() {
                        local_df = local_df.select(projection.iter().cloned())?;
                    }

                    if let Some(row_index) = row_index {
                        local_df = local_df
                            .with_row_index(row_index.name.clone(), Some(row_index.offset))?;
                    }

                    if let Some(predicate) = &self.predicate {
                        let s = predicate.evaluate_io(&local_df)?;
                        let mask = s.bool()?;
                        local_df = local_df.filter(mask)?;
                    }

                    Ok((local_df, prepredicate_height))
                })
                .collect::<PolarsResult<(Vec<_>, Vec<_>)>>()
        })?;

        if let Some(ref mut row_index) = self.row_index {
            update_row_counts3(&mut dfs, &prepredicate_heights, 0);
            row_index.offset += prepredicate_heights.iter().copied().sum::<IdxSize>();
        }

        accumulate_dataframes_vertical(dfs)
    }

    pub fn as_df(&mut self) -> PolarsResult<DataFrame> {
        let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());

        let reader_bytes = self.reader_bytes.take().unwrap();

        let mut df = self.parse_json(n_threads, &reader_bytes)?;

        // If multi-threaded, `n_rows` was determined probabilistically,
        // so slice to the correct number of rows where possible.
        if let Some(n_rows) = self.n_rows {
            if n_rows < df.height() {
                df = df.slice(0, n_rows)
            }
        }
        Ok(df)
    }
}

/// Parse a single NDJSON line into the per-column buffers.
/// Returns the number of bytes in the line.
#[inline(always)]
fn parse_impl(
    bytes: &[u8],
    buffers: &mut PlIndexMap<BufferKey, Buffer>,
    scratch: &mut Scratch,
    ignore_errors: bool,
) -> PolarsResult<usize> {
    scratch.json.clear();
    scratch.json.extend_from_slice(bytes);
    let n = scratch.json.len();
    let value = simd_json::to_borrowed_value_with_buffers(&mut scratch.json, &mut scratch.buffers)
        .map_err(|e| polars_err!(ComputeError: "error parsing line: {}", e))?;
    match value {
        simd_json::BorrowedValue::Object(value) => {
            buffers.iter_mut().try_for_each(|(s, inner)| {
                match s.0.map_lookup(&value) {
                    Some(v) => inner.add(v)?,
                    None => inner.add_null(),
                }
                PolarsResult::Ok(())
            })?;
        },
        _ if ignore_errors => {
            buffers.iter_mut().for_each(|(_, inner)| inner.add_null());
        },
        v => {
            polars_bail!(ComputeError: "NDJSON line expected to contain JSON object: {v}");
        },
    };
    Ok(n)
}

/// Reusable scratch space for `simd_json` parsing, to avoid per-line allocations.
#[derive(Default)]
struct Scratch {
    json: Vec<u8>,
    buffers: simd_json::Buffers,
}

pub fn json_lines(bytes: &[u8]) -> impl Iterator<Item = &[u8]> {
    // This previously used `serde_json`'s `RawValue` to deserialize chunks without really
    // deserializing them. However, that convenience comes at a cost: serde_json allocates,
    // parses, and does UTF-8 validation, all things we don't need since we use simd_json for
    // them. Also, `serde_json::StreamDeserializer` has a more ambitious goal: it wants to parse
    // potentially *non-delimited* sequences of JSON values, while we know our values are
    // line-delimited. It turns out custom splitting is very easy and gives a nice performance
    // boost.
    bytes
        .split(|&byte| byte == b'\n')
        .filter(|bytes| is_json_line(bytes))
}

/// Returns `true` if the line contains anything other than whitespace.
#[inline]
pub fn is_json_line(bytes: &[u8]) -> bool {
    bytes
        .iter()
        .any(|byte| !matches!(*byte, b' ' | b'\t' | b'\r'))
}
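
// A minimal sketch (added for illustration, not part of the original file)
// of the intended splitting behavior: blank and whitespace-only lines are
// filtered out, while `\r` is left for the parser to handle.
#[cfg(test)]
mod json_lines_tests {
    use super::*;

    #[test]
    fn splits_and_skips_blank_lines() {
        let bytes = b"{\"a\":1}\n\n   \t\n{\"a\":2}\n";
        let lines: Vec<&[u8]> = json_lines(bytes).collect();
        assert_eq!(lines, vec![&b"{\"a\":1}"[..], &b"{\"a\":2}"[..]]);
    }
}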

fn parse_lines(
    bytes: &[u8],
    buffers: &mut PlIndexMap<BufferKey, Buffer>,
    ignore_errors: bool,
) -> PolarsResult<()> {
    let mut scratch = Scratch::default();

    let iter = json_lines(bytes);
    for bytes in iter {
        parse_impl(bytes, buffers, &mut scratch, ignore_errors)?;
    }
    Ok(())
}

/// Parse a buffer of NDJSON lines into a `DataFrame` with the given schema.
pub fn parse_ndjson(
    bytes: &[u8],
    n_rows_hint: Option<usize>,
    schema: &Schema,
    ignore_errors: bool,
) -> PolarsResult<DataFrame> {
    let capacity = n_rows_hint.unwrap_or_else(|| estimate_n_lines_in_chunk(bytes));

    let mut buffers = init_buffers(schema, capacity, ignore_errors)?;
    parse_lines(bytes, &mut buffers, ignore_errors)?;

    DataFrame::new_infer_height(
        buffers
            .into_values()
            .map(|buf| Ok(buf.into_series()?.into_column()))
            .collect::<PolarsResult<_>>()
            .map_err(|e| match e {
                // Nested types raise SchemaMismatch instead of ComputeError; map it back here
                // to stay consistent.
                PolarsError::ComputeError(..) => e,
                PolarsError::SchemaMismatch(e) => PolarsError::ComputeError(e),
                e => e,
            })?,
    )
}
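
// A hedged usage sketch (added, not part of the original file): parse two
// NDJSON lines against an explicit single-column schema. Assumes
// `Schema::from_iter` over `Field`s and `Field::new(PlSmallStr, DataType)`,
// as used elsewhere in this crate.
#[cfg(test)]
mod parse_ndjson_tests {
    use super::*;

    #[test]
    fn parses_simple_lines() -> PolarsResult<()> {
        let schema = Schema::from_iter([Field::new("a".into(), DataType::Int64)]);
        let df = parse_ndjson(b"{\"a\": 1}\n{\"a\": 2}\n", None, &schema, false)?;
        assert_eq!(df.height(), 2);
        Ok(())
    }
}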

/// Estimate the number of lines in the file from sampled line-length statistics,
/// falling back to the cheaper chunk-based estimate if sampling fails.
pub fn estimate_n_lines_in_file(file_bytes: &[u8], sample_size: usize) -> usize {
    if let Some((mean, std)) = get_line_stats_json(file_bytes, sample_size) {
        (file_bytes.len() as f32 / (mean - 0.01 * std)) as usize
    } else {
        estimate_n_lines_in_chunk(file_bytes)
    }
}

/// Total length divided by the max length of the first and last non-empty lines. This is
/// intended to be cheaper than `estimate_n_lines_in_file`.
pub fn estimate_n_lines_in_chunk(chunk: &[u8]) -> usize {
    chunk
        .split(|&c| c == b'\n')
        .find(|x| !x.is_empty())
        .map_or(1, |x| {
            chunk.len().div_ceil(
                x.len().max(
                    chunk
                        .rsplit(|&c| c == b'\n')
                        .find(|x| !x.is_empty())
                        .unwrap()
                        .len(),
                ),
            )
        })
}
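
// A worked example (added for illustration, not in the original file): the
// estimate is total length divided by the max of the first/last non-empty
// line lengths, rounded up, so newline bytes push it slightly above the
// true count.
#[cfg(test)]
mod estimate_tests {
    use super::*;

    #[test]
    fn rough_upper_estimate() {
        // 7 + 1 + 7 + 1 = 16 bytes; div_ceil(16, 7) == 3 for two 7-byte lines.
        assert_eq!(estimate_n_lines_in_chunk(b"{\"a\":1}\n{\"b\":2}\n"), 3);
        // An empty chunk maps to the default of 1.
        assert_eq!(estimate_n_lines_in_chunk(b""), 1);
    }
}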

/// Find the nearest next line position.
/// Does not check for newline characters embedded in string fields.
/// This just looks for `}\n`.
pub(crate) fn next_line_position_naive_json(input: &[u8]) -> Option<usize> {
    let pos = memchr::memchr(NEWLINE, input)?;
    if pos == 0 {
        return Some(1);
    }

    let is_closing_bracket = input.get(pos - 1) == Some(&CLOSING_BRACKET);
    if is_closing_bracket {
        Some(pos + 1)
    } else {
        None
    }
}
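
// Behavior sketch (added, not in the original file): a position is only
// reported when the newline directly follows a closing bracket, so a split
// landing mid-line cannot produce a bogus boundary.
#[cfg(test)]
mod next_line_position_tests {
    use super::*;

    #[test]
    fn only_accepts_bracket_newline() {
        // `}` at index 6, `\n` at index 7 -> next line starts at 8.
        assert_eq!(next_line_position_naive_json(b"{\"a\":1}\n{\"b\":2}"), Some(8));
        // A newline not preceded by `}` is not a line boundary.
        assert_eq!(next_line_position_naive_json(b"\"a\nb\""), None);
        // A leading newline is treated as a boundary right away.
        assert_eq!(next_line_position_naive_json(b"\n{\"a\":1}"), Some(1));
    }
}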

/// Get the mean and standard deviation of line length in bytes.
pub(crate) fn get_line_stats_json(bytes: &[u8], n_lines: usize) -> Option<(f32, f32)> {
    let mut lengths = Vec::with_capacity(n_lines);

    let mut bytes_trunc;
    let n_lines_per_iter = n_lines / 2;

    let mut n_read = 0;

    let bytes_len = bytes.len();

    // Sample from the start of the file and from 75% of the way in.
    for offset in [0, (bytes_len as f32 * 0.75) as usize] {
        bytes_trunc = &bytes[offset..];
        let pos = next_line_position_naive_json(bytes_trunc)?;
        if pos >= bytes_len {
            return None;
        }
        bytes_trunc = &bytes_trunc[pos + 1..];

        for _ in offset..(offset + n_lines_per_iter) {
            let pos = next_line_position_naive_json(bytes_trunc);
            if let Some(pos) = pos {
                lengths.push(pos);
                let next_bytes = &bytes_trunc[pos..];
                if next_bytes.is_empty() {
                    return None;
                }
                bytes_trunc = next_bytes;
                n_read += pos;
            } else {
                break;
            }
        }
    }

    let n_samples = lengths.len();
    let mean = (n_read as f32) / (n_samples as f32);
    let mut std = 0.0;
    for &len in lengths.iter() {
        std += (len as f32 - mean).pow(2.0);
    }
    std = (std / n_samples as f32).sqrt();
    Some((mean, std))
}

/// Split the buffer into roughly `n_threads` chunks whose boundaries fall on
/// `}\n` line boundaries; the final chunk runs to the end of the buffer.
pub(crate) fn get_file_chunks_json(bytes: &[u8], n_threads: usize) -> Vec<(usize, usize)> {
    let mut last_pos = 0;
    let total_len = bytes.len();
    let chunk_size = total_len / n_threads;
    let mut offsets = Vec::with_capacity(n_threads);
    for _ in 0..n_threads {
        let search_pos = last_pos + chunk_size;

        if search_pos >= bytes.len() {
            break;
        }

        let end_pos = match next_line_position_naive_json(&bytes[search_pos..]) {
            Some(pos) => search_pos + pos,
            None => {
                break;
            },
        };
        offsets.push((last_pos, end_pos));
        last_pos = end_pos;
    }
    offsets.push((last_pos, total_len));
    offsets
}
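
// A small check (added for illustration, not in the original file): chunk
// boundaries land just after a `}\n`, and the final chunk runs to the end of
// the buffer, so the chunks cover the input exactly once.
#[cfg(test)]
mod file_chunks_tests {
    use super::*;

    #[test]
    fn chunks_split_on_line_boundaries() {
        // Four 8-byte lines (32 bytes total) split across two threads.
        let bytes = b"{\"a\":1}\n{\"a\":2}\n{\"a\":3}\n{\"a\":4}\n";
        let chunks = get_file_chunks_json(bytes, 2);
        assert_eq!(chunks, vec![(0, 24), (24, 32)]);
        // The interior boundary at 24 sits just after a `}\n` pair.
        assert_eq!(&bytes[22..24], b"}\n");
    }
}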