Path: blob/main/crates/polars-json/src/json/write/serialize.rs
8407 views
use std::io::Write;12use arrow::array::*;3use arrow::bitmap::utils::ZipValidity;4#[cfg(feature = "dtype-decimal")]5use arrow::compute::decimal::get_trim_decimal_zeros;6use arrow::datatypes::{ArrowDataType, IntegerType, TimeUnit};7use arrow::io::iterator::BufStreamingIterator;8use arrow::offset::Offset;9#[cfg(feature = "timezones")]10use arrow::temporal_conversions::parse_offset_tz;11use arrow::temporal_conversions::{12date32_to_date, duration_ms_to_duration, duration_ns_to_duration, duration_us_to_duration,13parse_offset, time64ns_to_time, timestamp_ms_to_datetime, timestamp_ns_to_datetime,14timestamp_to_datetime, timestamp_us_to_datetime,15};16use arrow::types::NativeType;17use chrono::{Duration, NaiveDate, NaiveDateTime, NaiveTime};18use num_traits::{Float, ToPrimitive};19use polars_utils::float16::pf16;20use streaming_iterator::StreamingIterator;2122use super::utf8;2324fn write_integer<I: itoa::Integer>(buf: &mut Vec<u8>, val: I) {25let mut buffer = itoa::Buffer::new();26let value = buffer.format(val);27buf.extend_from_slice(value.as_bytes())28}2930fn write_float<I: zmij::Float>(f: &mut Vec<u8>, val: I) {31let mut buffer = zmij::Buffer::new();32let value = buffer.format(val);33f.extend_from_slice(value.as_bytes())34}3536pub trait JsonSerializer: StreamingIterator<Item = [u8]> {37/// Serializes all rows directly into `to`, bypassing the `BufStreamingIterator` buffer.38fn serialize_json_lines_to_vec(self: Box<Self>, to: &mut Vec<u8>, n: usize);39}4041impl<I, F, T> JsonSerializer for BufStreamingIterator<I, F, T>42where43I: Iterator<Item = T>,44F: FnMut(T, &mut Vec<u8>),45{46fn serialize_json_lines_to_vec(self: Box<Self>, to: &mut Vec<u8>, n: usize) {47let (mut iterator, mut f) = self.into_inner();4849for _ in 0..n {50if let Some(v) = iterator.next() {51f(v, to);52to.push(b'\n')53}54}55}56}5758fn materialize_serializer<'a, I, F, T>(59f: F,60iterator: I,61offset: usize,62take: usize,63) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync>64where65T: 'a,66I: Iterator<Item = T> + Send + Sync + 'a,67F: FnMut(T, &mut Vec<u8>) + Send + Sync + 'a,68{69if offset > 0 || take < usize::MAX {70Box::new(BufStreamingIterator::new(71iterator.skip(offset).take(take),72f,73vec![],74))75} else {76Box::new(BufStreamingIterator::new(iterator, f, vec![]))77}78}7980fn boolean_serializer<'a>(81array: &'a BooleanArray,82offset: usize,83take: usize,84) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {85let f = |x: Option<bool>, buf: &mut Vec<u8>| match x {86Some(true) => buf.extend_from_slice(b"true"),87Some(false) => buf.extend_from_slice(b"false"),88None => buf.extend_from_slice(b"null"),89};90materialize_serializer(f, array.iter(), offset, take)91}9293fn null_serializer(94len: usize,95offset: usize,96take: usize,97) -> Box<dyn JsonSerializer<Item = [u8]> + Send + Sync> {98let f = |_x: (), buf: &mut Vec<u8>| buf.extend_from_slice(b"null");99materialize_serializer(f, std::iter::repeat_n((), len), offset, take)100}101102fn primitive_serializer<'a, T: NativeType + itoa::Integer>(103array: &'a PrimitiveArray<T>,104offset: usize,105take: usize,106) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {107let f = |x: Option<&T>, buf: &mut Vec<u8>| {108if let Some(x) = x {109write_integer(buf, *x)110} else {111buf.extend(b"null")112}113};114materialize_serializer(f, array.iter(), offset, take)115}116117fn float_serializer<'a, T>(118array: &'a PrimitiveArray<T>,119offset: usize,120take: usize,121) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync>122where123T: num_traits::Float + NativeType + zmij::Float,124{125let f = |x: Option<&T>, buf: &mut Vec<u8>| {126if let Some(x) = x {127if T::is_nan(*x) || T::is_infinite(*x) {128buf.extend(b"null")129} else {130write_float(buf, *x)131}132} else {133buf.extend(b"null")134}135};136137materialize_serializer(f, array.iter(), offset, take)138}139140fn float16_serializer<'a>(141array: &'a PrimitiveArray<pf16>,142offset: usize,143take: usize,144) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {145let f = |x: Option<&pf16>, buf: &mut Vec<u8>| {146if let Some(x) = x {147if pf16::is_nan(*x) || pf16::is_infinite(*x) {148buf.extend(b"null")149} else {150write_float(buf, x.to_f32().unwrap())151}152} else {153buf.extend(b"null")154}155};156157materialize_serializer(f, array.iter(), offset, take)158}159160#[cfg(feature = "dtype-decimal")]161fn decimal_serializer<'a>(162array: &'a PrimitiveArray<i128>,163scale: usize,164offset: usize,165take: usize,166) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {167let trim_zeros = get_trim_decimal_zeros();168let mut fmt_buf = polars_compute::decimal::DecimalFmtBuffer::new();169let f = move |x: Option<&i128>, buf: &mut Vec<u8>| {170if let Some(x) = x {171utf8::write_str(buf, fmt_buf.format_dec128(*x, scale, trim_zeros, false)).unwrap()172} else {173buf.extend(b"null")174}175};176177materialize_serializer(f, array.iter(), offset, take)178}179180fn dictionary_utf8view_serializer<'a, K: DictionaryKey>(181array: &'a DictionaryArray<K>,182offset: usize,183take: usize,184) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {185let iter = array.iter_typed::<Utf8ViewArray>().unwrap().skip(offset);186let f = |x: Option<&str>, buf: &mut Vec<u8>| {187if let Some(x) = x {188utf8::write_str(buf, x).unwrap();189} else {190buf.extend_from_slice(b"null")191}192};193materialize_serializer(f, iter, offset, take)194}195196fn utf8_serializer<'a, O: Offset>(197array: &'a Utf8Array<O>,198offset: usize,199take: usize,200) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {201let f = |x: Option<&str>, buf: &mut Vec<u8>| {202if let Some(x) = x {203utf8::write_str(buf, x).unwrap();204} else {205buf.extend_from_slice(b"null")206}207};208materialize_serializer(f, array.iter(), offset, take)209}210211fn utf8view_serializer<'a>(212array: &'a Utf8ViewArray,213offset: usize,214take: usize,215) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {216let f = |x: Option<&str>, buf: &mut Vec<u8>| {217if let Some(x) = x {218utf8::write_str(buf, x).unwrap();219} else {220buf.extend_from_slice(b"null")221}222};223materialize_serializer(f, array.iter(), offset, take)224}225226fn struct_serializer<'a>(227array: &'a StructArray,228offset: usize,229take: usize,230) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {231// {"a": [1, 2, 3], "b": [a, b, c], "c": {"a": [1, 2, 3]}}232// [233// {"a": 1, "b": a, "c": {"a": 1}},234// {"a": 2, "b": b, "c": {"a": 2}},235// {"a": 3, "b": c, "c": {"a": 3}},236// ]237//238let mut serializers = array239.values()240.iter()241.map(|x| x.as_ref())242.map(|arr| new_serializer(arr, offset, take))243.collect::<Vec<_>>();244245Box::new(BufStreamingIterator::new(246ZipValidity::new_with_validity(0..array.len(), array.validity()),247move |maybe, buf| {248if maybe.is_some() {249let names = array.fields().iter().map(|f| f.name.as_str());250serialize_item(251buf,252names.zip(253serializers254.iter_mut()255.map(|serializer| serializer.next().unwrap()),256),257true,258);259} else {260serializers.iter_mut().for_each(|iter| {261let _ = iter.next();262});263buf.extend(b"null");264}265},266vec![],267))268}269270fn list_serializer<'a, O: Offset>(271array: &'a ListArray<O>,272offset: usize,273take: usize,274) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {275// [[1, 2], [3]]276// [277// [1, 2],278// [3]279// ]280//281let offsets = array.offsets().as_slice();282let start = offsets[0].to_usize();283let end = offsets.last().unwrap().to_usize();284let mut serializer = new_serializer(array.values().as_ref(), start, end - start);285286let mut prev_offset = start;287let f = move |offset: Option<&[O]>, buf: &mut Vec<u8>| {288if let Some(offset) = offset {289if offset[0].to_usize() > prev_offset {290for _ in 0..(offset[0].to_usize() - prev_offset) {291serializer.next().unwrap();292}293}294295let length = (offset[1] - offset[0]).to_usize();296buf.push(b'[');297let mut is_first_row = true;298for _ in 0..length {299if !is_first_row {300buf.push(b',');301}302is_first_row = false;303buf.extend(serializer.next().unwrap());304}305buf.push(b']');306prev_offset = offset[1].to_usize();307} else {308buf.extend(b"null");309}310};311312let iter =313ZipValidity::new_with_validity(array.offsets().buffer().windows(2), array.validity());314materialize_serializer(f, iter, offset, take)315}316317fn fixed_size_list_serializer<'a>(318array: &'a FixedSizeListArray,319offset: usize,320take: usize,321) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {322let mut serializer = new_serializer(323array.values().as_ref(),324offset * array.size(),325take * array.size(),326);327328Box::new(BufStreamingIterator::new(329ZipValidity::new(0..array.len(), array.validity().map(|x| x.iter())),330move |ix, buf| {331if ix.is_some() {332let length = array.size();333buf.push(b'[');334let mut is_first_row = true;335for _ in 0..length {336if !is_first_row {337buf.push(b',');338}339is_first_row = false;340buf.extend(serializer.next().unwrap());341}342buf.push(b']');343} else {344buf.extend(b"null");345}346},347vec![],348))349}350351fn date_serializer<'a, T, F>(352array: &'a PrimitiveArray<T>,353convert: F,354offset: usize,355take: usize,356) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync>357where358T: NativeType,359F: Fn(T) -> NaiveDate + 'static + Send + Sync,360{361let f = move |x: Option<&T>, buf: &mut Vec<u8>| {362if let Some(x) = x {363let nd = convert(*x);364write!(buf, "\"{nd}\"").unwrap();365} else {366buf.extend_from_slice(b"null")367}368};369370materialize_serializer(f, array.iter(), offset, take)371}372373fn duration_serializer<'a, T, F>(374array: &'a PrimitiveArray<T>,375convert: F,376offset: usize,377take: usize,378) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync>379where380T: NativeType,381F: Fn(T) -> Duration + 'static + Send + Sync,382{383let f = move |x: Option<&T>, buf: &mut Vec<u8>| {384if let Some(x) = x {385let duration = convert(*x);386write!(buf, "\"{duration}\"").unwrap();387} else {388buf.extend_from_slice(b"null")389}390};391392materialize_serializer(f, array.iter(), offset, take)393}394395fn time_serializer<'a, T, F>(396array: &'a PrimitiveArray<T>,397convert: F,398offset: usize,399take: usize,400) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync>401where402T: NativeType,403F: Fn(T) -> NaiveTime + 'static + Send + Sync,404{405let f = move |x: Option<&T>, buf: &mut Vec<u8>| {406if let Some(x) = x {407let time = convert(*x);408write!(buf, "\"{time}\"").unwrap();409} else {410buf.extend_from_slice(b"null")411}412};413414materialize_serializer(f, array.iter(), offset, take)415}416417fn timestamp_serializer<'a, F>(418array: &'a PrimitiveArray<i64>,419convert: F,420offset: usize,421take: usize,422) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync>423where424F: Fn(i64) -> NaiveDateTime + 'static + Send + Sync,425{426let f = move |x: Option<&i64>, buf: &mut Vec<u8>| {427if let Some(x) = x {428let ndt = convert(*x);429write!(buf, "\"{ndt}\"").unwrap();430} else {431buf.extend_from_slice(b"null")432}433};434materialize_serializer(f, array.iter(), offset, take)435}436437fn timestamp_tz_serializer<'a>(438array: &'a PrimitiveArray<i64>,439time_unit: TimeUnit,440tz: &str,441offset: usize,442take: usize,443) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {444match parse_offset(tz) {445Ok(parsed_tz) => {446let f = move |x: Option<&i64>, buf: &mut Vec<u8>| {447if let Some(x) = x {448let dt_str = timestamp_to_datetime(*x, time_unit, &parsed_tz).to_rfc3339();449write!(buf, "\"{dt_str}\"").unwrap();450} else {451buf.extend_from_slice(b"null")452}453};454455materialize_serializer(f, array.iter(), offset, take)456},457#[cfg(feature = "timezones")]458_ => match parse_offset_tz(tz) {459Ok(parsed_tz) => {460let f = move |x: Option<&i64>, buf: &mut Vec<u8>| {461if let Some(x) = x {462let dt_str = timestamp_to_datetime(*x, time_unit, &parsed_tz).to_rfc3339();463write!(buf, "\"{dt_str}\"").unwrap();464} else {465buf.extend_from_slice(b"null")466}467};468469materialize_serializer(f, array.iter(), offset, take)470},471_ => {472panic!("Timezone {tz} is invalid or not supported");473},474},475#[cfg(not(feature = "timezones"))]476_ => {477panic!("Invalid Offset format (must be [-]00:00) or timezones feature not active");478},479}480}481482pub fn new_serializer<'a>(483array: &'a dyn Array,484offset: usize,485take: usize,486) -> Box<dyn JsonSerializer<Item = [u8]> + 'a + Send + Sync> {487match array.dtype().to_storage() {488ArrowDataType::Boolean => {489boolean_serializer(array.as_any().downcast_ref().unwrap(), offset, take)490},491ArrowDataType::Int8 => {492primitive_serializer::<i8>(array.as_any().downcast_ref().unwrap(), offset, take)493},494ArrowDataType::Int16 => {495primitive_serializer::<i16>(array.as_any().downcast_ref().unwrap(), offset, take)496},497ArrowDataType::Int32 => {498primitive_serializer::<i32>(array.as_any().downcast_ref().unwrap(), offset, take)499},500ArrowDataType::Int64 => {501primitive_serializer::<i64>(array.as_any().downcast_ref().unwrap(), offset, take)502},503ArrowDataType::Int128 => {504primitive_serializer::<i128>(array.as_any().downcast_ref().unwrap(), offset, take)505},506ArrowDataType::UInt8 => {507primitive_serializer::<u8>(array.as_any().downcast_ref().unwrap(), offset, take)508},509ArrowDataType::UInt16 => {510primitive_serializer::<u16>(array.as_any().downcast_ref().unwrap(), offset, take)511},512ArrowDataType::UInt32 => {513primitive_serializer::<u32>(array.as_any().downcast_ref().unwrap(), offset, take)514},515ArrowDataType::UInt64 => {516primitive_serializer::<u64>(array.as_any().downcast_ref().unwrap(), offset, take)517},518ArrowDataType::UInt128 => {519primitive_serializer::<u128>(array.as_any().downcast_ref().unwrap(), offset, take)520},521ArrowDataType::Float16 => {522float16_serializer(array.as_any().downcast_ref().unwrap(), offset, take)523},524ArrowDataType::Float32 => {525float_serializer::<f32>(array.as_any().downcast_ref().unwrap(), offset, take)526},527ArrowDataType::Float64 => {528float_serializer::<f64>(array.as_any().downcast_ref().unwrap(), offset, take)529},530#[cfg(feature = "dtype-decimal")]531ArrowDataType::Decimal(_, scale) => {532decimal_serializer(array.as_any().downcast_ref().unwrap(), *scale, offset, take)533},534ArrowDataType::LargeUtf8 => {535utf8_serializer::<i64>(array.as_any().downcast_ref().unwrap(), offset, take)536},537ArrowDataType::Utf8View => {538utf8view_serializer(array.as_any().downcast_ref().unwrap(), offset, take)539},540ArrowDataType::Struct(_) => {541struct_serializer(array.as_any().downcast_ref().unwrap(), offset, take)542},543ArrowDataType::FixedSizeList(_, _) => {544fixed_size_list_serializer(array.as_any().downcast_ref().unwrap(), offset, take)545},546ArrowDataType::LargeList(_) => {547list_serializer::<i64>(array.as_any().downcast_ref().unwrap(), offset, take)548},549ArrowDataType::Dictionary(k, v, _) => match (k, &**v) {550(IntegerType::UInt8, ArrowDataType::Utf8View) => {551let array = array552.as_any()553.downcast_ref::<DictionaryArray<u8>>()554.unwrap();555dictionary_utf8view_serializer::<u8>(array, offset, take)556},557(IntegerType::UInt16, ArrowDataType::Utf8View) => {558let array = array559.as_any()560.downcast_ref::<DictionaryArray<u16>>()561.unwrap();562dictionary_utf8view_serializer::<u16>(array, offset, take)563},564(IntegerType::UInt32, ArrowDataType::Utf8View) => {565let array = array566.as_any()567.downcast_ref::<DictionaryArray<u32>>()568.unwrap();569dictionary_utf8view_serializer::<u32>(array, offset, take)570},571_ => {572// Not produced by polars573unreachable!()574},575},576ArrowDataType::Date32 => date_serializer(577array.as_any().downcast_ref().unwrap(),578date32_to_date,579offset,580take,581),582ArrowDataType::Timestamp(tu, None) => {583let convert = match tu {584TimeUnit::Nanosecond => timestamp_ns_to_datetime,585TimeUnit::Microsecond => timestamp_us_to_datetime,586TimeUnit::Millisecond => timestamp_ms_to_datetime,587tu => panic!("Invalid time unit '{tu:?}' for Datetime."),588};589timestamp_serializer(590array.as_any().downcast_ref().unwrap(),591convert,592offset,593take,594)595},596ArrowDataType::Timestamp(time_unit, Some(tz)) => timestamp_tz_serializer(597array.as_any().downcast_ref().unwrap(),598*time_unit,599tz,600offset,601take,602),603ArrowDataType::Duration(tu) => {604let convert = match tu {605TimeUnit::Nanosecond => duration_ns_to_duration,606TimeUnit::Microsecond => duration_us_to_duration,607TimeUnit::Millisecond => duration_ms_to_duration,608tu => panic!("Invalid time unit '{tu:?}' for Duration."),609};610duration_serializer(611array.as_any().downcast_ref().unwrap(),612convert,613offset,614take,615)616},617ArrowDataType::Time64(tu) => {618let convert = match tu {619TimeUnit::Nanosecond => time64ns_to_time,620tu => panic!("Invalid time unit '{tu:?}' for Time."),621};622time_serializer(623array.as_any().downcast_ref().unwrap(),624convert,625offset,626take,627)628},629ArrowDataType::Null => null_serializer(array.len(), offset, take),630other => todo!("Writing {:?} to JSON", other),631}632}633634fn serialize_item<'a>(635buffer: &mut Vec<u8>,636record: impl Iterator<Item = (&'a str, &'a [u8])>,637is_first_row: bool,638) {639if !is_first_row {640buffer.push(b',');641}642buffer.push(b'{');643let mut first_item = true;644for (key, value) in record {645if !first_item {646buffer.push(b',');647}648first_item = false;649utf8::write_str(buffer, key).unwrap();650buffer.push(b':');651buffer.extend(value);652}653buffer.push(b'}');654}655656/// Serializes `array` to a valid JSON to `buffer`657/// # Implementation658/// This operation is CPU-bounded659pub(crate) fn serialize(array: &dyn Array, buffer: &mut Vec<u8>) {660let mut serializer = new_serializer(array, 0, usize::MAX);661662(0..array.len()).for_each(|i| {663if i != 0 {664buffer.push(b',');665}666buffer.extend_from_slice(serializer.next().unwrap());667});668}669670671