Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-json/src/json/write/serialize.rs
8407 views
1
use std::io::Write;
2
3
use arrow::array::*;
4
use arrow::bitmap::utils::ZipValidity;
5
#[cfg(feature = "dtype-decimal")]
6
use arrow::compute::decimal::get_trim_decimal_zeros;
7
use arrow::datatypes::{ArrowDataType, IntegerType, TimeUnit};
8
use arrow::io::iterator::BufStreamingIterator;
9
use arrow::offset::Offset;
10
#[cfg(feature = "timezones")]
11
use arrow::temporal_conversions::parse_offset_tz;
12
use arrow::temporal_conversions::{
13
date32_to_date, duration_ms_to_duration, duration_ns_to_duration, duration_us_to_duration,
14
parse_offset, time64ns_to_time, timestamp_ms_to_datetime, timestamp_ns_to_datetime,
15
timestamp_to_datetime, timestamp_us_to_datetime,
16
};
17
use arrow::types::NativeType;
18
use chrono::{Duration, NaiveDate, NaiveDateTime, NaiveTime};
19
use num_traits::{Float, ToPrimitive};
20
use polars_utils::float16::pf16;
21
use streaming_iterator::StreamingIterator;
22
23
use super::utf8;
24
25
/// Appends the text produced by `itoa` for the integer `val` to `buf`.
fn write_integer<I: itoa::Integer>(buf: &mut Vec<u8>, val: I) {
    let mut scratch = itoa::Buffer::new();
    buf.extend_from_slice(scratch.format(val).as_bytes());
}
30
31
/// Appends the text produced by `zmij` for the float `val` to `f`.
fn write_float<I: zmij::Float>(f: &mut Vec<u8>, val: I) {
    let mut scratch = zmij::Buffer::new();
    f.extend_from_slice(scratch.format(val).as_bytes());
}
36
37
pub trait JsonSerializer: StreamingIterator<Item = [u8]> {
38
/// Serializes all rows directly into `to`, bypassing the `BufStreamingIterator` buffer.
39
fn serialize_json_lines_to_vec(self: Box<Self>, to: &mut Vec<u8>, n: usize);
40
}
41
42
impl<I, F, T> JsonSerializer for BufStreamingIterator<I, F, T>
43
where
44
I: Iterator<Item = T>,
45
F: FnMut(T, &mut Vec<u8>),
46
{
47
fn serialize_json_lines_to_vec(self: Box<Self>, to: &mut Vec<u8>, n: usize) {
48
let (mut iterator, mut f) = self.into_inner();
49
50
for _ in 0..n {
51
if let Some(v) = iterator.next() {
52
f(v, to);
53
to.push(b'\n')
54
}
55
}
56
}
57
}
58
59
fn materialize_serializer<'a, I, F, T>(
60
f: F,
61
iterator: I,
62
offset: usize,
63
take: usize,
64
) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync>
65
where
66
T: 'a,
67
I: Iterator<Item = T> + Send + Sync + 'a,
68
F: FnMut(T, &mut Vec<u8>) + Send + Sync + 'a,
69
{
70
if offset > 0 || take < usize::MAX {
71
Box::new(BufStreamingIterator::new(
72
iterator.skip(offset).take(take),
73
f,
74
vec![],
75
))
76
} else {
77
Box::new(BufStreamingIterator::new(iterator, f, vec![]))
78
}
79
}
80
81
fn boolean_serializer<'a>(
82
array: &'a BooleanArray,
83
offset: usize,
84
take: usize,
85
) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {
86
let f = |x: Option<bool>, buf: &mut Vec<u8>| match x {
87
Some(true) => buf.extend_from_slice(b"true"),
88
Some(false) => buf.extend_from_slice(b"false"),
89
None => buf.extend_from_slice(b"null"),
90
};
91
materialize_serializer(f, array.iter(), offset, take)
92
}
93
94
fn null_serializer(
95
len: usize,
96
offset: usize,
97
take: usize,
98
) -> Box<dyn JsonSerializer<Item = [u8]> + Send + Sync> {
99
let f = |_x: (), buf: &mut Vec<u8>| buf.extend_from_slice(b"null");
100
materialize_serializer(f, std::iter::repeat_n((), len), offset, take)
101
}
102
103
fn primitive_serializer<'a, T: NativeType + itoa::Integer>(
104
array: &'a PrimitiveArray<T>,
105
offset: usize,
106
take: usize,
107
) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {
108
let f = |x: Option<&T>, buf: &mut Vec<u8>| {
109
if let Some(x) = x {
110
write_integer(buf, *x)
111
} else {
112
buf.extend(b"null")
113
}
114
};
115
materialize_serializer(f, array.iter(), offset, take)
116
}
117
118
fn float_serializer<'a, T>(
119
array: &'a PrimitiveArray<T>,
120
offset: usize,
121
take: usize,
122
) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync>
123
where
124
T: num_traits::Float + NativeType + zmij::Float,
125
{
126
let f = |x: Option<&T>, buf: &mut Vec<u8>| {
127
if let Some(x) = x {
128
if T::is_nan(*x) || T::is_infinite(*x) {
129
buf.extend(b"null")
130
} else {
131
write_float(buf, *x)
132
}
133
} else {
134
buf.extend(b"null")
135
}
136
};
137
138
materialize_serializer(f, array.iter(), offset, take)
139
}
140
141
fn float16_serializer<'a>(
142
array: &'a PrimitiveArray<pf16>,
143
offset: usize,
144
take: usize,
145
) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {
146
let f = |x: Option<&pf16>, buf: &mut Vec<u8>| {
147
if let Some(x) = x {
148
if pf16::is_nan(*x) || pf16::is_infinite(*x) {
149
buf.extend(b"null")
150
} else {
151
write_float(buf, x.to_f32().unwrap())
152
}
153
} else {
154
buf.extend(b"null")
155
}
156
};
157
158
materialize_serializer(f, array.iter(), offset, take)
159
}
160
161
/// Serializes 128-bit decimals with the given `scale`.
///
/// Each value is formatted with a reusable `DecimalFmtBuffer` and the resulting
/// text is written through `utf8::write_str`; missing values are written as `null`.
#[cfg(feature = "dtype-decimal")]
fn decimal_serializer<'a>(
    array: &'a PrimitiveArray<i128>,
    scale: usize,
    offset: usize,
    take: usize,
) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {
    let trim_zeros = get_trim_decimal_zeros();
    // Owned by the closure so the formatting scratch space is reused across rows.
    let mut fmt_buf = polars_compute::decimal::DecimalFmtBuffer::new();
    let f = move |x: Option<&i128>, buf: &mut Vec<u8>| match x {
        Some(value) => {
            let text = fmt_buf.format_dec128(*value, scale, trim_zeros, false);
            utf8::write_str(buf, text).unwrap()
        },
        None => buf.extend(b"null"),
    };

    materialize_serializer(f, array.iter(), offset, take)
}
180
181
fn dictionary_utf8view_serializer<'a, K: DictionaryKey>(
182
array: &'a DictionaryArray<K>,
183
offset: usize,
184
take: usize,
185
) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {
186
let iter = array.iter_typed::<Utf8ViewArray>().unwrap().skip(offset);
187
let f = |x: Option<&str>, buf: &mut Vec<u8>| {
188
if let Some(x) = x {
189
utf8::write_str(buf, x).unwrap();
190
} else {
191
buf.extend_from_slice(b"null")
192
}
193
};
194
materialize_serializer(f, iter, offset, take)
195
}
196
197
fn utf8_serializer<'a, O: Offset>(
198
array: &'a Utf8Array<O>,
199
offset: usize,
200
take: usize,
201
) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {
202
let f = |x: Option<&str>, buf: &mut Vec<u8>| {
203
if let Some(x) = x {
204
utf8::write_str(buf, x).unwrap();
205
} else {
206
buf.extend_from_slice(b"null")
207
}
208
};
209
materialize_serializer(f, array.iter(), offset, take)
210
}
211
212
fn utf8view_serializer<'a>(
213
array: &'a Utf8ViewArray,
214
offset: usize,
215
take: usize,
216
) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {
217
let f = |x: Option<&str>, buf: &mut Vec<u8>| {
218
if let Some(x) = x {
219
utf8::write_str(buf, x).unwrap();
220
} else {
221
buf.extend_from_slice(b"null")
222
}
223
};
224
materialize_serializer(f, array.iter(), offset, take)
225
}
226
227
fn struct_serializer<'a>(
228
array: &'a StructArray,
229
offset: usize,
230
take: usize,
231
) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {
232
// {"a": [1, 2, 3], "b": [a, b, c], "c": {"a": [1, 2, 3]}}
233
// [
234
// {"a": 1, "b": a, "c": {"a": 1}},
235
// {"a": 2, "b": b, "c": {"a": 2}},
236
// {"a": 3, "b": c, "c": {"a": 3}},
237
// ]
238
//
239
let mut serializers = array
240
.values()
241
.iter()
242
.map(|x| x.as_ref())
243
.map(|arr| new_serializer(arr, offset, take))
244
.collect::<Vec<_>>();
245
246
Box::new(BufStreamingIterator::new(
247
ZipValidity::new_with_validity(0..array.len(), array.validity()),
248
move |maybe, buf| {
249
if maybe.is_some() {
250
let names = array.fields().iter().map(|f| f.name.as_str());
251
serialize_item(
252
buf,
253
names.zip(
254
serializers
255
.iter_mut()
256
.map(|serializer| serializer.next().unwrap()),
257
),
258
true,
259
);
260
} else {
261
serializers.iter_mut().for_each(|iter| {
262
let _ = iter.next();
263
});
264
buf.extend(b"null");
265
}
266
},
267
vec![],
268
))
269
}
270
271
fn list_serializer<'a, O: Offset>(
272
array: &'a ListArray<O>,
273
offset: usize,
274
take: usize,
275
) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {
276
// [[1, 2], [3]]
277
// [
278
// [1, 2],
279
// [3]
280
// ]
281
//
282
let offsets = array.offsets().as_slice();
283
let start = offsets[0].to_usize();
284
let end = offsets.last().unwrap().to_usize();
285
let mut serializer = new_serializer(array.values().as_ref(), start, end - start);
286
287
let mut prev_offset = start;
288
let f = move |offset: Option<&[O]>, buf: &mut Vec<u8>| {
289
if let Some(offset) = offset {
290
if offset[0].to_usize() > prev_offset {
291
for _ in 0..(offset[0].to_usize() - prev_offset) {
292
serializer.next().unwrap();
293
}
294
}
295
296
let length = (offset[1] - offset[0]).to_usize();
297
buf.push(b'[');
298
let mut is_first_row = true;
299
for _ in 0..length {
300
if !is_first_row {
301
buf.push(b',');
302
}
303
is_first_row = false;
304
buf.extend(serializer.next().unwrap());
305
}
306
buf.push(b']');
307
prev_offset = offset[1].to_usize();
308
} else {
309
buf.extend(b"null");
310
}
311
};
312
313
let iter =
314
ZipValidity::new_with_validity(array.offsets().buffer().windows(2), array.validity());
315
materialize_serializer(f, iter, offset, take)
316
}
317
318
fn fixed_size_list_serializer<'a>(
319
array: &'a FixedSizeListArray,
320
offset: usize,
321
take: usize,
322
) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {
323
let mut serializer = new_serializer(
324
array.values().as_ref(),
325
offset * array.size(),
326
take * array.size(),
327
);
328
329
Box::new(BufStreamingIterator::new(
330
ZipValidity::new(0..array.len(), array.validity().map(|x| x.iter())),
331
move |ix, buf| {
332
if ix.is_some() {
333
let length = array.size();
334
buf.push(b'[');
335
let mut is_first_row = true;
336
for _ in 0..length {
337
if !is_first_row {
338
buf.push(b',');
339
}
340
is_first_row = false;
341
buf.extend(serializer.next().unwrap());
342
}
343
buf.push(b']');
344
} else {
345
buf.extend(b"null");
346
}
347
},
348
vec![],
349
))
350
}
351
352
fn date_serializer<'a, T, F>(
353
array: &'a PrimitiveArray<T>,
354
convert: F,
355
offset: usize,
356
take: usize,
357
) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync>
358
where
359
T: NativeType,
360
F: Fn(T) -> NaiveDate + 'static + Send + Sync,
361
{
362
let f = move |x: Option<&T>, buf: &mut Vec<u8>| {
363
if let Some(x) = x {
364
let nd = convert(*x);
365
write!(buf, "\"{nd}\"").unwrap();
366
} else {
367
buf.extend_from_slice(b"null")
368
}
369
};
370
371
materialize_serializer(f, array.iter(), offset, take)
372
}
373
374
fn duration_serializer<'a, T, F>(
375
array: &'a PrimitiveArray<T>,
376
convert: F,
377
offset: usize,
378
take: usize,
379
) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync>
380
where
381
T: NativeType,
382
F: Fn(T) -> Duration + 'static + Send + Sync,
383
{
384
let f = move |x: Option<&T>, buf: &mut Vec<u8>| {
385
if let Some(x) = x {
386
let duration = convert(*x);
387
write!(buf, "\"{duration}\"").unwrap();
388
} else {
389
buf.extend_from_slice(b"null")
390
}
391
};
392
393
materialize_serializer(f, array.iter(), offset, take)
394
}
395
396
fn time_serializer<'a, T, F>(
397
array: &'a PrimitiveArray<T>,
398
convert: F,
399
offset: usize,
400
take: usize,
401
) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync>
402
where
403
T: NativeType,
404
F: Fn(T) -> NaiveTime + 'static + Send + Sync,
405
{
406
let f = move |x: Option<&T>, buf: &mut Vec<u8>| {
407
if let Some(x) = x {
408
let time = convert(*x);
409
write!(buf, "\"{time}\"").unwrap();
410
} else {
411
buf.extend_from_slice(b"null")
412
}
413
};
414
415
materialize_serializer(f, array.iter(), offset, take)
416
}
417
418
fn timestamp_serializer<'a, F>(
419
array: &'a PrimitiveArray<i64>,
420
convert: F,
421
offset: usize,
422
take: usize,
423
) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync>
424
where
425
F: Fn(i64) -> NaiveDateTime + 'static + Send + Sync,
426
{
427
let f = move |x: Option<&i64>, buf: &mut Vec<u8>| {
428
if let Some(x) = x {
429
let ndt = convert(*x);
430
write!(buf, "\"{ndt}\"").unwrap();
431
} else {
432
buf.extend_from_slice(b"null")
433
}
434
};
435
materialize_serializer(f, array.iter(), offset, take)
436
}
437
438
fn timestamp_tz_serializer<'a>(
439
array: &'a PrimitiveArray<i64>,
440
time_unit: TimeUnit,
441
tz: &str,
442
offset: usize,
443
take: usize,
444
) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {
445
match parse_offset(tz) {
446
Ok(parsed_tz) => {
447
let f = move |x: Option<&i64>, buf: &mut Vec<u8>| {
448
if let Some(x) = x {
449
let dt_str = timestamp_to_datetime(*x, time_unit, &parsed_tz).to_rfc3339();
450
write!(buf, "\"{dt_str}\"").unwrap();
451
} else {
452
buf.extend_from_slice(b"null")
453
}
454
};
455
456
materialize_serializer(f, array.iter(), offset, take)
457
},
458
#[cfg(feature = "timezones")]
459
_ => match parse_offset_tz(tz) {
460
Ok(parsed_tz) => {
461
let f = move |x: Option<&i64>, buf: &mut Vec<u8>| {
462
if let Some(x) = x {
463
let dt_str = timestamp_to_datetime(*x, time_unit, &parsed_tz).to_rfc3339();
464
write!(buf, "\"{dt_str}\"").unwrap();
465
} else {
466
buf.extend_from_slice(b"null")
467
}
468
};
469
470
materialize_serializer(f, array.iter(), offset, take)
471
},
472
_ => {
473
panic!("Timezone {tz} is invalid or not supported");
474
},
475
},
476
#[cfg(not(feature = "timezones"))]
477
_ => {
478
panic!("Invalid Offset format (must be [-]00:00) or timezones feature not active");
479
},
480
}
481
}
482
483
pub fn new_serializer<'a>(
484
array: &'a dyn Array,
485
offset: usize,
486
take: usize,
487
) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {
488
match array.dtype().to_storage() {
489
ArrowDataType::Boolean => {
490
boolean_serializer(array.as_any().downcast_ref().unwrap(), offset, take)
491
},
492
ArrowDataType::Int8 => {
493
primitive_serializer::<i8>(array.as_any().downcast_ref().unwrap(), offset, take)
494
},
495
ArrowDataType::Int16 => {
496
primitive_serializer::<i16>(array.as_any().downcast_ref().unwrap(), offset, take)
497
},
498
ArrowDataType::Int32 => {
499
primitive_serializer::<i32>(array.as_any().downcast_ref().unwrap(), offset, take)
500
},
501
ArrowDataType::Int64 => {
502
primitive_serializer::<i64>(array.as_any().downcast_ref().unwrap(), offset, take)
503
},
504
ArrowDataType::Int128 => {
505
primitive_serializer::<i128>(array.as_any().downcast_ref().unwrap(), offset, take)
506
},
507
ArrowDataType::UInt8 => {
508
primitive_serializer::<u8>(array.as_any().downcast_ref().unwrap(), offset, take)
509
},
510
ArrowDataType::UInt16 => {
511
primitive_serializer::<u16>(array.as_any().downcast_ref().unwrap(), offset, take)
512
},
513
ArrowDataType::UInt32 => {
514
primitive_serializer::<u32>(array.as_any().downcast_ref().unwrap(), offset, take)
515
},
516
ArrowDataType::UInt64 => {
517
primitive_serializer::<u64>(array.as_any().downcast_ref().unwrap(), offset, take)
518
},
519
ArrowDataType::UInt128 => {
520
primitive_serializer::<u128>(array.as_any().downcast_ref().unwrap(), offset, take)
521
},
522
ArrowDataType::Float16 => {
523
float16_serializer(array.as_any().downcast_ref().unwrap(), offset, take)
524
},
525
ArrowDataType::Float32 => {
526
float_serializer::<f32>(array.as_any().downcast_ref().unwrap(), offset, take)
527
},
528
ArrowDataType::Float64 => {
529
float_serializer::<f64>(array.as_any().downcast_ref().unwrap(), offset, take)
530
},
531
#[cfg(feature = "dtype-decimal")]
532
ArrowDataType::Decimal(_, scale) => {
533
decimal_serializer(array.as_any().downcast_ref().unwrap(), *scale, offset, take)
534
},
535
ArrowDataType::LargeUtf8 => {
536
utf8_serializer::<i64>(array.as_any().downcast_ref().unwrap(), offset, take)
537
},
538
ArrowDataType::Utf8View => {
539
utf8view_serializer(array.as_any().downcast_ref().unwrap(), offset, take)
540
},
541
ArrowDataType::Struct(_) => {
542
struct_serializer(array.as_any().downcast_ref().unwrap(), offset, take)
543
},
544
ArrowDataType::FixedSizeList(_, _) => {
545
fixed_size_list_serializer(array.as_any().downcast_ref().unwrap(), offset, take)
546
},
547
ArrowDataType::LargeList(_) => {
548
list_serializer::<i64>(array.as_any().downcast_ref().unwrap(), offset, take)
549
},
550
ArrowDataType::Dictionary(k, v, _) => match (k, &**v) {
551
(IntegerType::UInt8, ArrowDataType::Utf8View) => {
552
let array = array
553
.as_any()
554
.downcast_ref::<DictionaryArray<u8>>()
555
.unwrap();
556
dictionary_utf8view_serializer::<u8>(array, offset, take)
557
},
558
(IntegerType::UInt16, ArrowDataType::Utf8View) => {
559
let array = array
560
.as_any()
561
.downcast_ref::<DictionaryArray<u16>>()
562
.unwrap();
563
dictionary_utf8view_serializer::<u16>(array, offset, take)
564
},
565
(IntegerType::UInt32, ArrowDataType::Utf8View) => {
566
let array = array
567
.as_any()
568
.downcast_ref::<DictionaryArray<u32>>()
569
.unwrap();
570
dictionary_utf8view_serializer::<u32>(array, offset, take)
571
},
572
_ => {
573
// Not produced by polars
574
unreachable!()
575
},
576
},
577
ArrowDataType::Date32 => date_serializer(
578
array.as_any().downcast_ref().unwrap(),
579
date32_to_date,
580
offset,
581
take,
582
),
583
ArrowDataType::Timestamp(tu, None) => {
584
let convert = match tu {
585
TimeUnit::Nanosecond => timestamp_ns_to_datetime,
586
TimeUnit::Microsecond => timestamp_us_to_datetime,
587
TimeUnit::Millisecond => timestamp_ms_to_datetime,
588
tu => panic!("Invalid time unit '{tu:?}' for Datetime."),
589
};
590
timestamp_serializer(
591
array.as_any().downcast_ref().unwrap(),
592
convert,
593
offset,
594
take,
595
)
596
},
597
ArrowDataType::Timestamp(time_unit, Some(tz)) => timestamp_tz_serializer(
598
array.as_any().downcast_ref().unwrap(),
599
*time_unit,
600
tz,
601
offset,
602
take,
603
),
604
ArrowDataType::Duration(tu) => {
605
let convert = match tu {
606
TimeUnit::Nanosecond => duration_ns_to_duration,
607
TimeUnit::Microsecond => duration_us_to_duration,
608
TimeUnit::Millisecond => duration_ms_to_duration,
609
tu => panic!("Invalid time unit '{tu:?}' for Duration."),
610
};
611
duration_serializer(
612
array.as_any().downcast_ref().unwrap(),
613
convert,
614
offset,
615
take,
616
)
617
},
618
ArrowDataType::Time64(tu) => {
619
let convert = match tu {
620
TimeUnit::Nanosecond => time64ns_to_time,
621
tu => panic!("Invalid time unit '{tu:?}' for Time."),
622
};
623
time_serializer(
624
array.as_any().downcast_ref().unwrap(),
625
convert,
626
offset,
627
take,
628
)
629
},
630
ArrowDataType::Null => null_serializer(array.len(), offset, take),
631
other => todo!("Writing {:?} to JSON", other),
632
}
633
}
634
635
fn serialize_item<'a>(
636
buffer: &mut Vec<u8>,
637
record: impl Iterator<Item = (&'a str, &'a [u8])>,
638
is_first_row: bool,
639
) {
640
if !is_first_row {
641
buffer.push(b',');
642
}
643
buffer.push(b'{');
644
let mut first_item = true;
645
for (key, value) in record {
646
if !first_item {
647
buffer.push(b',');
648
}
649
first_item = false;
650
utf8::write_str(buffer, key).unwrap();
651
buffer.push(b':');
652
buffer.extend(value);
653
}
654
buffer.push(b'}');
655
}
656
657
/// Serializes `array` to a valid JSON to `buffer`
658
/// # Implementation
659
/// This operation is CPU-bounded
660
pub(crate) fn serialize(array: &dyn Array, buffer: &mut Vec<u8>) {
661
let mut serializer = new_serializer(array, 0, usize::MAX);
662
663
(0..array.len()).for_each(|i| {
664
if i != 0 {
665
buffer.push(b',');
666
}
667
buffer.extend_from_slice(serializer.next().unwrap());
668
});
669
}
670
671