//! (De)serialization of `DataFrame`s to and from JSON.
//!
//! Two layouts are supported, selected via [`JsonFormat`]: a single JSON
//! array of row objects, or JSON Lines (NDJSON) with one row object per line.
pub(crate) mod infer;
use std::io::Write;
use std::num::NonZeroUsize;
use std::ops::Deref;
use arrow::array::LIST_VALUES_NAME;
use arrow::legacy::conversion::chunk_to_struct;
use polars_core::error::to_compute_err;
use polars_core::prelude::*;
use polars_error::{PolarsResult, polars_bail};
use polars_json::json::write::FallibleStreamingIterator;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use simd_json::BorrowedValue;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::prelude::*;
/// Options for serializing a `DataFrame` to JSON. Currently empty.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct JsonWriterOptions {}
/// The layout used when (de)serializing a `DataFrame` as JSON.
pub enum JsonFormat {
    /// A single JSON value: an array with one object per row. The output as
    /// a whole is valid JSON.
    Json,
    /// JSON Lines (NDJSON): one JSON object per row, each on its own line.
    /// The concatenated output is not a single JSON value, but it can be
    /// written and read in a streaming fashion.
    JsonLines,
}
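/// Writes a `DataFrame` to JSON, in one of the layouts of [`JsonFormat`];
/// the default is [`JsonFormat::JsonLines`] (see [`SerWriter::new`]).
///
/// A minimal sketch, assuming this crate's prelude re-exports these types as
/// `polars_io::prelude` does:
///
/// ```no_run
/// use polars_core::df;
/// use polars_core::prelude::*;
/// use polars_io::prelude::*;
///
/// fn example() -> PolarsResult<()> {
///     let mut df = df!("a" => [1, 2, 3], "b" => ["x", "y", "z"])?;
///     let mut buf = Vec::new();
///     JsonWriter::new(&mut buf)
///         .with_json_format(JsonFormat::JsonLines)
///         .finish(&mut df)?;
///     // `buf` now holds one JSON object per line.
///     Ok(())
/// }
/// ```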
#[must_use]
pub struct JsonWriter<W: Write> {
buffer: W,
json_format: JsonFormat,
}
impl<W: Write> JsonWriter<W> {
    /// Set the JSON layout to write; the default is
    /// [`JsonFormat::JsonLines`].
    pub fn with_json_format(mut self, format: JsonFormat) -> Self {
self.json_format = format;
self
}
}
impl<W> SerWriter<W> for JsonWriter<W>
where
W: Write,
{
fn new(buffer: W) -> Self {
JsonWriter {
buffer,
json_format: JsonFormat::JsonLines,
}
}
    fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
        // Align chunk boundaries across columns so each chunk can be zipped
        // into a single struct array below.
        df.align_chunks_par();
let fields = df
.iter()
.map(|s| {
#[cfg(feature = "object")]
polars_ensure!(!matches!(s.dtype(), DataType::Object(_)), ComputeError: "cannot write 'Object' datatype to json");
Ok(s.field().to_arrow(CompatLevel::newest()))
})
.collect::<PolarsResult<Vec<_>>>()?;
        // One struct array per chunk; each row serializes as a JSON object
        // keyed by column name.
        let batches = df
.iter_chunks(CompatLevel::newest(), false)
.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
match self.json_format {
JsonFormat::JsonLines => {
let serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
let writer =
polars_json::ndjson::write::FileWriter::new(&mut self.buffer, serializer);
writer.collect::<PolarsResult<()>>()?;
},
JsonFormat::Json => {
let serializer = polars_json::json::write::Serializer::new(batches, vec![]);
polars_json::json::write::write(&mut self.buffer, serializer)?;
},
}
Ok(())
}
}
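/// Writes `DataFrame` batches as JSON Lines, one batch at a time, without
/// materializing the full output.
///
/// A hedged sketch, assuming each batch shares one schema and that the crate
/// prelude re-exports `BatchedWriter`:
///
/// ```no_run
/// use polars_core::df;
/// use polars_core::prelude::*;
/// use polars_io::prelude::*;
///
/// fn example() -> PolarsResult<()> {
///     let mut writer = BatchedWriter::new(Vec::new());
///     for i in 0..3i32 {
///         let batch = df!("i" => [i])?;
///         writer.write_batch(&batch)?;
///     }
///     Ok(())
/// }
/// ```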
pub struct BatchedWriter<W: Write> {
writer: W,
}
impl<W> BatchedWriter<W>
where
W: Write,
{
pub fn new(writer: W) -> Self {
BatchedWriter { writer }
}
    /// Write a single `DataFrame` batch to the underlying writer.
    ///
    /// Batches are serialized with the NDJSON serializer, so the output is
    /// always JSON Lines: one object per row, one batch after another.
    pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
let fields = df
.iter()
.map(|s| {
#[cfg(feature = "object")]
polars_ensure!(!matches!(s.dtype(), DataType::Object(_)), ComputeError: "cannot write 'Object' datatype to json");
Ok(s.field().to_arrow(CompatLevel::newest()))
})
.collect::<PolarsResult<Vec<_>>>()?;
let chunks = df.iter_chunks(CompatLevel::newest(), false);
let batches =
chunks.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
        let mut serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
        // `Serializer` is a fallible streaming iterator: each call to `next`
        // yields the next serialized block of bytes.
        while let Some(block) = serializer.next()? {
self.writer.write_all(block)?;
}
Ok(())
}
}
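/// Reads JSON into a `DataFrame`, in one of the formats of [`JsonFormat`];
/// the default is [`JsonFormat::Json`], i.e. a single JSON value.
///
/// A hedged sketch reading an in-memory JSON array, assuming that
/// `std::io::Cursor` implements [`MmapBytesReader`] and that the crate
/// prelude re-exports these types:
///
/// ```no_run
/// use std::io::Cursor;
///
/// use polars_core::prelude::*;
/// use polars_io::prelude::*;
///
/// fn example() -> PolarsResult<DataFrame> {
///     let data = r#"[{"a": 1, "b": "x"}, {"a": 2, "b": "y"}]"#;
///     JsonReader::new(Cursor::new(data))
///         .with_json_format(JsonFormat::Json)
///         .finish()
/// }
/// ```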
#[must_use]
pub struct JsonReader<'a, R>
where
R: MmapBytesReader,
{
reader: R,
rechunk: bool,
ignore_errors: bool,
infer_schema_len: Option<NonZeroUsize>,
batch_size: NonZeroUsize,
projection: Option<Vec<PlSmallStr>>,
schema: Option<SchemaRef>,
schema_overwrite: Option<&'a Schema>,
json_format: JsonFormat,
}
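/// Strips a UTF-8 byte-order mark from the start of `bytes` if present;
/// UTF-16 input (detected by its BOM) is rejected, since only UTF-8 is
/// supported.
///
/// A small sketch of the behaviour, assuming this function is re-exported
/// through the crate prelude:
///
/// ```
/// use polars_io::prelude::*;
///
/// let cleaned = remove_bom(b"\xEF\xBB\xBF{\"a\": 1}").unwrap();
/// assert_eq!(cleaned, b"{\"a\": 1}");
/// ```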
pub fn remove_bom(bytes: &[u8]) -> PolarsResult<&[u8]> {
if bytes.starts_with(&[0xEF, 0xBB, 0xBF]) {
Ok(&bytes[3..])
} else if bytes.starts_with(&[0xFE, 0xFF]) || bytes.starts_with(&[0xFF, 0xFE]) {
polars_bail!(ComputeError: "utf-16 not supported")
} else {
Ok(bytes)
}
}
impl<R> SerReader<R> for JsonReader<'_, R>
where
R: MmapBytesReader,
{
fn new(reader: R) -> Self {
JsonReader {
reader,
            rechunk: true,
            ignore_errors: false,
            // Sample up to 100 records by default when inferring the schema.
            infer_schema_len: Some(NonZeroUsize::new(100).unwrap()),
            batch_size: NonZeroUsize::new(8192).unwrap(),
projection: None,
schema: None,
schema_overwrite: None,
json_format: JsonFormat::Json,
}
}
fn set_rechunk(mut self, rechunk: bool) -> Self {
self.rechunk = rechunk;
self
}
fn finish(mut self) -> PolarsResult<DataFrame> {
let pre_rb: ReaderBytes = (&mut self.reader).into();
        // Strip a UTF-8 BOM (and reject UTF-16 input) before parsing.
        let bytes = remove_bom(pre_rb.deref())?;
let rb = ReaderBytes::Borrowed(bytes);
let out = match self.json_format {
            JsonFormat::Json => {
                polars_ensure!(!self.ignore_errors, InvalidOperation: "'ignore_errors' only supported in ndjson");
                // simd-json parses in place, so the input must be owned and
                // mutable; decompress first if the bytes are compressed.
                let mut bytes = rb.deref().to_vec();
                let owned = &mut vec![];
                compression::maybe_decompress_bytes(&bytes, owned)?;
                // `owned` is only populated when the input was compressed.
                let json_value = if owned.is_empty() {
                    simd_json::to_borrowed_value(&mut bytes).map_err(to_compute_err)?
                } else {
                    simd_json::to_borrowed_value(owned).map_err(to_compute_err)?
                };
                // An empty top-level array without any schema information
                // yields an empty DataFrame.
                if let BorrowedValue::Array(array) = &json_value {
                    if array.is_empty() && self.schema.is_none() && self.schema_overwrite.is_none() {
                        return Ok(DataFrame::empty());
                    }
                }
                let allow_extra_fields_in_struct = self.schema.is_some();
                // Resolve the target dtype: a user-provided schema (optionally
                // patched by `schema_overwrite`), or one inferred from the data.
                let dtype = if let Some(mut schema) = self.schema {
if let Some(overwrite) = self.schema_overwrite {
let mut_schema = Arc::make_mut(&mut schema);
overwrite_schema(mut_schema, overwrite)?;
}
DataType::Struct(schema.iter_fields().collect()).to_arrow(CompatLevel::newest())
} else {
let inner_dtype = if let BorrowedValue::Array(values) = &json_value {
infer::json_values_to_supertype(
values,
self.infer_schema_len
.unwrap_or(NonZeroUsize::new(usize::MAX).unwrap()),
)?
.to_arrow(CompatLevel::newest())
} else {
polars_json::json::infer(&json_value)?
};
if let Some(overwrite) = self.schema_overwrite {
let ArrowDataType::Struct(fields) = inner_dtype else {
polars_bail!(ComputeError: "can only deserialize json objects")
};
let mut schema = Schema::from_iter(fields.iter().map(Into::<Field>::into));
overwrite_schema(&mut schema, overwrite)?;
DataType::Struct(
schema
.into_iter()
.map(|(name, dt)| Field::new(name, dt))
.collect(),
)
.to_arrow(CompatLevel::newest())
} else {
inner_dtype
}
};
                // A top-level JSON array is typed as a LargeList of the row
                // struct dtype so that each element becomes one row.
                let dtype = if let BorrowedValue::Array(_) = &json_value {
ArrowDataType::LargeList(Box::new(arrow::datatypes::Field::new(
LIST_VALUES_NAME,
dtype,
true,
)))
} else {
dtype
};
let arr = polars_json::json::deserialize(
&json_value,
dtype,
allow_extra_fields_in_struct,
)?;
                // Only a struct array (rows of objects) can be converted into
                // a DataFrame.
                let arr = arr.as_any().downcast_ref::<StructArray>().ok_or_else(
|| polars_err!(ComputeError: "can only deserialize json objects"),
)?;
DataFrame::try_from(arr.clone())
},
JsonFormat::JsonLines => {
                // Delegate parsing to the core NDJSON reader.
                let mut json_reader = CoreJsonReader::new(
rb,
None,
self.schema,
self.schema_overwrite,
None,
1024,
NonZeroUsize::new(1 << 18).unwrap(),
false,
self.infer_schema_len,
self.ignore_errors,
None,
None,
None,
)?;
let mut df: DataFrame = json_reader.as_df()?;
                if self.rechunk {
                    // Merge the output chunks into one contiguous chunk.
                    df.as_single_chunk_par();
                }
Ok(df)
},
}?;
if let Some(proj) = self.projection.as_deref() {
out.select(proj.iter().cloned())
} else {
Ok(out)
}
}
}
impl<'a, R> JsonReader<'a, R>
where
R: MmapBytesReader,
{
    /// Set the schema to use, skipping schema inference entirely.
    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
self.schema = Some(schema);
self
}
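    /// Overwrite the dtypes of the given columns after inference.
    ///
    /// A hedged sketch, assuming `Field::new` takes a `PlSmallStr` name as in
    /// recent `polars_core` versions:
    ///
    /// ```no_run
    /// use std::io::Cursor;
    ///
    /// use polars_core::prelude::*;
    /// use polars_io::prelude::*;
    ///
    /// fn example() -> PolarsResult<DataFrame> {
    ///     let overwrite = Schema::from_iter([Field::new("a".into(), DataType::Int32)]);
    ///     JsonReader::new(Cursor::new(r#"[{"a": 1, "b": true}]"#))
    ///         .with_schema_overwrite(&overwrite)
    ///         .finish()
    /// }
    /// ```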
pub fn with_schema_overwrite(mut self, schema: &'a Schema) -> Self {
self.schema_overwrite = Some(schema);
self
}
    /// Set the maximum number of records to sample when inferring the schema;
    /// `None` scans the entire input.
    pub fn infer_schema_len(mut self, max_records: Option<NonZeroUsize>) -> Self {
self.infer_schema_len = max_records;
self
}
    /// Set the number of records to load per batch.
    pub fn with_batch_size(mut self, batch_size: NonZeroUsize) -> Self {
self.batch_size = batch_size;
self
}
    /// Select which columns to keep, by name, after parsing.
    pub fn with_projection(mut self, projection: Option<Vec<PlSmallStr>>) -> Self {
self.projection = projection;
self
}
    /// Set the expected input format; the reader defaults to
    /// [`JsonFormat::Json`].
    pub fn with_json_format(mut self, format: JsonFormat) -> Self {
self.json_format = format;
self
}
    /// Whether to ignore records that fail to parse instead of erroring.
    /// Only supported for [`JsonFormat::JsonLines`].
    pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
self.ignore_errors = ignore;
self
}
}
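
// A hedged round-trip sketch (write as JSON Lines, read back); not part of
// the original module. Assumes `polars_core`'s `df!` macro and
// `DataFrame::equals` are available.
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use polars_core::df;

    use super::*;

    #[test]
    fn json_lines_round_trip() -> PolarsResult<()> {
        let mut df = df!("a" => [1i64, 2, 3], "b" => ["x", "y", "z"])?;
        let mut buf = Vec::new();
        JsonWriter::new(&mut buf)
            .with_json_format(JsonFormat::JsonLines)
            .finish(&mut df)?;
        let round_tripped = JsonReader::new(Cursor::new(buf))
            .with_json_format(JsonFormat::JsonLines)
            .finish()?;
        assert!(df.equals(&round_tripped));
        Ok(())
    }
}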