Path: blob/main/crates/polars-json/src/json/write/serialize.rs
6939 views
use std::io::Write;12use arrow::array::*;3use arrow::bitmap::utils::ZipValidity;4#[cfg(feature = "dtype-decimal")]5use arrow::compute::decimal::get_trim_decimal_zeros;6use arrow::datatypes::{ArrowDataType, IntegerType, TimeUnit};7use arrow::io::iterator::BufStreamingIterator;8use arrow::offset::Offset;9#[cfg(feature = "timezones")]10use arrow::temporal_conversions::parse_offset_tz;11use arrow::temporal_conversions::{12date32_to_date, duration_ms_to_duration, duration_ns_to_duration, duration_us_to_duration,13parse_offset, time64ns_to_time, timestamp_ms_to_datetime, timestamp_ns_to_datetime,14timestamp_to_datetime, timestamp_us_to_datetime,15};16use arrow::types::NativeType;17use chrono::{Duration, NaiveDate, NaiveDateTime, NaiveTime};18use streaming_iterator::StreamingIterator;1920use super::utf8;2122fn write_integer<I: itoa::Integer>(buf: &mut Vec<u8>, val: I) {23let mut buffer = itoa::Buffer::new();24let value = buffer.format(val);25buf.extend_from_slice(value.as_bytes())26}2728fn write_float<I: ryu::Float>(f: &mut Vec<u8>, val: I) {29let mut buffer = ryu::Buffer::new();30let value = buffer.format(val);31f.extend_from_slice(value.as_bytes())32}3334fn materialize_serializer<'a, I, F, T>(35f: F,36iterator: I,37offset: usize,38take: usize,39) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>40where41T: 'a,42I: Iterator<Item = T> + Send + Sync + 'a,43F: FnMut(T, &mut Vec<u8>) + Send + Sync + 'a,44{45if offset > 0 || take < usize::MAX {46Box::new(BufStreamingIterator::new(47iterator.skip(offset).take(take),48f,49vec![],50))51} else {52Box::new(BufStreamingIterator::new(iterator, f, vec![]))53}54}5556fn boolean_serializer<'a>(57array: &'a BooleanArray,58offset: usize,59take: usize,60) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {61let f = |x: Option<bool>, buf: &mut Vec<u8>| match x {62Some(true) => buf.extend_from_slice(b"true"),63Some(false) => buf.extend_from_slice(b"false"),64None => buf.extend_from_slice(b"null"),65};66materialize_serializer(f, array.iter(), offset, take)67}6869fn null_serializer(70len: usize,71offset: usize,72take: usize,73) -> Box<dyn StreamingIterator<Item = [u8]> + Send + Sync> {74let f = |_x: (), buf: &mut Vec<u8>| buf.extend_from_slice(b"null");75materialize_serializer(f, std::iter::repeat_n((), len), offset, take)76}7778fn primitive_serializer<'a, T: NativeType + itoa::Integer>(79array: &'a PrimitiveArray<T>,80offset: usize,81take: usize,82) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {83let f = |x: Option<&T>, buf: &mut Vec<u8>| {84if let Some(x) = x {85write_integer(buf, *x)86} else {87buf.extend(b"null")88}89};90materialize_serializer(f, array.iter(), offset, take)91}9293fn float_serializer<'a, T>(94array: &'a PrimitiveArray<T>,95offset: usize,96take: usize,97) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>98where99T: num_traits::Float + NativeType + ryu::Float,100{101let f = |x: Option<&T>, buf: &mut Vec<u8>| {102if let Some(x) = x {103if T::is_nan(*x) || T::is_infinite(*x) {104buf.extend(b"null")105} else {106write_float(buf, *x)107}108} else {109buf.extend(b"null")110}111};112113materialize_serializer(f, array.iter(), offset, take)114}115116#[cfg(feature = "dtype-decimal")]117fn decimal_serializer<'a>(118array: &'a PrimitiveArray<i128>,119scale: usize,120offset: usize,121take: usize,122) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {123let trim_zeros = get_trim_decimal_zeros();124let mut fmt_buf = arrow::compute::decimal::DecimalFmtBuffer::new();125let f = move |x: Option<&i128>, buf: &mut Vec<u8>| {126if let Some(x) = x {127utf8::write_str(buf, fmt_buf.format(*x, scale, trim_zeros)).unwrap()128} else {129buf.extend(b"null")130}131};132133materialize_serializer(f, array.iter(), offset, take)134}135136fn dictionary_utf8view_serializer<'a, K: DictionaryKey>(137array: &'a DictionaryArray<K>,138offset: usize,139take: usize,140) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {141let iter = array.iter_typed::<Utf8ViewArray>().unwrap().skip(offset);142let f = |x: Option<&str>, buf: &mut Vec<u8>| {143if let Some(x) = x {144utf8::write_str(buf, x).unwrap();145} else {146buf.extend_from_slice(b"null")147}148};149materialize_serializer(f, iter, offset, take)150}151152fn utf8_serializer<'a, O: Offset>(153array: &'a Utf8Array<O>,154offset: usize,155take: usize,156) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {157let f = |x: Option<&str>, buf: &mut Vec<u8>| {158if let Some(x) = x {159utf8::write_str(buf, x).unwrap();160} else {161buf.extend_from_slice(b"null")162}163};164materialize_serializer(f, array.iter(), offset, take)165}166167fn utf8view_serializer<'a>(168array: &'a Utf8ViewArray,169offset: usize,170take: usize,171) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {172let f = |x: Option<&str>, buf: &mut Vec<u8>| {173if let Some(x) = x {174utf8::write_str(buf, x).unwrap();175} else {176buf.extend_from_slice(b"null")177}178};179materialize_serializer(f, array.iter(), offset, take)180}181182fn struct_serializer<'a>(183array: &'a StructArray,184offset: usize,185take: usize,186) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {187// {"a": [1, 2, 3], "b": [a, b, c], "c": {"a": [1, 2, 3]}}188// [189// {"a": 1, "b": a, "c": {"a": 1}},190// {"a": 2, "b": b, "c": {"a": 2}},191// {"a": 3, "b": c, "c": {"a": 3}},192// ]193//194let mut serializers = array195.values()196.iter()197.map(|x| x.as_ref())198.map(|arr| new_serializer(arr, offset, take))199.collect::<Vec<_>>();200201Box::new(BufStreamingIterator::new(202ZipValidity::new_with_validity(0..array.len(), array.validity()),203move |maybe, buf| {204if maybe.is_some() {205let names = array.fields().iter().map(|f| f.name.as_str());206serialize_item(207buf,208names.zip(209serializers210.iter_mut()211.map(|serializer| serializer.next().unwrap()),212),213true,214);215} else {216serializers.iter_mut().for_each(|iter| {217let _ = iter.next();218});219buf.extend(b"null");220}221},222vec![],223))224}225226fn list_serializer<'a, O: Offset>(227array: &'a ListArray<O>,228offset: usize,229take: usize,230) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {231// [[1, 2], [3]]232// [233// [1, 2],234// [3]235// ]236//237let offsets = array.offsets().as_slice();238let start = offsets[0].to_usize();239let end = offsets.last().unwrap().to_usize();240let mut serializer = new_serializer(array.values().as_ref(), start, end - start);241242let mut prev_offset = start;243let f = move |offset: Option<&[O]>, buf: &mut Vec<u8>| {244if let Some(offset) = offset {245if offset[0].to_usize() > prev_offset {246for _ in 0..(offset[0].to_usize() - prev_offset) {247serializer.next().unwrap();248}249}250251let length = (offset[1] - offset[0]).to_usize();252buf.push(b'[');253let mut is_first_row = true;254for _ in 0..length {255if !is_first_row {256buf.push(b',');257}258is_first_row = false;259buf.extend(serializer.next().unwrap());260}261buf.push(b']');262prev_offset = offset[1].to_usize();263} else {264buf.extend(b"null");265}266};267268let iter =269ZipValidity::new_with_validity(array.offsets().buffer().windows(2), array.validity());270materialize_serializer(f, iter, offset, take)271}272273fn fixed_size_list_serializer<'a>(274array: &'a FixedSizeListArray,275offset: usize,276take: usize,277) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {278let mut serializer = new_serializer(279array.values().as_ref(),280offset * array.size(),281take * array.size(),282);283284Box::new(BufStreamingIterator::new(285ZipValidity::new(0..array.len(), array.validity().map(|x| x.iter())),286move |ix, buf| {287if ix.is_some() {288let length = array.size();289buf.push(b'[');290let mut is_first_row = true;291for _ in 0..length {292if !is_first_row {293buf.push(b',');294}295is_first_row = false;296buf.extend(serializer.next().unwrap());297}298buf.push(b']');299} else {300buf.extend(b"null");301}302},303vec![],304))305}306307fn date_serializer<'a, T, F>(308array: &'a PrimitiveArray<T>,309convert: F,310offset: usize,311take: usize,312) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>313where314T: NativeType,315F: Fn(T) -> NaiveDate + 'static + Send + Sync,316{317let f = move |x: Option<&T>, buf: &mut Vec<u8>| {318if let Some(x) = x {319let nd = convert(*x);320write!(buf, "\"{nd}\"").unwrap();321} else {322buf.extend_from_slice(b"null")323}324};325326materialize_serializer(f, array.iter(), offset, take)327}328329fn duration_serializer<'a, T, F>(330array: &'a PrimitiveArray<T>,331convert: F,332offset: usize,333take: usize,334) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>335where336T: NativeType,337F: Fn(T) -> Duration + 'static + Send + Sync,338{339let f = move |x: Option<&T>, buf: &mut Vec<u8>| {340if let Some(x) = x {341let duration = convert(*x);342write!(buf, "\"{duration}\"").unwrap();343} else {344buf.extend_from_slice(b"null")345}346};347348materialize_serializer(f, array.iter(), offset, take)349}350351fn time_serializer<'a, T, F>(352array: &'a PrimitiveArray<T>,353convert: F,354offset: usize,355take: usize,356) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>357where358T: NativeType,359F: Fn(T) -> NaiveTime + 'static + Send + Sync,360{361let f = move |x: Option<&T>, buf: &mut Vec<u8>| {362if let Some(x) = x {363let time = convert(*x);364write!(buf, "\"{time}\"").unwrap();365} else {366buf.extend_from_slice(b"null")367}368};369370materialize_serializer(f, array.iter(), offset, take)371}372373fn timestamp_serializer<'a, F>(374array: &'a PrimitiveArray<i64>,375convert: F,376offset: usize,377take: usize,378) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>379where380F: Fn(i64) -> NaiveDateTime + 'static + Send + Sync,381{382let f = move |x: Option<&i64>, buf: &mut Vec<u8>| {383if let Some(x) = x {384let ndt = convert(*x);385write!(buf, "\"{ndt}\"").unwrap();386} else {387buf.extend_from_slice(b"null")388}389};390materialize_serializer(f, array.iter(), offset, take)391}392393fn timestamp_tz_serializer<'a>(394array: &'a PrimitiveArray<i64>,395time_unit: TimeUnit,396tz: &str,397offset: usize,398take: usize,399) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {400match parse_offset(tz) {401Ok(parsed_tz) => {402let f = move |x: Option<&i64>, buf: &mut Vec<u8>| {403if let Some(x) = x {404let dt_str = timestamp_to_datetime(*x, time_unit, &parsed_tz).to_rfc3339();405write!(buf, "\"{dt_str}\"").unwrap();406} else {407buf.extend_from_slice(b"null")408}409};410411materialize_serializer(f, array.iter(), offset, take)412},413#[cfg(feature = "timezones")]414_ => match parse_offset_tz(tz) {415Ok(parsed_tz) => {416let f = move |x: Option<&i64>, buf: &mut Vec<u8>| {417if let Some(x) = x {418let dt_str = timestamp_to_datetime(*x, time_unit, &parsed_tz).to_rfc3339();419write!(buf, "\"{dt_str}\"").unwrap();420} else {421buf.extend_from_slice(b"null")422}423};424425materialize_serializer(f, array.iter(), offset, take)426},427_ => {428panic!("Timezone {tz} is invalid or not supported");429},430},431#[cfg(not(feature = "timezones"))]432_ => {433panic!("Invalid Offset format (must be [-]00:00) or timezones feature not active");434},435}436}437438pub(crate) fn new_serializer<'a>(439array: &'a dyn Array,440offset: usize,441take: usize,442) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {443match array.dtype().to_logical_type() {444ArrowDataType::Boolean => {445boolean_serializer(array.as_any().downcast_ref().unwrap(), offset, take)446},447ArrowDataType::Int8 => {448primitive_serializer::<i8>(array.as_any().downcast_ref().unwrap(), offset, take)449},450ArrowDataType::Int16 => {451primitive_serializer::<i16>(array.as_any().downcast_ref().unwrap(), offset, take)452},453ArrowDataType::Int32 => {454primitive_serializer::<i32>(array.as_any().downcast_ref().unwrap(), offset, take)455},456ArrowDataType::Int64 => {457primitive_serializer::<i64>(array.as_any().downcast_ref().unwrap(), offset, take)458},459ArrowDataType::UInt8 => {460primitive_serializer::<u8>(array.as_any().downcast_ref().unwrap(), offset, take)461},462ArrowDataType::UInt16 => {463primitive_serializer::<u16>(array.as_any().downcast_ref().unwrap(), offset, take)464},465ArrowDataType::UInt32 => {466primitive_serializer::<u32>(array.as_any().downcast_ref().unwrap(), offset, take)467},468ArrowDataType::UInt64 => {469primitive_serializer::<u64>(array.as_any().downcast_ref().unwrap(), offset, take)470},471ArrowDataType::Float32 => {472float_serializer::<f32>(array.as_any().downcast_ref().unwrap(), offset, take)473},474ArrowDataType::Float64 => {475float_serializer::<f64>(array.as_any().downcast_ref().unwrap(), offset, take)476},477#[cfg(feature = "dtype-decimal")]478ArrowDataType::Decimal(_, scale) => {479decimal_serializer(array.as_any().downcast_ref().unwrap(), *scale, offset, take)480},481ArrowDataType::LargeUtf8 => {482utf8_serializer::<i64>(array.as_any().downcast_ref().unwrap(), offset, take)483},484ArrowDataType::Utf8View => {485utf8view_serializer(array.as_any().downcast_ref().unwrap(), offset, take)486},487ArrowDataType::Struct(_) => {488struct_serializer(array.as_any().downcast_ref().unwrap(), offset, take)489},490ArrowDataType::FixedSizeList(_, _) => {491fixed_size_list_serializer(array.as_any().downcast_ref().unwrap(), offset, take)492},493ArrowDataType::LargeList(_) => {494list_serializer::<i64>(array.as_any().downcast_ref().unwrap(), offset, take)495},496ArrowDataType::Dictionary(k, v, _) => match (k, &**v) {497(IntegerType::UInt8, ArrowDataType::Utf8View) => {498let array = array499.as_any()500.downcast_ref::<DictionaryArray<u8>>()501.unwrap();502dictionary_utf8view_serializer::<u8>(array, offset, take)503},504(IntegerType::UInt16, ArrowDataType::Utf8View) => {505let array = array506.as_any()507.downcast_ref::<DictionaryArray<u16>>()508.unwrap();509dictionary_utf8view_serializer::<u16>(array, offset, take)510},511(IntegerType::UInt32, ArrowDataType::Utf8View) => {512let array = array513.as_any()514.downcast_ref::<DictionaryArray<u32>>()515.unwrap();516dictionary_utf8view_serializer::<u32>(array, offset, take)517},518_ => {519// Not produced by polars520unreachable!()521},522},523ArrowDataType::Date32 => date_serializer(524array.as_any().downcast_ref().unwrap(),525date32_to_date,526offset,527take,528),529ArrowDataType::Timestamp(tu, None) => {530let convert = match tu {531TimeUnit::Nanosecond => timestamp_ns_to_datetime,532TimeUnit::Microsecond => timestamp_us_to_datetime,533TimeUnit::Millisecond => timestamp_ms_to_datetime,534tu => panic!("Invalid time unit '{tu:?}' for Datetime."),535};536timestamp_serializer(537array.as_any().downcast_ref().unwrap(),538convert,539offset,540take,541)542},543ArrowDataType::Timestamp(time_unit, Some(tz)) => timestamp_tz_serializer(544array.as_any().downcast_ref().unwrap(),545*time_unit,546tz,547offset,548take,549),550ArrowDataType::Duration(tu) => {551let convert = match tu {552TimeUnit::Nanosecond => duration_ns_to_duration,553TimeUnit::Microsecond => duration_us_to_duration,554TimeUnit::Millisecond => duration_ms_to_duration,555tu => panic!("Invalid time unit '{tu:?}' for Duration."),556};557duration_serializer(558array.as_any().downcast_ref().unwrap(),559convert,560offset,561take,562)563},564ArrowDataType::Time64(tu) => {565let convert = match tu {566TimeUnit::Nanosecond => time64ns_to_time,567tu => panic!("Invalid time unit '{tu:?}' for Time."),568};569time_serializer(570array.as_any().downcast_ref().unwrap(),571convert,572offset,573take,574)575},576ArrowDataType::Null => null_serializer(array.len(), offset, take),577other => todo!("Writing {:?} to JSON", other),578}579}580581fn serialize_item<'a>(582buffer: &mut Vec<u8>,583record: impl Iterator<Item = (&'a str, &'a [u8])>,584is_first_row: bool,585) {586if !is_first_row {587buffer.push(b',');588}589buffer.push(b'{');590let mut first_item = true;591for (key, value) in record {592if !first_item {593buffer.push(b',');594}595first_item = false;596utf8::write_str(buffer, key).unwrap();597buffer.push(b':');598buffer.extend(value);599}600buffer.push(b'}');601}602603/// Serializes `array` to a valid JSON to `buffer`604/// # Implementation605/// This operation is CPU-bounded606pub(crate) fn serialize(array: &dyn Array, buffer: &mut Vec<u8>) {607let mut serializer = new_serializer(array, 0, usize::MAX);608609(0..array.len()).for_each(|i| {610if i != 0 {611buffer.push(b',');612}613buffer.extend_from_slice(serializer.next().unwrap());614});615}616617618