Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/json/mod.rs
8421 views
1
//! # (De)serialize JSON files.
2
//!
3
//! ## Read JSON to a DataFrame
4
//!
5
//! ## Example
6
//!
7
//! ```
8
//! use polars_core::prelude::*;
9
//! use polars_io::prelude::*;
10
//! use std::io::Cursor;
11
//! use std::num::NonZeroUsize;
12
//!
13
//! let basic_json = r#"{"a":1, "b":2.0, "c":false, "d":"4"}
14
//! {"a":-10, "b":-3.5, "c":true, "d":"4"}
15
//! {"a":2, "b":0.6, "c":false, "d":"text"}
16
//! {"a":1, "b":2.0, "c":false, "d":"4"}
17
//! {"a":7, "b":-3.5, "c":true, "d":"4"}
18
//! {"a":1, "b":0.6, "c":false, "d":"text"}
19
//! {"a":1, "b":2.0, "c":false, "d":"4"}
20
//! {"a":5, "b":-3.5, "c":true, "d":"4"}
21
//! {"a":1, "b":0.6, "c":false, "d":"text"}
22
//! {"a":1, "b":2.0, "c":false, "d":"4"}
23
//! {"a":1, "b":-3.5, "c":true, "d":"4"}
24
//! {"a":1, "b":0.6, "c":false, "d":"text"}"#;
25
//! let file = Cursor::new(basic_json);
26
//! let df = JsonReader::new(file)
27
//! .with_json_format(JsonFormat::JsonLines)
28
//! .infer_schema_len(NonZeroUsize::new(3))
29
//! .with_batch_size(NonZeroUsize::new(3).unwrap())
30
//! .finish()
31
//! .unwrap();
32
//!
33
//! println!("{:?}", df);
34
//! ```
35
//! >>> Outputs:
36
//!
37
//! ```text
38
//! +-----+--------+-------+--------+
39
//! | a | b | c | d |
40
//! | --- | --- | --- | --- |
41
//! | i64 | f64 | bool | str |
42
//! +=====+========+=======+========+
43
//! | 1 | 2 | false | "4" |
44
//! +-----+--------+-------+--------+
45
//! | -10 | -3.5e0 | true | "4" |
46
//! +-----+--------+-------+--------+
47
//! | 2 | 0.6 | false | "text" |
48
//! +-----+--------+-------+--------+
49
//! | 1 | 2 | false | "4" |
50
//! +-----+--------+-------+--------+
51
//! | 7 | -3.5e0 | true | "4" |
52
//! +-----+--------+-------+--------+
53
//! | 1 | 0.6 | false | "text" |
54
//! +-----+--------+-------+--------+
55
//! | 1 | 2 | false | "4" |
56
//! +-----+--------+-------+--------+
57
//! | 5 | -3.5e0 | true | "4" |
58
//! +-----+--------+-------+--------+
59
//! | 1 | 0.6 | false | "text" |
60
//! +-----+--------+-------+--------+
61
//! | 1 | 2 | false | "4" |
62
//! +-----+--------+-------+--------+
63
//! ```
64
//!
65
pub(crate) mod infer;
66
67
use std::io::Write;
68
use std::num::NonZeroUsize;
69
use std::ops::Deref;
70
71
use arrow::array::LIST_VALUES_NAME;
72
use arrow::legacy::conversion::chunk_to_struct;
73
use polars_core::chunked_array::cast::CastOptions;
74
use polars_core::error::to_compute_err;
75
use polars_core::prelude::*;
76
use polars_error::{PolarsResult, polars_bail};
77
use polars_json::json::write::FallibleStreamingIterator;
78
use simd_json::BorrowedValue;
79
80
use crate::mmap::{MmapBytesReader, ReaderBytes};
81
use crate::prelude::*;
82
83
/// The format to use to write the DataFrame to JSON: `Json` (a JSON array)
/// or `JsonLines` (each row output on a separate line).
///
/// In either case, each row is serialized as a JSON object whose keys are the column names and
/// whose values are the row's corresponding values.
///
/// Used by both [`JsonWriter`] (serialization) and [`JsonReader`] (deserialization).
pub enum JsonFormat {
    /// A single JSON array containing each DataFrame row as an object. The length of the array is the number of rows in
    /// the DataFrame.
    ///
    /// Use this to create valid JSON that can be deserialized back into an array in one fell swoop.
    Json,
    /// Each DataFrame row is serialized as a JSON object on a separate line. The number of lines in the output is the
    /// number of rows in the DataFrame.
    ///
    /// The [JSON Lines](https://jsonlines.org) format makes it easy to read records in a streaming fashion, one (line)
    /// at a time. But the output in its entirety is not valid JSON; only the individual lines are.
    ///
    /// It is recommended to use the file extension `.jsonl` when saving as JSON Lines.
    JsonLines,
}
103
104
/// Writes a DataFrame to JSON.
///
/// Under the hood, this uses [`arrow2::io::json`](https://docs.rs/arrow2/latest/arrow2/io/json/write/fn.write.html).
/// `arrow2` generally serializes types that are not JSON primitives, such as Date and DateTime, as their
/// `Display`-formatted versions. For instance, a (naive) DateTime column is formatted as the String `"yyyy-mm-dd
/// HH:MM:SS"`. To control how non-primitive columns are serialized, convert them to String or another primitive type
/// before serializing.
#[must_use]
pub struct JsonWriter<W: Write> {
    /// File or Stream handler
    buffer: W,
    // Output layout (array vs. newline-delimited); defaults to `JsonLines` in `SerWriter::new`.
    json_format: JsonFormat,
}
117
118
impl<W: Write> JsonWriter<W> {
119
pub fn with_json_format(mut self, format: JsonFormat) -> Self {
120
self.json_format = format;
121
self
122
}
123
}
124
125
impl<W> SerWriter<W> for JsonWriter<W>
126
where
127
W: Write,
128
{
129
/// Create a new `JsonWriter` writing to `buffer` with format `JsonFormat::JsonLines`. To specify a different
130
/// format, use e.g., [`JsonWriter::new(buffer).with_json_format(JsonFormat::Json)`](JsonWriter::with_json_format).
131
fn new(buffer: W) -> Self {
132
JsonWriter {
133
buffer,
134
json_format: JsonFormat::JsonLines,
135
}
136
}
137
138
fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
139
df.align_chunks_par();
140
let fields = df.columns()
141
.iter()
142
.map(|s| {
143
#[cfg(feature = "object")]
144
polars_ensure!(!matches!(s.dtype(), DataType::Object(_)), ComputeError: "cannot write 'Object' datatype to json");
145
Ok(s.field().to_arrow(CompatLevel::newest()))
146
})
147
.collect::<PolarsResult<Vec<_>>>()?;
148
let batches = df
149
.iter_chunks(CompatLevel::newest(), false)
150
.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
151
152
match self.json_format {
153
JsonFormat::JsonLines => {
154
let serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
155
let writer =
156
polars_json::ndjson::write::FileWriter::new(&mut self.buffer, serializer);
157
writer.collect::<PolarsResult<()>>()?;
158
},
159
JsonFormat::Json => {
160
let serializer = polars_json::json::write::Serializer::new(batches, vec![]);
161
polars_json::json::write::write(&mut self.buffer, serializer)?;
162
},
163
}
164
165
Ok(())
166
}
167
}
168
169
/// Incrementally writes DataFrame batches as JSON Lines (see [`BatchedWriter::write_batch`]).
pub struct BatchedWriter<W: Write> {
    // Underlying sink; serialized blocks are written to it directly, unbuffered by this type.
    writer: W,
}
172
173
impl<W> BatchedWriter<W>
174
where
175
W: Write,
176
{
177
pub fn new(writer: W) -> Self {
178
BatchedWriter { writer }
179
}
180
/// Write a batch to the json writer.
181
///
182
/// # Panics
183
/// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
184
pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
185
let fields = df.columns()
186
.iter()
187
.map(|s| {
188
#[cfg(feature = "object")]
189
polars_ensure!(!matches!(s.dtype(), DataType::Object(_)), ComputeError: "cannot write 'Object' datatype to json");
190
Ok(s.field().to_arrow(CompatLevel::newest()))
191
})
192
.collect::<PolarsResult<Vec<_>>>()?;
193
let chunks = df.iter_chunks(CompatLevel::newest(), false);
194
let batches =
195
chunks.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
196
let mut serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
197
while let Some(block) = serializer.next()? {
198
self.writer.write_all(block)?;
199
}
200
Ok(())
201
}
202
}
203
204
/// Reads JSON in one of the formats in [`JsonFormat`] into a DataFrame.
#[must_use]
pub struct JsonReader<'a, R>
where
    R: MmapBytesReader,
{
    // Source of the JSON bytes (file, cursor, mmap-able reader, ...).
    reader: R,
    // If true, merge chunks into one after reading; only consulted on the JsonLines path.
    rechunk: bool,
    // Parse leniently, replacing failures with null; only supported for JsonLines.
    ignore_errors: bool,
    // Max records to inspect for schema inference; `None` means scan everything.
    infer_schema_len: Option<NonZeroUsize>,
    // NOTE(review): set via `with_batch_size`, but not visibly consumed by `finish`
    // in this file — confirm whether it still has an effect.
    batch_size: NonZeroUsize,
    // Column names to keep after deserialization; `None` keeps all columns.
    projection: Option<Vec<PlSmallStr>>,
    // Full schema supplied by the caller; skips inference when set.
    schema: Option<SchemaRef>,
    // Partial schema that overrides inferred dtypes per column.
    schema_overwrite: Option<&'a Schema>,
    // Input layout; defaults to `JsonFormat::Json` in `SerReader::new`.
    json_format: JsonFormat,
}
220
221
pub fn remove_bom(bytes: &[u8]) -> PolarsResult<&[u8]> {
222
if bytes.starts_with(&[0xEF, 0xBB, 0xBF]) {
223
// UTF-8 BOM
224
Ok(&bytes[3..])
225
} else if bytes.starts_with(&[0xFE, 0xFF]) || bytes.starts_with(&[0xFF, 0xFE]) {
226
// UTF-16 BOM
227
polars_bail!(ComputeError: "utf-16 not supported")
228
} else {
229
Ok(bytes)
230
}
231
}
232
impl<R> SerReader<R> for JsonReader<'_, R>
where
    R: MmapBytesReader,
{
    /// Create a reader with default settings: rechunk on, strict errors, schema inference
    /// capped at 100 records, and [`JsonFormat::Json`] as the input layout.
    fn new(reader: R) -> Self {
        JsonReader {
            reader,
            rechunk: true,
            ignore_errors: false,
            infer_schema_len: Some(NonZeroUsize::new(100).unwrap()),
            // NOTE(review): `batch_size` is not visibly consumed by `finish` in this
            // file (the JsonLines path hardcodes its chunk size below) — confirm.
            batch_size: NonZeroUsize::new(8192).unwrap(),
            projection: None,
            schema: None,
            schema_overwrite: None,
            json_format: JsonFormat::Json,
        }
    }

    /// Toggle merging all chunks into a single chunk after reading (JsonLines path only).
    fn set_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    /// Take the SerReader and return a parsed DataFrame.
    ///
    /// Because JSON values specify their types (number, string, etc), no upcasting or conversion is performed between
    /// incompatible types in the input. In the event that a column contains mixed dtypes, is it unspecified whether an
    /// error is returned or whether elements of incompatible dtypes are replaced with `null`.
    fn finish(mut self) -> PolarsResult<DataFrame> {
        // Materialize the reader's bytes, strip a UTF-8 BOM (error on UTF-16), and
        // re-wrap as borrowed bytes for the parsers below.
        let pre_rb: ReaderBytes = (&mut self.reader).into();
        let bytes = remove_bom(pre_rb.deref())?;
        let rb = ReaderBytes::Borrowed(bytes);
        let out = match self.json_format {
            JsonFormat::Json => {
                polars_ensure!(!self.ignore_errors, InvalidOperation: "'ignore_errors' only supported in ndjson");
                // simd_json parses in place, so it needs a mutable owned copy.
                let mut bytes = rb.deref().to_vec();
                let owned = &mut vec![];
                #[expect(deprecated)] // JSON is not a row-format
                compression::maybe_decompress_bytes(&bytes, owned)?;
                // the easiest way to avoid ownership issues is by implicitly figuring out if
                // decompression happened (owned is only populated on decompress), then pick which bytes to parse
                let json_value = if owned.is_empty() {
                    simd_json::to_borrowed_value(&mut bytes).map_err(to_compute_err)?
                } else {
                    simd_json::to_borrowed_value(owned).map_err(to_compute_err)?
                };
                // Fast path: an empty top-level array with no schema information yields
                // an empty DataFrame (there is nothing to infer from).
                if let BorrowedValue::Array(array) = &json_value {
                    if array.is_empty() & self.schema.is_none() & self.schema_overwrite.is_none() {
                        return Ok(DataFrame::empty());
                    }
                }

                // With a user-supplied schema, fields present in the data but absent from
                // the schema are tolerated rather than rejected.
                let allow_extra_fields_in_struct = self.schema.is_some();

                // Use the caller's schema when given; otherwise infer one from the value.
                let mut schema = if let Some(schema) = self.schema {
                    Arc::unwrap_or_clone(schema)
                } else {
                    // Infer.
                    let inner_dtype = if let BorrowedValue::Array(values) = &json_value {
                        // Top-level array of objects: compute the supertype across up to
                        // `infer_schema_len` elements (unbounded when None).
                        infer::json_values_to_supertype(
                            values,
                            self.infer_schema_len
                                .unwrap_or(NonZeroUsize::new(usize::MAX).unwrap()),
                        )?
                    } else {
                        // Single top-level object: infer directly from its arrow dtype.
                        DataType::from_arrow_dtype(&polars_json::json::infer(&json_value)?)
                    };

                    // Rows must be objects; anything else cannot map to DataFrame columns.
                    let DataType::Struct(fields) = inner_dtype else {
                        polars_bail!(ComputeError: "can only deserialize json objects")
                    };

                    Schema::from_iter(fields)
                };

                if let Some(overwrite) = self.schema_overwrite {
                    overwrite_schema(&mut schema, overwrite)?;
                }

                // Build the schema actually used for deserialization: categorical-like
                // leaves are read as String first and cast afterwards (flag captured by
                // the closure).
                let mut needs_cast = false;
                let deserialize_schema = schema
                    .iter()
                    .map(|(name, dt)| {
                        Field::new(
                            name.clone(),
                            dt.clone().map_leaves(&mut |leaf_dt| {
                                // Deserialize enums and categoricals as strings first.
                                match leaf_dt {
                                    #[cfg(feature = "dtype-categorical")]
                                    DataType::Enum(..) | DataType::Categorical(..) => {
                                        needs_cast = true;
                                        DataType::String
                                    },
                                    leaf_dt => leaf_dt,
                                }
                            }),
                        )
                    })
                    .collect();

                let arrow_dtype =
                    DataType::Struct(deserialize_schema).to_arrow(CompatLevel::newest());

                // A top-level array deserializes as a LargeList of structs; a single
                // object deserializes as the struct itself.
                let arrow_dtype = if let BorrowedValue::Array(_) = &json_value {
                    ArrowDataType::LargeList(Box::new(arrow::datatypes::Field::new(
                        LIST_VALUES_NAME,
                        arrow_dtype,
                        true,
                    )))
                } else {
                    arrow_dtype
                };

                let arr = polars_json::json::deserialize(
                    &json_value,
                    arrow_dtype,
                    allow_extra_fields_in_struct,
                )?;

                // The result must be a struct array whose fields become the columns.
                let arr = arr.as_any().downcast_ref::<StructArray>().ok_or_else(
                    || polars_err!(ComputeError: "can only deserialize json objects"),
                )?;

                let mut df = DataFrame::try_from(arr.clone())?;

                if df.width() == 0 && df.height() <= 1 {
                    // read_json("{}")
                    unsafe { df.set_height(0) };
                }

                // Apply the deferred String -> Enum/Categorical casts recorded above.
                if needs_cast {
                    for (col, dt) in unsafe { df.columns_mut() }
                        .iter_mut()
                        .zip(schema.iter_values())
                    {
                        *col = col.cast_with_options(
                            dt,
                            if self.ignore_errors {
                                CastOptions::NonStrict
                            } else {
                                CastOptions::Strict
                            },
                        )?;
                    }
                }

                df
            },
            JsonFormat::JsonLines => {
                // Delegate newline-delimited parsing to the ndjson core reader.
                let mut json_reader = CoreJsonReader::new(
                    rb,
                    None,
                    self.schema,
                    self.schema_overwrite,
                    None,
                    1024, // sample size
                    NonZeroUsize::new(1 << 18).unwrap(),
                    false,
                    self.infer_schema_len,
                    self.ignore_errors,
                    None,
                    None,
                    None,
                )?;
                let mut df: DataFrame = json_reader.as_df()?;
                if self.rechunk {
                    df.rechunk_mut_par();
                }

                df
            },
        };

        // TODO! Ensure we don't materialize the columns we don't need
        if let Some(proj) = self.projection.as_deref() {
            out.select(proj.iter().cloned())
        } else {
            Ok(out)
        }
    }
}
413
414
impl<'a, R> JsonReader<'a, R>
415
where
416
R: MmapBytesReader,
417
{
418
/// Set the JSON file's schema
419
pub fn with_schema(mut self, schema: SchemaRef) -> Self {
420
self.schema = Some(schema);
421
self
422
}
423
424
/// Overwrite parts of the inferred schema.
425
pub fn with_schema_overwrite(mut self, schema: &'a Schema) -> Self {
426
self.schema_overwrite = Some(schema);
427
self
428
}
429
430
/// Set the JSON reader to infer the schema of the file. Currently, this is only used when reading from
431
/// [`JsonFormat::JsonLines`], as [`JsonFormat::Json`] reads in the entire array anyway.
432
///
433
/// When using [`JsonFormat::JsonLines`], `max_records = None` will read the entire buffer in order to infer the
434
/// schema, `Some(1)` would look only at the first record, `Some(2)` the first two records, etc.
435
///
436
/// It is an error to pass `max_records = Some(0)`, as a schema cannot be inferred from 0 records when deserializing
437
/// from JSON (unlike CSVs, there is no header row to inspect for column names).
438
pub fn infer_schema_len(mut self, max_records: Option<NonZeroUsize>) -> Self {
439
self.infer_schema_len = max_records;
440
self
441
}
442
443
/// Set the batch size (number of records to load at one time)
444
///
445
/// This heavily influences loading time.
446
pub fn with_batch_size(mut self, batch_size: NonZeroUsize) -> Self {
447
self.batch_size = batch_size;
448
self
449
}
450
451
/// Set the reader's column projection: the names of the columns to keep after deserialization. If `None`, all
452
/// columns are kept.
453
///
454
/// Setting `projection` to the columns you want to keep is more efficient than deserializing all of the columns and
455
/// then dropping the ones you don't want.
456
pub fn with_projection(mut self, projection: Option<Vec<PlSmallStr>>) -> Self {
457
self.projection = projection;
458
self
459
}
460
461
pub fn with_json_format(mut self, format: JsonFormat) -> Self {
462
self.json_format = format;
463
self
464
}
465
466
/// Return a `null` if an error occurs during parsing.
467
pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
468
self.ignore_errors = ignore;
469
self
470
}
471
}
472
473