Path: blob/main/crates/polars-json/src/json/write/mod.rs
6939 views
//! APIs to write to JSON1mod serialize;2mod utf8;34use std::io::Write;56use arrow::array::Array;7use arrow::datatypes::ArrowSchema;8use arrow::io::iterator::StreamingIterator;9use arrow::record_batch::RecordBatchT;10pub use fallible_streaming_iterator::*;11use polars_error::{PolarsError, PolarsResult};12pub(crate) use serialize::new_serializer;13use serialize::serialize;14pub use utf8::serialize_to_utf8;1516/// [`FallibleStreamingIterator`] that serializes an [`Array`] to bytes of valid JSON17/// # Implementation18/// Advancing this iterator CPU-bounded19#[derive(Debug, Clone)]20pub struct Serializer<A, I>21where22A: AsRef<dyn Array>,23I: Iterator<Item = PolarsResult<A>>,24{25arrays: I,26buffer: Vec<u8>,27}2829impl<A, I> Serializer<A, I>30where31A: AsRef<dyn Array>,32I: Iterator<Item = PolarsResult<A>>,33{34/// Creates a new [`Serializer`].35pub fn new(arrays: I, buffer: Vec<u8>) -> Self {36Self { arrays, buffer }37}38}3940impl<A, I> FallibleStreamingIterator for Serializer<A, I>41where42A: AsRef<dyn Array>,43I: Iterator<Item = PolarsResult<A>>,44{45type Item = [u8];4647type Error = PolarsError;4849fn advance(&mut self) -> PolarsResult<()> {50self.buffer.clear();51self.arrays52.next()53.map(|maybe_array| maybe_array.map(|array| serialize(array.as_ref(), &mut self.buffer)))54.transpose()?;55Ok(())56}5758fn get(&self) -> Option<&Self::Item> {59if !self.buffer.is_empty() {60Some(&self.buffer)61} else {62None63}64}65}6667/// [`FallibleStreamingIterator`] that serializes a [`RecordBatchT`] into bytes of JSON68/// in a (pandas-compatible) record-oriented format.69///70/// # Implementation71/// Advancing this iterator is CPU-bounded.72pub struct RecordSerializer<'a> {73schema: ArrowSchema,74index: usize,75end: usize,76iterators: Vec<Box<dyn StreamingIterator<Item = [u8]> + Send + Sync + 'a>>,77buffer: Vec<u8>,78}7980impl<'a> RecordSerializer<'a> {81/// Creates a new [`RecordSerializer`].82pub fn new<A>(schema: ArrowSchema, chunk: &'a RecordBatchT<A>, buffer: Vec<u8>) -> Self83where84A: AsRef<dyn Array>,85{86let end = chunk.len();87let iterators = chunk88.arrays()89.iter()90.map(|arr| new_serializer(arr.as_ref(), 0, usize::MAX))91.collect();9293Self {94schema,95index: 0,96end,97iterators,98buffer,99}100}101}102103impl FallibleStreamingIterator for RecordSerializer<'_> {104type Item = [u8];105106type Error = PolarsError;107108fn advance(&mut self) -> PolarsResult<()> {109self.buffer.clear();110if self.index == self.end {111return Ok(());112}113114let mut is_first_row = true;115write!(&mut self.buffer, "{{")?;116for (f, ref mut it) in self.schema.iter_values().zip(self.iterators.iter_mut()) {117if !is_first_row {118write!(&mut self.buffer, ",")?;119}120write!(&mut self.buffer, "\"{}\":", f.name)?;121122self.buffer.extend_from_slice(it.next().unwrap());123is_first_row = false;124}125write!(&mut self.buffer, "}}")?;126127self.index += 1;128Ok(())129}130131fn get(&self) -> Option<&Self::Item> {132if !self.buffer.is_empty() {133Some(&self.buffer)134} else {135None136}137}138}139140/// Writes valid JSON from an iterator of (assumed JSON-encoded) bytes to `writer`141pub fn write<W, I>(writer: &mut W, mut blocks: I) -> PolarsResult<()>142where143W: std::io::Write,144I: FallibleStreamingIterator<Item = [u8], Error = PolarsError>,145{146writer.write_all(b"[")?;147let mut is_first_row = true;148while let Some(block) = blocks.next()? {149if !is_first_row {150writer.write_all(b",")?;151}152is_first_row = false;153writer.write_all(block)?;154}155writer.write_all(b"]")?;156Ok(())157}158159160