Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/json/mod.rs
6939 views
1
//! # (De)serialize JSON files.
//!
//! ## Read JSON to a DataFrame
//!
//! ## Example
//!
//! ```
//! use polars_core::prelude::*;
//! use polars_io::prelude::*;
//! use std::io::Cursor;
//! use std::num::NonZeroUsize;
//!
//! let basic_json = r#"{"a":1, "b":2.0, "c":false, "d":"4"}
//! {"a":-10, "b":-3.5, "c":true, "d":"4"}
//! {"a":2, "b":0.6, "c":false, "d":"text"}
//! {"a":1, "b":2.0, "c":false, "d":"4"}
//! {"a":7, "b":-3.5, "c":true, "d":"4"}
//! {"a":1, "b":0.6, "c":false, "d":"text"}
//! {"a":1, "b":2.0, "c":false, "d":"4"}
//! {"a":5, "b":-3.5, "c":true, "d":"4"}
//! {"a":1, "b":0.6, "c":false, "d":"text"}
//! {"a":1, "b":2.0, "c":false, "d":"4"}
//! {"a":1, "b":-3.5, "c":true, "d":"4"}
//! {"a":1, "b":0.6, "c":false, "d":"text"}"#;
//! let file = Cursor::new(basic_json);
//! let df = JsonReader::new(file)
//!     .with_json_format(JsonFormat::JsonLines)
//!     .infer_schema_len(NonZeroUsize::new(3))
//!     .with_batch_size(NonZeroUsize::new(3).unwrap())
//!     .finish()
//!     .unwrap();
//!
//! println!("{:?}", df);
//! ```
//! >>> Outputs:
//!
//! ```text
//! +-----+--------+-------+--------+
//! | a   | b      | c     | d      |
//! | --- | ---    | ---   | ---    |
//! | i64 | f64    | bool  | str    |
//! +=====+========+=======+========+
//! | 1   | 2      | false | "4"    |
//! +-----+--------+-------+--------+
//! | -10 | -3.5e0 | true  | "4"    |
//! +-----+--------+-------+--------+
//! | 2   | 0.6    | false | "text" |
//! +-----+--------+-------+--------+
//! | 1   | 2      | false | "4"    |
//! +-----+--------+-------+--------+
//! | 7   | -3.5e0 | true  | "4"    |
//! +-----+--------+-------+--------+
//! | 1   | 0.6    | false | "text" |
//! +-----+--------+-------+--------+
//! | 1   | 2      | false | "4"    |
//! +-----+--------+-------+--------+
//! | 5   | -3.5e0 | true  | "4"    |
//! +-----+--------+-------+--------+
//! | 1   | 0.6    | false | "text" |
//! +-----+--------+-------+--------+
//! | 1   | 2      | false | "4"    |
//! +-----+--------+-------+--------+
//! ```
//!
65
pub(crate) mod infer;
66
67
use std::io::Write;
68
use std::num::NonZeroUsize;
69
use std::ops::Deref;
70
71
use arrow::array::LIST_VALUES_NAME;
72
use arrow::legacy::conversion::chunk_to_struct;
73
use polars_core::error::to_compute_err;
74
use polars_core::prelude::*;
75
use polars_error::{PolarsResult, polars_bail};
76
use polars_json::json::write::FallibleStreamingIterator;
77
#[cfg(feature = "serde")]
78
use serde::{Deserialize, Serialize};
79
use simd_json::BorrowedValue;
80
81
use crate::mmap::{MmapBytesReader, ReaderBytes};
82
use crate::prelude::*;
83
84
/// Configuration type for JSON writing.
///
/// The struct currently carries no fields, so every value compares equal to
/// `JsonWriterOptions::default()`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct JsonWriterOptions {}
88
89
/// The format to use to write the DataFrame to JSON: `Json` (a JSON array)
/// or `JsonLines` (each row output on a separate line).
///
/// In either case, each row is serialized as a JSON object whose keys are the column names and
/// whose values are the row's corresponding values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JsonFormat {
    /// A single JSON array containing each DataFrame row as an object. The length of the array is the number of rows in
    /// the DataFrame.
    ///
    /// Use this to create valid JSON that can be deserialized back into an array in one fell swoop.
    Json,
    /// Each DataFrame row is serialized as a JSON object on a separate line. The number of lines in the output is the
    /// number of rows in the DataFrame.
    ///
    /// The [JSON Lines](https://jsonlines.org) format makes it easy to read records in a streaming fashion, one (line)
    /// at a time. But the output in its entirety is not valid JSON; only the individual lines are.
    ///
    /// It is recommended to use the file extension `.jsonl` when saving as JSON Lines.
    JsonLines,
}
109
110
/// Writes a DataFrame to JSON.
111
///
112
/// Under the hood, this uses [`arrow2::io::json`](https://docs.rs/arrow2/latest/arrow2/io/json/write/fn.write.html).
113
/// `arrow2` generally serializes types that are not JSON primitives, such as Date and DateTime, as their
114
/// `Display`-formatted versions. For instance, a (naive) DateTime column is formatted as the String `"yyyy-mm-dd
115
/// HH:MM:SS"`. To control how non-primitive columns are serialized, convert them to String or another primitive type
116
/// before serializing.
117
#[must_use]
118
pub struct JsonWriter<W: Write> {
119
/// File or Stream handler
120
buffer: W,
121
json_format: JsonFormat,
122
}
123
124
impl<W: Write> JsonWriter<W> {
125
pub fn with_json_format(mut self, format: JsonFormat) -> Self {
126
self.json_format = format;
127
self
128
}
129
}
130
131
impl<W> SerWriter<W> for JsonWriter<W>
132
where
133
W: Write,
134
{
135
/// Create a new `JsonWriter` writing to `buffer` with format `JsonFormat::JsonLines`. To specify a different
136
/// format, use e.g., [`JsonWriter::new(buffer).with_json_format(JsonFormat::Json)`](JsonWriter::with_json_format).
137
fn new(buffer: W) -> Self {
138
JsonWriter {
139
buffer,
140
json_format: JsonFormat::JsonLines,
141
}
142
}
143
144
fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
145
df.align_chunks_par();
146
let fields = df
147
.iter()
148
.map(|s| {
149
#[cfg(feature = "object")]
150
polars_ensure!(!matches!(s.dtype(), DataType::Object(_)), ComputeError: "cannot write 'Object' datatype to json");
151
Ok(s.field().to_arrow(CompatLevel::newest()))
152
})
153
.collect::<PolarsResult<Vec<_>>>()?;
154
let batches = df
155
.iter_chunks(CompatLevel::newest(), false)
156
.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
157
158
match self.json_format {
159
JsonFormat::JsonLines => {
160
let serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
161
let writer =
162
polars_json::ndjson::write::FileWriter::new(&mut self.buffer, serializer);
163
writer.collect::<PolarsResult<()>>()?;
164
},
165
JsonFormat::Json => {
166
let serializer = polars_json::json::write::Serializer::new(batches, vec![]);
167
polars_json::json::write::write(&mut self.buffer, serializer)?;
168
},
169
}
170
171
Ok(())
172
}
173
}
174
175
/// JSON writer that is fed one [`DataFrame`] batch at a time.
pub struct BatchedWriter<W: Write> {
    /// Destination that receives the serialized bytes.
    writer: W,
}
178
179
impl<W> BatchedWriter<W>
180
where
181
W: Write,
182
{
183
pub fn new(writer: W) -> Self {
184
BatchedWriter { writer }
185
}
186
/// Write a batch to the json writer.
187
///
188
/// # Panics
189
/// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
190
pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
191
let fields = df
192
.iter()
193
.map(|s| {
194
#[cfg(feature = "object")]
195
polars_ensure!(!matches!(s.dtype(), DataType::Object(_)), ComputeError: "cannot write 'Object' datatype to json");
196
Ok(s.field().to_arrow(CompatLevel::newest()))
197
})
198
.collect::<PolarsResult<Vec<_>>>()?;
199
let chunks = df.iter_chunks(CompatLevel::newest(), false);
200
let batches =
201
chunks.map(|chunk| Ok(Box::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));
202
let mut serializer = polars_json::ndjson::write::Serializer::new(batches, vec![]);
203
while let Some(block) = serializer.next()? {
204
self.writer.write_all(block)?;
205
}
206
Ok(())
207
}
208
}
209
210
/// Reads JSON in one of the formats in [`JsonFormat`] into a DataFrame.
211
#[must_use]
212
pub struct JsonReader<'a, R>
213
where
214
R: MmapBytesReader,
215
{
216
reader: R,
217
rechunk: bool,
218
ignore_errors: bool,
219
infer_schema_len: Option<NonZeroUsize>,
220
batch_size: NonZeroUsize,
221
projection: Option<Vec<PlSmallStr>>,
222
schema: Option<SchemaRef>,
223
schema_overwrite: Option<&'a Schema>,
224
json_format: JsonFormat,
225
}
226
227
pub fn remove_bom(bytes: &[u8]) -> PolarsResult<&[u8]> {
228
if bytes.starts_with(&[0xEF, 0xBB, 0xBF]) {
229
// UTF-8 BOM
230
Ok(&bytes[3..])
231
} else if bytes.starts_with(&[0xFE, 0xFF]) || bytes.starts_with(&[0xFF, 0xFE]) {
232
// UTF-16 BOM
233
polars_bail!(ComputeError: "utf-16 not supported")
234
} else {
235
Ok(bytes)
236
}
237
}
238
impl<R> SerReader<R> for JsonReader<'_, R>
239
where
240
R: MmapBytesReader,
241
{
242
fn new(reader: R) -> Self {
243
JsonReader {
244
reader,
245
rechunk: true,
246
ignore_errors: false,
247
infer_schema_len: Some(NonZeroUsize::new(100).unwrap()),
248
batch_size: NonZeroUsize::new(8192).unwrap(),
249
projection: None,
250
schema: None,
251
schema_overwrite: None,
252
json_format: JsonFormat::Json,
253
}
254
}
255
256
fn set_rechunk(mut self, rechunk: bool) -> Self {
257
self.rechunk = rechunk;
258
self
259
}
260
261
/// Take the SerReader and return a parsed DataFrame.
262
///
263
/// Because JSON values specify their types (number, string, etc), no upcasting or conversion is performed between
264
/// incompatible types in the input. In the event that a column contains mixed dtypes, is it unspecified whether an
265
/// error is returned or whether elements of incompatible dtypes are replaced with `null`.
266
fn finish(mut self) -> PolarsResult<DataFrame> {
267
let pre_rb: ReaderBytes = (&mut self.reader).into();
268
let bytes = remove_bom(pre_rb.deref())?;
269
let rb = ReaderBytes::Borrowed(bytes);
270
let out = match self.json_format {
271
JsonFormat::Json => {
272
polars_ensure!(!self.ignore_errors, InvalidOperation: "'ignore_errors' only supported in ndjson");
273
let mut bytes = rb.deref().to_vec();
274
let owned = &mut vec![];
275
compression::maybe_decompress_bytes(&bytes, owned)?;
276
// the easiest way to avoid ownership issues is by implicitly figuring out if
277
// decompression happened (owned is only populated on decompress), then pick which bytes to parse
278
let json_value = if owned.is_empty() {
279
simd_json::to_borrowed_value(&mut bytes).map_err(to_compute_err)?
280
} else {
281
simd_json::to_borrowed_value(owned).map_err(to_compute_err)?
282
};
283
if let BorrowedValue::Array(array) = &json_value {
284
if array.is_empty() & self.schema.is_none() & self.schema_overwrite.is_none() {
285
return Ok(DataFrame::empty());
286
}
287
}
288
289
let allow_extra_fields_in_struct = self.schema.is_some();
290
291
// struct type
292
let dtype = if let Some(mut schema) = self.schema {
293
if let Some(overwrite) = self.schema_overwrite {
294
let mut_schema = Arc::make_mut(&mut schema);
295
overwrite_schema(mut_schema, overwrite)?;
296
}
297
298
DataType::Struct(schema.iter_fields().collect()).to_arrow(CompatLevel::newest())
299
} else {
300
// infer
301
let inner_dtype = if let BorrowedValue::Array(values) = &json_value {
302
infer::json_values_to_supertype(
303
values,
304
self.infer_schema_len
305
.unwrap_or(NonZeroUsize::new(usize::MAX).unwrap()),
306
)?
307
.to_arrow(CompatLevel::newest())
308
} else {
309
polars_json::json::infer(&json_value)?
310
};
311
312
if let Some(overwrite) = self.schema_overwrite {
313
let ArrowDataType::Struct(fields) = inner_dtype else {
314
polars_bail!(ComputeError: "can only deserialize json objects")
315
};
316
317
let mut schema = Schema::from_iter(fields.iter().map(Into::<Field>::into));
318
overwrite_schema(&mut schema, overwrite)?;
319
320
DataType::Struct(
321
schema
322
.into_iter()
323
.map(|(name, dt)| Field::new(name, dt))
324
.collect(),
325
)
326
.to_arrow(CompatLevel::newest())
327
} else {
328
inner_dtype
329
}
330
};
331
332
let dtype = if let BorrowedValue::Array(_) = &json_value {
333
ArrowDataType::LargeList(Box::new(arrow::datatypes::Field::new(
334
LIST_VALUES_NAME,
335
dtype,
336
true,
337
)))
338
} else {
339
dtype
340
};
341
342
let arr = polars_json::json::deserialize(
343
&json_value,
344
dtype,
345
allow_extra_fields_in_struct,
346
)?;
347
let arr = arr.as_any().downcast_ref::<StructArray>().ok_or_else(
348
|| polars_err!(ComputeError: "can only deserialize json objects"),
349
)?;
350
DataFrame::try_from(arr.clone())
351
},
352
JsonFormat::JsonLines => {
353
let mut json_reader = CoreJsonReader::new(
354
rb,
355
None,
356
self.schema,
357
self.schema_overwrite,
358
None,
359
1024, // sample size
360
NonZeroUsize::new(1 << 18).unwrap(),
361
false,
362
self.infer_schema_len,
363
self.ignore_errors,
364
None,
365
None,
366
None,
367
)?;
368
let mut df: DataFrame = json_reader.as_df()?;
369
if self.rechunk {
370
df.as_single_chunk_par();
371
}
372
Ok(df)
373
},
374
}?;
375
376
// TODO! Ensure we don't materialize the columns we don't need
377
if let Some(proj) = self.projection.as_deref() {
378
out.select(proj.iter().cloned())
379
} else {
380
Ok(out)
381
}
382
}
383
}
384
385
impl<'a, R> JsonReader<'a, R>
386
where
387
R: MmapBytesReader,
388
{
389
/// Set the JSON file's schema
390
pub fn with_schema(mut self, schema: SchemaRef) -> Self {
391
self.schema = Some(schema);
392
self
393
}
394
395
/// Overwrite parts of the inferred schema.
396
pub fn with_schema_overwrite(mut self, schema: &'a Schema) -> Self {
397
self.schema_overwrite = Some(schema);
398
self
399
}
400
401
/// Set the JSON reader to infer the schema of the file. Currently, this is only used when reading from
402
/// [`JsonFormat::JsonLines`], as [`JsonFormat::Json`] reads in the entire array anyway.
403
///
404
/// When using [`JsonFormat::JsonLines`], `max_records = None` will read the entire buffer in order to infer the
405
/// schema, `Some(1)` would look only at the first record, `Some(2)` the first two records, etc.
406
///
407
/// It is an error to pass `max_records = Some(0)`, as a schema cannot be inferred from 0 records when deserializing
408
/// from JSON (unlike CSVs, there is no header row to inspect for column names).
409
pub fn infer_schema_len(mut self, max_records: Option<NonZeroUsize>) -> Self {
410
self.infer_schema_len = max_records;
411
self
412
}
413
414
/// Set the batch size (number of records to load at one time)
415
///
416
/// This heavily influences loading time.
417
pub fn with_batch_size(mut self, batch_size: NonZeroUsize) -> Self {
418
self.batch_size = batch_size;
419
self
420
}
421
422
/// Set the reader's column projection: the names of the columns to keep after deserialization. If `None`, all
423
/// columns are kept.
424
///
425
/// Setting `projection` to the columns you want to keep is more efficient than deserializing all of the columns and
426
/// then dropping the ones you don't want.
427
pub fn with_projection(mut self, projection: Option<Vec<PlSmallStr>>) -> Self {
428
self.projection = projection;
429
self
430
}
431
432
pub fn with_json_format(mut self, format: JsonFormat) -> Self {
433
self.json_format = format;
434
self
435
}
436
437
/// Return a `null` if an error occurs during parsing.
438
pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
439
self.ignore_errors = ignore;
440
self
441
}
442
}
443
444