Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-json/src/ndjson/deserialize.rs
6939 views
1
use arrow::array::Array;
2
use arrow::compute::concatenate::concatenate_unchecked;
3
use simd_json::BorrowedValue;
4
5
use super::*;
6
7
/// Deserializes an iterator of rows into an [`Array`][Array] of [`DataType`].
8
///
9
/// [Array]: arrow::array::Array
10
///
11
/// # Implementation
12
/// This function is CPU-bounded.
13
/// This function is guaranteed to return an array of length equal to the length
14
/// # Errors
15
/// This function errors iff any of the rows is not a valid JSON (i.e. the format is not valid NDJSON).
16
pub fn deserialize_iter<'a>(
17
rows: impl Iterator<Item = &'a str>,
18
dtype: ArrowDataType,
19
buf_size: usize,
20
count: usize,
21
allow_extra_fields_in_struct: bool,
22
) -> PolarsResult<ArrayRef> {
23
let mut arr: Vec<Box<dyn Array>> = Vec::new();
24
let mut buf = Vec::with_capacity(std::cmp::min(buf_size + count + 2, u32::MAX as usize));
25
buf.push(b'[');
26
27
fn _deserializer(
28
s: &mut [u8],
29
dtype: ArrowDataType,
30
allow_extra_fields_in_struct: bool,
31
) -> PolarsResult<Box<dyn Array>> {
32
let out = simd_json::to_borrowed_value(s)
33
.map_err(|e| PolarsError::ComputeError(format!("json parsing error: '{e}'").into()))?;
34
if let BorrowedValue::Array(rows) = out {
35
super::super::json::deserialize::_deserialize(
36
&rows,
37
dtype,
38
allow_extra_fields_in_struct,
39
)
40
} else {
41
unreachable!()
42
}
43
}
44
let mut row_iter = rows.peekable();
45
46
while let Some(row) = row_iter.next() {
47
buf.extend_from_slice(row.as_bytes());
48
buf.push(b',');
49
50
let next_row_length = row_iter.peek().map(|row| row.len()).unwrap_or(0);
51
if buf.len() + next_row_length >= u32::MAX as usize {
52
let _ = buf.pop();
53
buf.push(b']');
54
arr.push(_deserializer(
55
&mut buf,
56
dtype.clone(),
57
allow_extra_fields_in_struct,
58
)?);
59
buf.clear();
60
buf.push(b'[');
61
}
62
}
63
if buf.len() > 1 {
64
let _ = buf.pop();
65
}
66
buf.push(b']');
67
68
if arr.is_empty() {
69
_deserializer(&mut buf, dtype, allow_extra_fields_in_struct)
70
} else {
71
arr.push(_deserializer(
72
&mut buf,
73
dtype,
74
allow_extra_fields_in_struct,
75
)?);
76
concatenate_unchecked(&arr)
77
}
78
}
79
80