//! Reading and writing of [`DataFrame`]s as JSON documents and as
//! newline-delimited JSON (NDJSON).

pub(crate) mod infer;

use std::io::Write;
use std::num::NonZeroUsize;
use std::ops::Deref;

use arrow::array::LIST_VALUES_NAME;
use arrow::legacy::conversion::chunk_to_struct;
use polars_core::chunked_array::cast::CastOptions;
use polars_core::error::to_compute_err;
use polars_core::prelude::*;
use polars_error::{PolarsResult, polars_bail};
use polars_json::json::write::FallibleStreamingIterator;
use simd_json::BorrowedValue;

use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::prelude::*;
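
/// The format used when writing or reading JSON: a single JSON document
/// (`Json`) or newline-delimited JSON (`JsonLines`). In both cases each row
/// maps to one JSON object keyed by column name.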
pub enum JsonFormat {
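/// A single JSON document: an array with one object per row.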
Json,
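/// Newline-delimited JSON (<https://jsonlines.org>): one object per line.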
JsonLines,
}
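
/// Serializes a [`DataFrame`] to JSON in one of the formats in [`JsonFormat`].
///
/// A minimal usage sketch (illustrative only; it assumes an in-memory
/// `Vec<u8>` sink, an existing mutable `DataFrame` named `df`, and a caller
/// returning `PolarsResult`):
///
/// ```ignore
/// use polars_io::prelude::*;
///
/// let mut buf: Vec<u8> = Vec::new();
/// JsonWriter::new(&mut buf)
///     .with_json_format(JsonFormat::JsonLines)
///     .finish(&mut df)?;
/// let ndjson = String::from_utf8(buf).expect("output is valid UTF-8");
/// ```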
#[must_use]
pub struct JsonWriter<W: Write> {
buffer: W,
json_format: JsonFormat,
}
impl<W: Write> JsonWriter<W> {
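/// Set the output format. The default, set by [`SerWriter::new`], is
/// [`JsonFormat::JsonLines`].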
pub fn with_json_format(mut self, format: JsonFormat) -> Self {
self.json_format = format;
self
}
}
impl<W> SerWriter<W> for JsonWriter<W>
where
W: Write,
{
fn new(buffer: W) -> Self {
JsonWriter {
buffer,
json_format: JsonFormat::JsonLines,
}
}
fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
df.align_chunks_par();
let fields = df.get_columns()
.iter()
.map(|s| {
#[cfg(feature = "object")]
polars_ensure!(!matches!(s.dtype(), DataType::Object(_)), ComputeError: "cannot write 'Object' datatype to json");
Ok(s.field().to_arrow(CompatLevel::newest()))
})
.collect::<PolarsResult<Vec<_>>>()?;
// Serialize each chunk as a single struct array whose fields are the columns.
let batches = df
.iter_chunks(CompatLevel::newest(), false)
.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
match self.json_format {
JsonFormat::JsonLines => {
let serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
let writer =
polars_json::ndjson::write::FileWriter::new(&mut self.buffer, serializer);
writer.collect::<PolarsResult<()>>()?;
},
JsonFormat::Json => {
let serializer = polars_json::json::write::Serializer::new(batches, vec![]);
polars_json::json::write::write(&mut self.buffer, serializer)?;
},
}
Ok(())
}
}
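
/// Writes [`DataFrame`] batches as newline-delimited JSON without buffering
/// the whole frame.
///
/// A minimal sketch (illustrative; it assumes an existing `DataFrame` named
/// `df` and a caller returning `PolarsResult`):
///
/// ```ignore
/// let mut writer = BatchedWriter::new(std::io::stdout());
/// writer.write_batch(&df)?;
/// ```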
pub struct BatchedWriter<W: Write> {
writer: W,
}
impl<W> BatchedWriter<W>
where
W: Write,
{
pub fn new(writer: W) -> Self {
BatchedWriter { writer }
}
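
/// Serialize `df` as NDJSON and append it to the underlying writer.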
pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
let fields = df.get_columns()
.iter()
.map(|s| {
#[cfg(feature = "object")]
polars_ensure!(!matches!(s.dtype(), DataType::Object(_)), ComputeError: "cannot write 'Object' datatype to json");
Ok(s.field().to_arrow(CompatLevel::newest()))
})
.collect::<PolarsResult<Vec<_>>>()?;
let chunks = df.iter_chunks(CompatLevel::newest(), false);
let batches =
chunks.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
let mut serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
while let Some(block) = serializer.next()? {
self.writer.write_all(block)?;
}
Ok(())
}
}
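
/// Reads JSON in one of the formats in [`JsonFormat`] into a [`DataFrame`].
///
/// A minimal usage sketch (illustrative; it assumes `std::io::Cursor` over
/// in-memory bytes satisfies the [`MmapBytesReader`] bound and that the caller
/// returns `PolarsResult`):
///
/// ```ignore
/// use std::io::Cursor;
/// use polars_io::prelude::*;
///
/// let data = r#"[{"a": 1, "b": "x"}, {"a": 2, "b": "y"}]"#;
/// let df = JsonReader::new(Cursor::new(data))
///     .with_json_format(JsonFormat::Json)
///     .finish()?;
/// ```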
#[must_use]
pub struct JsonReader<'a, R>
where
R: MmapBytesReader,
{
reader: R,
rechunk: bool,
ignore_errors: bool,
infer_schema_len: Option<NonZeroUsize>,
batch_size: NonZeroUsize,
projection: Option<Vec<PlSmallStr>>,
schema: Option<SchemaRef>,
schema_overwrite: Option<&'a Schema>,
json_format: JsonFormat,
}
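
/// Strip a UTF-8 byte-order mark from the start of `bytes`, if present.
/// UTF-16 input (detected via its BOM) is rejected, since only UTF-8 is
/// supported.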
pub fn remove_bom(bytes: &[u8]) -> PolarsResult<&[u8]> {
if bytes.starts_with(&[0xEF, 0xBB, 0xBF]) {
Ok(&bytes[3..])
} else if bytes.starts_with(&[0xFE, 0xFF]) || bytes.starts_with(&[0xFF, 0xFE]) {
polars_bail!(ComputeError: "utf-16 not supported")
} else {
Ok(bytes)
}
}
impl<R> SerReader<R> for JsonReader<'_, R>
where
R: MmapBytesReader,
{
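/// Create a reader with default settings: schema inference over the first
/// 100 records, batch size 8192, and [`JsonFormat::Json`].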
fn new(reader: R) -> Self {
JsonReader {
reader,
rechunk: true,
ignore_errors: false,
infer_schema_len: Some(NonZeroUsize::new(100).unwrap()),
batch_size: NonZeroUsize::new(8192).unwrap(),
projection: None,
schema: None,
schema_overwrite: None,
json_format: JsonFormat::Json,
}
}
fn set_rechunk(mut self, rechunk: bool) -> Self {
self.rechunk = rechunk;
self
}
fn finish(mut self) -> PolarsResult<DataFrame> {
let pre_rb: ReaderBytes = (&mut self.reader).into();
let bytes = remove_bom(pre_rb.deref())?;
let rb = ReaderBytes::Borrowed(bytes);
let out = match self.json_format {
JsonFormat::Json => {
polars_ensure!(!self.ignore_errors, InvalidOperation: "'ignore_errors' only supported in ndjson");
let mut bytes = rb.deref().to_vec();
let owned = &mut vec![];
#[expect(deprecated)]
compression::maybe_decompress_bytes(&bytes, owned)?;
let json_value = if owned.is_empty() {
simd_json::to_borrowed_value(&mut bytes).map_err(to_compute_err)?
} else {
simd_json::to_borrowed_value(owned).map_err(to_compute_err)?
};
if let BorrowedValue::Array(array) = &json_value {
if array.is_empty() && self.schema.is_none() && self.schema_overwrite.is_none() {
// An empty top-level array with no schema information yields an empty frame.
return Ok(DataFrame::empty());
}
}
// When the caller supplies a full schema, extra fields in the incoming
// objects are permitted and ignored.
let allow_extra_fields_in_struct = self.schema.is_some();
let mut schema = if let Some(schema) = self.schema {
Arc::unwrap_or_clone(schema)
} else {
let inner_dtype = if let BorrowedValue::Array(values) = &json_value {
infer::json_values_to_supertype(
values,
self.infer_schema_len
.unwrap_or(NonZeroUsize::new(usize::MAX).unwrap()),
)?
} else {
DataType::from_arrow_dtype(&polars_json::json::infer(&json_value)?)
};
let DataType::Struct(fields) = inner_dtype else {
polars_bail!(ComputeError: "can only deserialize json objects")
};
Schema::from_iter(fields)
};
if let Some(overwrite) = self.schema_overwrite {
overwrite_schema(&mut schema, overwrite)?;
}
// Categorical/Enum leaves cannot be deserialized directly from JSON, so they
// are read as String here and cast back to the target dtype afterwards.
let mut needs_cast = false;
let deserialize_schema = schema
.iter()
.map(|(name, dt)| {
Field::new(
name.clone(),
dt.clone().map_leaves(&mut |leaf_dt| {
match leaf_dt {
#[cfg(feature = "dtype-categorical")]
DataType::Enum(..) | DataType::Categorical(..) => {
needs_cast = true;
DataType::String
},
leaf_dt => leaf_dt,
}
}),
)
})
.collect();
let arrow_dtype =
DataType::Struct(deserialize_schema).to_arrow(CompatLevel::newest());
// A top-level JSON array deserializes to a LargeList of structs.
let arrow_dtype = if let BorrowedValue::Array(_) = &json_value {
ArrowDataType::LargeList(Box::new(arrow::datatypes::Field::new(
LIST_VALUES_NAME,
arrow_dtype,
true,
)))
} else {
arrow_dtype
};
let arr = polars_json::json::deserialize(
&json_value,
arrow_dtype,
allow_extra_fields_in_struct,
)?;
let arr = arr.as_any().downcast_ref::<StructArray>().ok_or_else(
|| polars_err!(ComputeError: "can only deserialize json objects"),
)?;
let mut df = DataFrame::try_from(arr.clone())?;
if df.width() == 0 && df.height() <= 1 {
// A zero-width struct (e.g. deserialized from `{}`) may still report one
// row, but a DataFrame without columns must have height 0.
unsafe { df.set_height(0) };
}
if needs_cast {
// Cast the String-deserialized columns back to their Categorical/Enum dtypes.
for (col, dt) in unsafe { df.get_columns_mut() }
.iter_mut()
.zip(schema.iter_values())
{
*col = col.cast_with_options(
dt,
if self.ignore_errors {
CastOptions::NonStrict
} else {
CastOptions::Strict
},
)?;
}
}
df
},
JsonFormat::JsonLines => {
let mut json_reader = CoreJsonReader::new(
rb,
None,
self.schema,
self.schema_overwrite,
None,
1024,
NonZeroUsize::new(1 << 18).unwrap(),
false,
self.infer_schema_len,
self.ignore_errors,
None,
None,
None,
)?;
let mut df: DataFrame = json_reader.as_df()?;
if self.rechunk {
df.rechunk_mut_par();
}
df
},
};
if let Some(proj) = self.projection.as_deref() {
out.select(proj.iter().cloned())
} else {
Ok(out)
}
}
}
impl<'a, R> JsonReader<'a, R>
where
R: MmapBytesReader,
{
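/// Set the schema to use for deserialization, skipping schema inference.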
pub fn with_schema(mut self, schema: SchemaRef) -> Self {
self.schema = Some(schema);
self
}
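
/// Overwrite fields of the inferred schema.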
pub fn with_schema_overwrite(mut self, schema: &'a Schema) -> Self {
self.schema_overwrite = Some(schema);
self
}
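
/// Set the maximum number of records sampled for schema inference; `None`
/// scans the entire input.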
pub fn infer_schema_len(mut self, max_records: Option<NonZeroUsize>) -> Self {
self.infer_schema_len = max_records;
self
}
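
/// Set the batch size (the number of records loaded at one time).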
pub fn with_batch_size(mut self, batch_size: NonZeroUsize) -> Self {
self.batch_size = batch_size;
self
}
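
/// Set the column projection: the names of the columns to keep after
/// deserialization. `None` keeps all columns.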
pub fn with_projection(mut self, projection: Option<Vec<PlSmallStr>>) -> Self {
self.projection = projection;
self
}
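
/// Set whether to read a single JSON document or newline-delimited JSON.
/// The default, set by [`SerReader::new`], is [`JsonFormat::Json`].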
pub fn with_json_format(mut self, format: JsonFormat) -> Self {
self.json_format = format;
self
}
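
/// Set whether parsing errors are ignored (producing nulls). Only supported
/// for [`JsonFormat::JsonLines`].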
pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
self.ignore_errors = ignore;
self
}
}