Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-json/src/json/deserialize.rs
6939 views
1
use std::borrow::Borrow;
2
use std::fmt::Write;
3
4
use arrow::array::*;
5
use arrow::bitmap::BitmapBuilder;
6
use arrow::datatypes::{ArrowDataType, IntervalUnit};
7
use arrow::offset::{Offset, Offsets};
8
use arrow::temporal_conversions;
9
use arrow::types::NativeType;
10
use num_traits::NumCast;
11
use simd_json::{BorrowedValue, StaticNode};
12
13
use super::*;
14
15
/// A JSON `null` node. `deserialize_struct` pushes a reference to it for every field slot
/// that has no value in a row (missing key, or the whole row is `null`).
const JSON_NULL_VALUE: BorrowedValue = BorrowedValue::Static(StaticNode::Null);
16
17
fn deserialize_boolean_into<'a, A: Borrow<BorrowedValue<'a>>>(
18
target: &mut MutableBooleanArray,
19
rows: &[A],
20
) -> PolarsResult<()> {
21
let mut err_idx = rows.len();
22
let iter = rows.iter().enumerate().map(|(i, row)| match row.borrow() {
23
BorrowedValue::Static(StaticNode::Bool(v)) => Some(v),
24
BorrowedValue::Static(StaticNode::Null) => None,
25
_ => {
26
err_idx = if err_idx == rows.len() { i } else { err_idx };
27
None
28
},
29
});
30
target.extend_trusted_len(iter);
31
check_err_idx(rows, err_idx, "boolean")
32
}
33
34
fn deserialize_primitive_into<'a, T: NativeType + NumCast, A: Borrow<BorrowedValue<'a>>>(
35
target: &mut MutablePrimitiveArray<T>,
36
rows: &[A],
37
) -> PolarsResult<()> {
38
let mut err_idx = rows.len();
39
let iter = rows.iter().enumerate().map(|(i, row)| match row.borrow() {
40
BorrowedValue::Static(StaticNode::I64(v)) => T::from(*v),
41
BorrowedValue::Static(StaticNode::U64(v)) => T::from(*v),
42
BorrowedValue::Static(StaticNode::F64(v)) => T::from(*v),
43
BorrowedValue::Static(StaticNode::Bool(v)) => T::from(*v as u8),
44
BorrowedValue::Static(StaticNode::Null) => None,
45
_ => {
46
err_idx = if err_idx == rows.len() { i } else { err_idx };
47
None
48
},
49
});
50
target.extend_trusted_len(iter);
51
check_err_idx(rows, err_idx, "numeric")
52
}
53
54
fn deserialize_binary<'a, A: Borrow<BorrowedValue<'a>>>(
55
rows: &[A],
56
) -> PolarsResult<BinaryArray<i64>> {
57
let mut err_idx = rows.len();
58
let iter = rows.iter().enumerate().map(|(i, row)| match row.borrow() {
59
BorrowedValue::String(v) => Some(v.as_bytes()),
60
BorrowedValue::Static(StaticNode::Null) => None,
61
_ => {
62
err_idx = if err_idx == rows.len() { i } else { err_idx };
63
None
64
},
65
});
66
let out = BinaryArray::from_trusted_len_iter(iter);
67
check_err_idx(rows, err_idx, "binary")?;
68
Ok(out)
69
}
70
71
fn deserialize_utf8_into<'a, O: Offset, A: Borrow<BorrowedValue<'a>>>(
72
target: &mut MutableUtf8Array<O>,
73
rows: &[A],
74
) -> PolarsResult<()> {
75
let mut err_idx = rows.len();
76
let mut scratch = String::new();
77
for (i, row) in rows.iter().enumerate() {
78
match row.borrow() {
79
BorrowedValue::String(v) => target.push(Some(v.as_ref())),
80
BorrowedValue::Static(StaticNode::Bool(v)) => {
81
target.push(Some(if *v { "true" } else { "false" }))
82
},
83
BorrowedValue::Static(StaticNode::Null) => target.push_null(),
84
BorrowedValue::Static(node) => {
85
write!(scratch, "{node}").unwrap();
86
target.push(Some(scratch.as_str()));
87
scratch.clear();
88
},
89
_ => {
90
err_idx = if err_idx == rows.len() { i } else { err_idx };
91
},
92
}
93
}
94
check_err_idx(rows, err_idx, "string")
95
}
96
97
fn deserialize_utf8view_into<'a, A: Borrow<BorrowedValue<'a>>>(
98
target: &mut MutableBinaryViewArray<str>,
99
rows: &[A],
100
) -> PolarsResult<()> {
101
let mut err_idx = rows.len();
102
let mut scratch = String::new();
103
for (i, row) in rows.iter().enumerate() {
104
match row.borrow() {
105
BorrowedValue::String(v) => target.push_value(v.as_ref()),
106
BorrowedValue::Static(StaticNode::Bool(v)) => {
107
target.push_value(if *v { "true" } else { "false" })
108
},
109
BorrowedValue::Static(StaticNode::Null) => target.push_null(),
110
BorrowedValue::Static(node) => {
111
write!(scratch, "{node}").unwrap();
112
target.push_value(scratch.as_str());
113
scratch.clear();
114
},
115
_ => {
116
err_idx = if err_idx == rows.len() { i } else { err_idx };
117
},
118
}
119
}
120
check_err_idx(rows, err_idx, "string")
121
}
122
123
fn deserialize_list<'a, A: Borrow<BorrowedValue<'a>>>(
124
rows: &[A],
125
dtype: ArrowDataType,
126
allow_extra_fields_in_struct: bool,
127
) -> PolarsResult<ListArray<i64>> {
128
let mut err_idx = rows.len();
129
let child = ListArray::<i64>::get_child_type(&dtype);
130
131
let mut validity = BitmapBuilder::with_capacity(rows.len());
132
let mut offsets = Offsets::<i64>::with_capacity(rows.len());
133
let mut inner = vec![];
134
rows.iter()
135
.enumerate()
136
.for_each(|(i, row)| match row.borrow() {
137
BorrowedValue::Array(value) => {
138
inner.extend(value.iter());
139
validity.push(true);
140
offsets
141
.try_push(value.len())
142
.expect("List offset is too large :/");
143
},
144
BorrowedValue::Static(StaticNode::Null) => {
145
validity.push(false);
146
offsets.extend_constant(1)
147
},
148
value @ (BorrowedValue::Static(_) | BorrowedValue::String(_)) => {
149
inner.push(value);
150
validity.push(true);
151
offsets.try_push(1).expect("List offset is too large :/");
152
},
153
_ => {
154
err_idx = if err_idx == rows.len() { i } else { err_idx };
155
},
156
});
157
158
check_err_idx(rows, err_idx, "list")?;
159
160
let values = _deserialize(&inner, child.clone(), allow_extra_fields_in_struct)?;
161
162
Ok(ListArray::<i64>::new(
163
dtype,
164
offsets.into(),
165
values,
166
validity.into_opt_validity(),
167
))
168
}
169
170
fn deserialize_struct<'a, A: Borrow<BorrowedValue<'a>>>(
171
rows: &[A],
172
dtype: ArrowDataType,
173
allow_extra_fields_in_struct: bool,
174
) -> PolarsResult<StructArray> {
175
let mut err_idx = rows.len();
176
let fields = StructArray::get_fields(&dtype);
177
178
let mut out_values = fields
179
.iter()
180
.map(|f| (f.name.as_str(), (f.dtype(), vec![])))
181
.collect::<PlHashMap<_, _>>();
182
183
let mut validity = BitmapBuilder::with_capacity(rows.len());
184
// Custom error tracker
185
let mut extra_field = None;
186
187
rows.iter().enumerate().for_each(|(i, row)| {
188
match row.borrow() {
189
BorrowedValue::Object(values) => {
190
let mut n_matched = 0usize;
191
for (&key, &mut (_, ref mut inner)) in out_values.iter_mut() {
192
if let Some(v) = values.get(key) {
193
n_matched += 1;
194
inner.push(v)
195
} else {
196
inner.push(&JSON_NULL_VALUE)
197
}
198
}
199
200
validity.push(true);
201
202
if n_matched < values.len() && extra_field.is_none() {
203
for k in values.keys() {
204
if !out_values.contains_key(k.as_ref()) {
205
extra_field = Some(k.as_ref())
206
}
207
}
208
}
209
},
210
BorrowedValue::Static(StaticNode::Null) => {
211
out_values
212
.iter_mut()
213
.for_each(|(_, (_, inner))| inner.push(&JSON_NULL_VALUE));
214
validity.push(false);
215
},
216
_ => {
217
err_idx = if err_idx == rows.len() { i } else { err_idx };
218
},
219
};
220
});
221
222
if let Some(v) = extra_field {
223
if !allow_extra_fields_in_struct {
224
polars_bail!(
225
ComputeError:
226
"extra field in struct data: {}, consider increasing infer_schema_length, or \
227
manually specifying the full schema to ignore extra fields",
228
v
229
)
230
}
231
}
232
233
check_err_idx(rows, err_idx, "struct")?;
234
235
// ensure we collect in the proper order
236
let values = fields
237
.iter()
238
.map(|fld| {
239
let (dtype, vals) = out_values.get(fld.name.as_str()).unwrap();
240
_deserialize(vals, (*dtype).clone(), allow_extra_fields_in_struct)
241
})
242
.collect::<PolarsResult<Vec<_>>>()?;
243
244
Ok(StructArray::new(
245
dtype.clone(),
246
rows.len(),
247
values,
248
validity.into_opt_validity(),
249
))
250
}
251
252
fn fill_array_from<B, T, A>(
253
f: fn(&mut MutablePrimitiveArray<T>, &[B]) -> PolarsResult<()>,
254
dtype: ArrowDataType,
255
rows: &[B],
256
) -> PolarsResult<Box<dyn Array>>
257
where
258
T: NativeType,
259
A: From<MutablePrimitiveArray<T>> + Array,
260
{
261
let mut array = MutablePrimitiveArray::<T>::with_capacity(rows.len()).to(dtype);
262
f(&mut array, rows)?;
263
Ok(Box::new(A::from(array)))
264
}
265
266
/// Implemented by mutable arrays whose backing storage can be reserved up front.
pub(crate) trait Container {
    /// Builds an instance of the array with room for `capacity` entries.
    fn with_capacity(capacity: usize) -> Self
    where
        Self: Sized;
}
274
275
impl<O: Offset> Container for MutableBinaryArray<O> {
    /// Forwards to the array's own `with_capacity` constructor.
    fn with_capacity(capacity: usize) -> Self {
        Self::with_capacity(capacity)
    }
}
280
281
impl Container for MutableBooleanArray {
    /// Forwards to the array's own `with_capacity` constructor.
    fn with_capacity(capacity: usize) -> Self {
        Self::with_capacity(capacity)
    }
}
286
287
impl Container for MutableFixedSizeBinaryArray {
    /// Forwards to the array's own two-argument `with_capacity` constructor, passing `0`
    /// as its second argument.
    fn with_capacity(capacity: usize) -> Self {
        Self::with_capacity(capacity, 0)
    }
}
292
293
impl Container for MutableBinaryViewArray<str> {
    /// Forwards to the array's own `with_capacity` constructor.
    fn with_capacity(capacity: usize) -> Self
    where
        Self: Sized,
    {
        Self::with_capacity(capacity)
    }
}
301
302
impl<O: Offset, M: MutableArray + Default + 'static> Container for MutableListArray<O, M> {
    /// Forwards to the array's own `with_capacity` constructor.
    fn with_capacity(capacity: usize) -> Self {
        Self::with_capacity(capacity)
    }
}
307
308
impl<T: NativeType> Container for MutablePrimitiveArray<T> {
    /// Forwards to the array's own `with_capacity` constructor.
    fn with_capacity(capacity: usize) -> Self {
        Self::with_capacity(capacity)
    }
}
313
314
impl<O: Offset> Container for MutableUtf8Array<O> {
    /// Forwards to the array's own `with_capacity` constructor.
    fn with_capacity(capacity: usize) -> Self {
        Self::with_capacity(capacity)
    }
}
319
320
fn fill_generic_array_from<B, M, A>(
321
f: fn(&mut M, &[B]) -> PolarsResult<()>,
322
rows: &[B],
323
) -> PolarsResult<Box<dyn Array>>
324
where
325
M: Container,
326
A: From<M> + Array,
327
{
328
let mut array = M::with_capacity(rows.len());
329
f(&mut array, rows)?;
330
Ok(Box::new(A::from(array)))
331
}
332
333
pub(crate) fn _deserialize<'a, A: Borrow<BorrowedValue<'a>>>(
334
rows: &[A],
335
dtype: ArrowDataType,
336
allow_extra_fields_in_struct: bool,
337
) -> PolarsResult<Box<dyn Array>> {
338
match &dtype {
339
ArrowDataType::Null => {
340
if let Some(err_idx) = (0..rows.len())
341
.find(|i| !matches!(rows[*i].borrow(), BorrowedValue::Static(StaticNode::Null)))
342
{
343
check_err_idx(rows, err_idx, "null")?;
344
}
345
346
Ok(Box::new(NullArray::new(dtype, rows.len())))
347
},
348
ArrowDataType::Boolean => {
349
fill_generic_array_from::<_, _, BooleanArray>(deserialize_boolean_into, rows)
350
},
351
ArrowDataType::Int8 => {
352
fill_array_from::<_, _, PrimitiveArray<i8>>(deserialize_primitive_into, dtype, rows)
353
},
354
ArrowDataType::Int16 => {
355
fill_array_from::<_, _, PrimitiveArray<i16>>(deserialize_primitive_into, dtype, rows)
356
},
357
ArrowDataType::Int32
358
| ArrowDataType::Date32
359
| ArrowDataType::Time32(_)
360
| ArrowDataType::Interval(IntervalUnit::YearMonth) => {
361
fill_array_from::<_, _, PrimitiveArray<i32>>(deserialize_primitive_into, dtype, rows)
362
},
363
ArrowDataType::Interval(IntervalUnit::DayTime) => {
364
unimplemented!("There is no natural representation of DayTime in JSON.")
365
},
366
ArrowDataType::Int64
367
| ArrowDataType::Date64
368
| ArrowDataType::Time64(_)
369
| ArrowDataType::Duration(_) => {
370
fill_array_from::<_, _, PrimitiveArray<i64>>(deserialize_primitive_into, dtype, rows)
371
},
372
ArrowDataType::Timestamp(tu, tz) => {
373
let mut err_idx = rows.len();
374
let iter = rows.iter().enumerate().map(|(i, row)| match row.borrow() {
375
BorrowedValue::Static(StaticNode::I64(v)) => Some(*v),
376
BorrowedValue::String(v) => match (tu, tz) {
377
(_, None) => {
378
polars_compute::cast::temporal::utf8_to_naive_timestamp_scalar(v, "%+", tu)
379
},
380
(_, Some(tz)) => {
381
let tz = temporal_conversions::parse_offset(tz.as_str()).unwrap();
382
temporal_conversions::utf8_to_timestamp_scalar(v, "%+", &tz, tu)
383
},
384
},
385
BorrowedValue::Static(StaticNode::Null) => None,
386
_ => {
387
err_idx = if err_idx == rows.len() { i } else { err_idx };
388
None
389
},
390
});
391
let out = Box::new(Int64Array::from_iter(iter).to(dtype));
392
check_err_idx(rows, err_idx, "timestamp")?;
393
Ok(out)
394
},
395
ArrowDataType::UInt8 => {
396
fill_array_from::<_, _, PrimitiveArray<u8>>(deserialize_primitive_into, dtype, rows)
397
},
398
ArrowDataType::UInt16 => {
399
fill_array_from::<_, _, PrimitiveArray<u16>>(deserialize_primitive_into, dtype, rows)
400
},
401
ArrowDataType::UInt32 => {
402
fill_array_from::<_, _, PrimitiveArray<u32>>(deserialize_primitive_into, dtype, rows)
403
},
404
ArrowDataType::UInt64 => {
405
fill_array_from::<_, _, PrimitiveArray<u64>>(deserialize_primitive_into, dtype, rows)
406
},
407
ArrowDataType::Float16 => unreachable!(),
408
ArrowDataType::Float32 => {
409
fill_array_from::<_, _, PrimitiveArray<f32>>(deserialize_primitive_into, dtype, rows)
410
},
411
ArrowDataType::Float64 => {
412
fill_array_from::<_, _, PrimitiveArray<f64>>(deserialize_primitive_into, dtype, rows)
413
},
414
ArrowDataType::LargeUtf8 => {
415
fill_generic_array_from::<_, _, Utf8Array<i64>>(deserialize_utf8_into, rows)
416
},
417
ArrowDataType::Utf8View => {
418
fill_generic_array_from::<_, _, Utf8ViewArray>(deserialize_utf8view_into, rows)
419
},
420
ArrowDataType::LargeList(_) => Ok(Box::new(deserialize_list(
421
rows,
422
dtype,
423
allow_extra_fields_in_struct,
424
)?)),
425
ArrowDataType::LargeBinary => Ok(Box::new(deserialize_binary(rows)?)),
426
ArrowDataType::Struct(_) => Ok(Box::new(deserialize_struct(
427
rows,
428
dtype,
429
allow_extra_fields_in_struct,
430
)?)),
431
_ => todo!(),
432
}
433
}
434
435
pub fn deserialize(
436
json: &BorrowedValue,
437
dtype: ArrowDataType,
438
allow_extra_fields_in_struct: bool,
439
) -> PolarsResult<Box<dyn Array>> {
440
match json {
441
BorrowedValue::Array(rows) => match dtype {
442
ArrowDataType::LargeList(inner) => {
443
_deserialize(rows, inner.dtype, allow_extra_fields_in_struct)
444
},
445
_ => todo!("read an Array from a non-Array data type"),
446
},
447
_ => _deserialize(&[json], dtype, allow_extra_fields_in_struct),
448
}
449
}
450
451
fn check_err_idx<'a>(
452
rows: &[impl Borrow<BorrowedValue<'a>>],
453
err_idx: usize,
454
type_name: &'static str,
455
) -> PolarsResult<()> {
456
if err_idx != rows.len() {
457
polars_bail!(
458
ComputeError:
459
r#"error deserializing value "{:?}" as {}. \
460
Try increasing `infer_schema_length` or specifying a schema.
461
"#,
462
rows[err_idx].borrow(), type_name,
463
)
464
}
465
466
Ok(())
467
}
468
469