Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-json/src/json/write/serialize.rs
6939 views
1
use std::io::Write;
2
3
use arrow::array::*;
4
use arrow::bitmap::utils::ZipValidity;
5
#[cfg(feature = "dtype-decimal")]
6
use arrow::compute::decimal::get_trim_decimal_zeros;
7
use arrow::datatypes::{ArrowDataType, IntegerType, TimeUnit};
8
use arrow::io::iterator::BufStreamingIterator;
9
use arrow::offset::Offset;
10
#[cfg(feature = "timezones")]
11
use arrow::temporal_conversions::parse_offset_tz;
12
use arrow::temporal_conversions::{
13
date32_to_date, duration_ms_to_duration, duration_ns_to_duration, duration_us_to_duration,
14
parse_offset, time64ns_to_time, timestamp_ms_to_datetime, timestamp_ns_to_datetime,
15
timestamp_to_datetime, timestamp_us_to_datetime,
16
};
17
use arrow::types::NativeType;
18
use chrono::{Duration, NaiveDate, NaiveDateTime, NaiveTime};
19
use streaming_iterator::StreamingIterator;
20
21
use super::utf8;
22
23
/// Appends the decimal text of the integer `val` to `buf`.
///
/// The digits are produced by an `itoa::Buffer` and copied into `buf` as bytes.
fn write_integer<I: itoa::Integer>(buf: &mut Vec<u8>, val: I) {
    let mut scratch = itoa::Buffer::new();
    buf.extend_from_slice(scratch.format(val).as_bytes());
}
28
29
/// Appends the textual form of the float `val` to `f`.
///
/// The text is produced by a `ryu::Buffer` and copied into `f` as bytes.
fn write_float<I: ryu::Float>(f: &mut Vec<u8>, val: I) {
    let mut scratch = ryu::Buffer::new();
    f.extend_from_slice(scratch.format(val).as_bytes());
}
34
35
fn materialize_serializer<'a, I, F, T>(
36
f: F,
37
iterator: I,
38
offset: usize,
39
take: usize,
40
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>
41
where
42
T: 'a,
43
I: Iterator<Item = T> + Send + Sync + 'a,
44
F: FnMut(T, &mut Vec<u8>) + Send + Sync + 'a,
45
{
46
if offset > 0 || take < usize::MAX {
47
Box::new(BufStreamingIterator::new(
48
iterator.skip(offset).take(take),
49
f,
50
vec![],
51
))
52
} else {
53
Box::new(BufStreamingIterator::new(iterator, f, vec![]))
54
}
55
}
56
57
fn boolean_serializer<'a>(
58
array: &'a BooleanArray,
59
offset: usize,
60
take: usize,
61
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
62
let f = |x: Option<bool>, buf: &mut Vec<u8>| match x {
63
Some(true) => buf.extend_from_slice(b"true"),
64
Some(false) => buf.extend_from_slice(b"false"),
65
None => buf.extend_from_slice(b"null"),
66
};
67
materialize_serializer(f, array.iter(), offset, take)
68
}
69
70
fn null_serializer(
71
len: usize,
72
offset: usize,
73
take: usize,
74
) -> Box<dyn StreamingIterator<Item = [u8]> + Send + Sync> {
75
let f = |_x: (), buf: &mut Vec<u8>| buf.extend_from_slice(b"null");
76
materialize_serializer(f, std::iter::repeat_n((), len), offset, take)
77
}
78
79
fn primitive_serializer<'a, T: NativeType + itoa::Integer>(
80
array: &'a PrimitiveArray<T>,
81
offset: usize,
82
take: usize,
83
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
84
let f = |x: Option<&T>, buf: &mut Vec<u8>| {
85
if let Some(x) = x {
86
write_integer(buf, *x)
87
} else {
88
buf.extend(b"null")
89
}
90
};
91
materialize_serializer(f, array.iter(), offset, take)
92
}
93
94
fn float_serializer<'a, T>(
95
array: &'a PrimitiveArray<T>,
96
offset: usize,
97
take: usize,
98
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>
99
where
100
T: num_traits::Float + NativeType + ryu::Float,
101
{
102
let f = |x: Option<&T>, buf: &mut Vec<u8>| {
103
if let Some(x) = x {
104
if T::is_nan(*x) || T::is_infinite(*x) {
105
buf.extend(b"null")
106
} else {
107
write_float(buf, *x)
108
}
109
} else {
110
buf.extend(b"null")
111
}
112
};
113
114
materialize_serializer(f, array.iter(), offset, take)
115
}
116
117
/// Serializes a decimal column stored as `i128` with the given `scale`.
///
/// Valid values are formatted with a `DecimalFmtBuffer` (using the
/// `get_trim_decimal_zeros()` setting read once up front) and written through
/// `utf8::write_str`; missing values are written as `null`.
///
/// `offset` and `take` are forwarded to `materialize_serializer` to select the rows.
#[cfg(feature = "dtype-decimal")]
fn decimal_serializer<'a>(
    array: &'a PrimitiveArray<i128>,
    scale: usize,
    offset: usize,
    take: usize,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
    let trim_zeros = get_trim_decimal_zeros();
    // One formatting buffer is moved into the closure and reused for every row.
    let mut fmt_buf = arrow::compute::decimal::DecimalFmtBuffer::new();
    let emit = move |value: Option<&i128>, out: &mut Vec<u8>| match value {
        Some(v) => utf8::write_str(out, fmt_buf.format(*v, scale, trim_zeros)).unwrap(),
        None => out.extend(b"null"),
    };

    materialize_serializer(emit, array.iter(), offset, take)
}
136
137
fn dictionary_utf8view_serializer<'a, K: DictionaryKey>(
138
array: &'a DictionaryArray<K>,
139
offset: usize,
140
take: usize,
141
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
142
let iter = array.iter_typed::<Utf8ViewArray>().unwrap().skip(offset);
143
let f = |x: Option<&str>, buf: &mut Vec<u8>| {
144
if let Some(x) = x {
145
utf8::write_str(buf, x).unwrap();
146
} else {
147
buf.extend_from_slice(b"null")
148
}
149
};
150
materialize_serializer(f, iter, offset, take)
151
}
152
153
fn utf8_serializer<'a, O: Offset>(
154
array: &'a Utf8Array<O>,
155
offset: usize,
156
take: usize,
157
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
158
let f = |x: Option<&str>, buf: &mut Vec<u8>| {
159
if let Some(x) = x {
160
utf8::write_str(buf, x).unwrap();
161
} else {
162
buf.extend_from_slice(b"null")
163
}
164
};
165
materialize_serializer(f, array.iter(), offset, take)
166
}
167
168
fn utf8view_serializer<'a>(
169
array: &'a Utf8ViewArray,
170
offset: usize,
171
take: usize,
172
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
173
let f = |x: Option<&str>, buf: &mut Vec<u8>| {
174
if let Some(x) = x {
175
utf8::write_str(buf, x).unwrap();
176
} else {
177
buf.extend_from_slice(b"null")
178
}
179
};
180
materialize_serializer(f, array.iter(), offset, take)
181
}
182
183
fn struct_serializer<'a>(
184
array: &'a StructArray,
185
offset: usize,
186
take: usize,
187
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
188
// {"a": [1, 2, 3], "b": [a, b, c], "c": {"a": [1, 2, 3]}}
189
// [
190
// {"a": 1, "b": a, "c": {"a": 1}},
191
// {"a": 2, "b": b, "c": {"a": 2}},
192
// {"a": 3, "b": c, "c": {"a": 3}},
193
// ]
194
//
195
let mut serializers = array
196
.values()
197
.iter()
198
.map(|x| x.as_ref())
199
.map(|arr| new_serializer(arr, offset, take))
200
.collect::<Vec<_>>();
201
202
Box::new(BufStreamingIterator::new(
203
ZipValidity::new_with_validity(0..array.len(), array.validity()),
204
move |maybe, buf| {
205
if maybe.is_some() {
206
let names = array.fields().iter().map(|f| f.name.as_str());
207
serialize_item(
208
buf,
209
names.zip(
210
serializers
211
.iter_mut()
212
.map(|serializer| serializer.next().unwrap()),
213
),
214
true,
215
);
216
} else {
217
serializers.iter_mut().for_each(|iter| {
218
let _ = iter.next();
219
});
220
buf.extend(b"null");
221
}
222
},
223
vec![],
224
))
225
}
226
227
fn list_serializer<'a, O: Offset>(
228
array: &'a ListArray<O>,
229
offset: usize,
230
take: usize,
231
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
232
// [[1, 2], [3]]
233
// [
234
// [1, 2],
235
// [3]
236
// ]
237
//
238
let offsets = array.offsets().as_slice();
239
let start = offsets[0].to_usize();
240
let end = offsets.last().unwrap().to_usize();
241
let mut serializer = new_serializer(array.values().as_ref(), start, end - start);
242
243
let mut prev_offset = start;
244
let f = move |offset: Option<&[O]>, buf: &mut Vec<u8>| {
245
if let Some(offset) = offset {
246
if offset[0].to_usize() > prev_offset {
247
for _ in 0..(offset[0].to_usize() - prev_offset) {
248
serializer.next().unwrap();
249
}
250
}
251
252
let length = (offset[1] - offset[0]).to_usize();
253
buf.push(b'[');
254
let mut is_first_row = true;
255
for _ in 0..length {
256
if !is_first_row {
257
buf.push(b',');
258
}
259
is_first_row = false;
260
buf.extend(serializer.next().unwrap());
261
}
262
buf.push(b']');
263
prev_offset = offset[1].to_usize();
264
} else {
265
buf.extend(b"null");
266
}
267
};
268
269
let iter =
270
ZipValidity::new_with_validity(array.offsets().buffer().windows(2), array.validity());
271
materialize_serializer(f, iter, offset, take)
272
}
273
274
fn fixed_size_list_serializer<'a>(
275
array: &'a FixedSizeListArray,
276
offset: usize,
277
take: usize,
278
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
279
let mut serializer = new_serializer(
280
array.values().as_ref(),
281
offset * array.size(),
282
take * array.size(),
283
);
284
285
Box::new(BufStreamingIterator::new(
286
ZipValidity::new(0..array.len(), array.validity().map(|x| x.iter())),
287
move |ix, buf| {
288
if ix.is_some() {
289
let length = array.size();
290
buf.push(b'[');
291
let mut is_first_row = true;
292
for _ in 0..length {
293
if !is_first_row {
294
buf.push(b',');
295
}
296
is_first_row = false;
297
buf.extend(serializer.next().unwrap());
298
}
299
buf.push(b']');
300
} else {
301
buf.extend(b"null");
302
}
303
},
304
vec![],
305
))
306
}
307
308
fn date_serializer<'a, T, F>(
309
array: &'a PrimitiveArray<T>,
310
convert: F,
311
offset: usize,
312
take: usize,
313
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>
314
where
315
T: NativeType,
316
F: Fn(T) -> NaiveDate + 'static + Send + Sync,
317
{
318
let f = move |x: Option<&T>, buf: &mut Vec<u8>| {
319
if let Some(x) = x {
320
let nd = convert(*x);
321
write!(buf, "\"{nd}\"").unwrap();
322
} else {
323
buf.extend_from_slice(b"null")
324
}
325
};
326
327
materialize_serializer(f, array.iter(), offset, take)
328
}
329
330
fn duration_serializer<'a, T, F>(
331
array: &'a PrimitiveArray<T>,
332
convert: F,
333
offset: usize,
334
take: usize,
335
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>
336
where
337
T: NativeType,
338
F: Fn(T) -> Duration + 'static + Send + Sync,
339
{
340
let f = move |x: Option<&T>, buf: &mut Vec<u8>| {
341
if let Some(x) = x {
342
let duration = convert(*x);
343
write!(buf, "\"{duration}\"").unwrap();
344
} else {
345
buf.extend_from_slice(b"null")
346
}
347
};
348
349
materialize_serializer(f, array.iter(), offset, take)
350
}
351
352
fn time_serializer<'a, T, F>(
353
array: &'a PrimitiveArray<T>,
354
convert: F,
355
offset: usize,
356
take: usize,
357
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>
358
where
359
T: NativeType,
360
F: Fn(T) -> NaiveTime + 'static + Send + Sync,
361
{
362
let f = move |x: Option<&T>, buf: &mut Vec<u8>| {
363
if let Some(x) = x {
364
let time = convert(*x);
365
write!(buf, "\"{time}\"").unwrap();
366
} else {
367
buf.extend_from_slice(b"null")
368
}
369
};
370
371
materialize_serializer(f, array.iter(), offset, take)
372
}
373
374
fn timestamp_serializer<'a, F>(
375
array: &'a PrimitiveArray<i64>,
376
convert: F,
377
offset: usize,
378
take: usize,
379
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>
380
where
381
F: Fn(i64) -> NaiveDateTime + 'static + Send + Sync,
382
{
383
let f = move |x: Option<&i64>, buf: &mut Vec<u8>| {
384
if let Some(x) = x {
385
let ndt = convert(*x);
386
write!(buf, "\"{ndt}\"").unwrap();
387
} else {
388
buf.extend_from_slice(b"null")
389
}
390
};
391
materialize_serializer(f, array.iter(), offset, take)
392
}
393
394
fn timestamp_tz_serializer<'a>(
395
array: &'a PrimitiveArray<i64>,
396
time_unit: TimeUnit,
397
tz: &str,
398
offset: usize,
399
take: usize,
400
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
401
match parse_offset(tz) {
402
Ok(parsed_tz) => {
403
let f = move |x: Option<&i64>, buf: &mut Vec<u8>| {
404
if let Some(x) = x {
405
let dt_str = timestamp_to_datetime(*x, time_unit, &parsed_tz).to_rfc3339();
406
write!(buf, "\"{dt_str}\"").unwrap();
407
} else {
408
buf.extend_from_slice(b"null")
409
}
410
};
411
412
materialize_serializer(f, array.iter(), offset, take)
413
},
414
#[cfg(feature = "timezones")]
415
_ => match parse_offset_tz(tz) {
416
Ok(parsed_tz) => {
417
let f = move |x: Option<&i64>, buf: &mut Vec<u8>| {
418
if let Some(x) = x {
419
let dt_str = timestamp_to_datetime(*x, time_unit, &parsed_tz).to_rfc3339();
420
write!(buf, "\"{dt_str}\"").unwrap();
421
} else {
422
buf.extend_from_slice(b"null")
423
}
424
};
425
426
materialize_serializer(f, array.iter(), offset, take)
427
},
428
_ => {
429
panic!("Timezone {tz} is invalid or not supported");
430
},
431
},
432
#[cfg(not(feature = "timezones"))]
433
_ => {
434
panic!("Invalid Offset format (must be [-]00:00) or timezones feature not active");
435
},
436
}
437
}
438
439
pub(crate) fn new_serializer<'a>(
440
array: &'a dyn Array,
441
offset: usize,
442
take: usize,
443
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
444
match array.dtype().to_logical_type() {
445
ArrowDataType::Boolean => {
446
boolean_serializer(array.as_any().downcast_ref().unwrap(), offset, take)
447
},
448
ArrowDataType::Int8 => {
449
primitive_serializer::<i8>(array.as_any().downcast_ref().unwrap(), offset, take)
450
},
451
ArrowDataType::Int16 => {
452
primitive_serializer::<i16>(array.as_any().downcast_ref().unwrap(), offset, take)
453
},
454
ArrowDataType::Int32 => {
455
primitive_serializer::<i32>(array.as_any().downcast_ref().unwrap(), offset, take)
456
},
457
ArrowDataType::Int64 => {
458
primitive_serializer::<i64>(array.as_any().downcast_ref().unwrap(), offset, take)
459
},
460
ArrowDataType::UInt8 => {
461
primitive_serializer::<u8>(array.as_any().downcast_ref().unwrap(), offset, take)
462
},
463
ArrowDataType::UInt16 => {
464
primitive_serializer::<u16>(array.as_any().downcast_ref().unwrap(), offset, take)
465
},
466
ArrowDataType::UInt32 => {
467
primitive_serializer::<u32>(array.as_any().downcast_ref().unwrap(), offset, take)
468
},
469
ArrowDataType::UInt64 => {
470
primitive_serializer::<u64>(array.as_any().downcast_ref().unwrap(), offset, take)
471
},
472
ArrowDataType::Float32 => {
473
float_serializer::<f32>(array.as_any().downcast_ref().unwrap(), offset, take)
474
},
475
ArrowDataType::Float64 => {
476
float_serializer::<f64>(array.as_any().downcast_ref().unwrap(), offset, take)
477
},
478
#[cfg(feature = "dtype-decimal")]
479
ArrowDataType::Decimal(_, scale) => {
480
decimal_serializer(array.as_any().downcast_ref().unwrap(), *scale, offset, take)
481
},
482
ArrowDataType::LargeUtf8 => {
483
utf8_serializer::<i64>(array.as_any().downcast_ref().unwrap(), offset, take)
484
},
485
ArrowDataType::Utf8View => {
486
utf8view_serializer(array.as_any().downcast_ref().unwrap(), offset, take)
487
},
488
ArrowDataType::Struct(_) => {
489
struct_serializer(array.as_any().downcast_ref().unwrap(), offset, take)
490
},
491
ArrowDataType::FixedSizeList(_, _) => {
492
fixed_size_list_serializer(array.as_any().downcast_ref().unwrap(), offset, take)
493
},
494
ArrowDataType::LargeList(_) => {
495
list_serializer::<i64>(array.as_any().downcast_ref().unwrap(), offset, take)
496
},
497
ArrowDataType::Dictionary(k, v, _) => match (k, &**v) {
498
(IntegerType::UInt8, ArrowDataType::Utf8View) => {
499
let array = array
500
.as_any()
501
.downcast_ref::<DictionaryArray<u8>>()
502
.unwrap();
503
dictionary_utf8view_serializer::<u8>(array, offset, take)
504
},
505
(IntegerType::UInt16, ArrowDataType::Utf8View) => {
506
let array = array
507
.as_any()
508
.downcast_ref::<DictionaryArray<u16>>()
509
.unwrap();
510
dictionary_utf8view_serializer::<u16>(array, offset, take)
511
},
512
(IntegerType::UInt32, ArrowDataType::Utf8View) => {
513
let array = array
514
.as_any()
515
.downcast_ref::<DictionaryArray<u32>>()
516
.unwrap();
517
dictionary_utf8view_serializer::<u32>(array, offset, take)
518
},
519
_ => {
520
// Not produced by polars
521
unreachable!()
522
},
523
},
524
ArrowDataType::Date32 => date_serializer(
525
array.as_any().downcast_ref().unwrap(),
526
date32_to_date,
527
offset,
528
take,
529
),
530
ArrowDataType::Timestamp(tu, None) => {
531
let convert = match tu {
532
TimeUnit::Nanosecond => timestamp_ns_to_datetime,
533
TimeUnit::Microsecond => timestamp_us_to_datetime,
534
TimeUnit::Millisecond => timestamp_ms_to_datetime,
535
tu => panic!("Invalid time unit '{tu:?}' for Datetime."),
536
};
537
timestamp_serializer(
538
array.as_any().downcast_ref().unwrap(),
539
convert,
540
offset,
541
take,
542
)
543
},
544
ArrowDataType::Timestamp(time_unit, Some(tz)) => timestamp_tz_serializer(
545
array.as_any().downcast_ref().unwrap(),
546
*time_unit,
547
tz,
548
offset,
549
take,
550
),
551
ArrowDataType::Duration(tu) => {
552
let convert = match tu {
553
TimeUnit::Nanosecond => duration_ns_to_duration,
554
TimeUnit::Microsecond => duration_us_to_duration,
555
TimeUnit::Millisecond => duration_ms_to_duration,
556
tu => panic!("Invalid time unit '{tu:?}' for Duration."),
557
};
558
duration_serializer(
559
array.as_any().downcast_ref().unwrap(),
560
convert,
561
offset,
562
take,
563
)
564
},
565
ArrowDataType::Time64(tu) => {
566
let convert = match tu {
567
TimeUnit::Nanosecond => time64ns_to_time,
568
tu => panic!("Invalid time unit '{tu:?}' for Time."),
569
};
570
time_serializer(
571
array.as_any().downcast_ref().unwrap(),
572
convert,
573
offset,
574
take,
575
)
576
},
577
ArrowDataType::Null => null_serializer(array.len(), offset, take),
578
other => todo!("Writing {:?} to JSON", other),
579
}
580
}
581
582
fn serialize_item<'a>(
583
buffer: &mut Vec<u8>,
584
record: impl Iterator<Item = (&'a str, &'a [u8])>,
585
is_first_row: bool,
586
) {
587
if !is_first_row {
588
buffer.push(b',');
589
}
590
buffer.push(b'{');
591
let mut first_item = true;
592
for (key, value) in record {
593
if !first_item {
594
buffer.push(b',');
595
}
596
first_item = false;
597
utf8::write_str(buffer, key).unwrap();
598
buffer.push(b':');
599
buffer.extend(value);
600
}
601
buffer.push(b'}');
602
}
603
604
/// Serializes `array` to a valid JSON to `buffer`
605
/// # Implementation
606
/// This operation is CPU-bounded
607
pub(crate) fn serialize(array: &dyn Array, buffer: &mut Vec<u8>) {
608
let mut serializer = new_serializer(array, 0, usize::MAX);
609
610
(0..array.len()).for_each(|i| {
611
if i != 0 {
612
buffer.push(b',');
613
}
614
buffer.extend_from_slice(serializer.next().unwrap());
615
});
616
}
617
618