Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/ndjson/buffer.rs
8415 views
1
use std::hash::{Hash, Hasher};
2
3
use polars_core::frame::row::AnyValueBuffer;
4
use polars_core::prelude::*;
5
#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
6
use polars_time::prelude::string::Pattern;
7
#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
8
use polars_time::prelude::string::infer::{DatetimeInfer, TryFromWithUnit, infer_pattern_single};
9
use polars_utils::format_pl_smallstr;
10
use simd_json::prelude::*;
11
use simd_json::{BorrowedValue as Value, KnownKey, StaticNode};
12
13
#[derive(Debug, Clone, PartialEq)]
14
pub(crate) struct BufferKey<'a>(pub(crate) KnownKey<'a>);
15
impl Eq for BufferKey<'_> {}
16
17
impl Hash for BufferKey<'_> {
18
fn hash<H: Hasher>(&self, state: &mut H) {
19
self.0.key().hash(state)
20
}
21
}
22
23
pub(crate) struct Buffer<'a> {
24
name: &'a str,
25
ignore_errors: bool,
26
buf: AnyValueBuffer<'a>,
27
}
28
29
impl Buffer<'_> {
30
pub fn into_series(self) -> PolarsResult<Series> {
31
let mut buf = self.buf;
32
let mut s = buf.reset(0, !self.ignore_errors)?;
33
s.rename(PlSmallStr::from_str(self.name));
34
Ok(s)
35
}
36
37
#[inline]
38
pub(crate) fn add(&mut self, value: &Value) -> PolarsResult<()> {
39
use AnyValueBuffer::*;
40
match &mut self.buf {
41
_ if value.is_null() => {
42
self.buf.add(AnyValue::Null);
43
Ok(())
44
},
45
Boolean(buf) => {
46
match value {
47
Value::Static(StaticNode::Bool(v)) => buf.append_value(*v),
48
Value::Static(StaticNode::Null) => buf.append_null(),
49
_ if self.ignore_errors => buf.append_null(),
50
v => {
51
polars_bail!(ComputeError: "cannot parse '{}' ({}) as Boolean", v, v.value_type())
52
},
53
}
54
Ok(())
55
},
56
Int32(buf) => {
57
let v =
58
deserialize_numeric::<Int32Type>(value, value.as_i32(), self.ignore_errors)?;
59
buf.append_option(v);
60
Ok(())
61
},
62
Int64(buf) => {
63
let v =
64
deserialize_numeric::<Int64Type>(value, value.as_i64(), self.ignore_errors)?;
65
buf.append_option(v);
66
Ok(())
67
},
68
UInt64(buf) => {
69
let v =
70
deserialize_numeric::<UInt64Type>(value, value.as_u64(), self.ignore_errors)?;
71
buf.append_option(v);
72
Ok(())
73
},
74
UInt32(buf) => {
75
let v =
76
deserialize_numeric::<UInt32Type>(value, value.as_u32(), self.ignore_errors)?;
77
buf.append_option(v);
78
Ok(())
79
},
80
Float32(buf) => {
81
let v = deserialize_numeric::<Float32Type>(
82
value,
83
value.cast_f64().map(|f| f as f32),
84
self.ignore_errors,
85
)?;
86
buf.append_option(v);
87
Ok(())
88
},
89
Float64(buf) => {
90
let v = deserialize_numeric::<Float64Type>(
91
value,
92
value.cast_f64(),
93
self.ignore_errors,
94
)?;
95
buf.append_option(v);
96
Ok(())
97
},
98
String(buf) => {
99
match value {
100
Value::String(v) => buf.append_value(v),
101
// Forcibly convert to String using the Display impl.
102
v => buf.append_value(format_pl_smallstr!("{}", ValueDisplay(v))),
103
}
104
Ok(())
105
},
106
#[cfg(feature = "dtype-datetime")]
107
Datetime(buf, tu, _) => {
108
let v =
109
deserialize_datetime::<Int64Type>(value, "Datetime", self.ignore_errors, *tu)?;
110
buf.append_option(v);
111
Ok(())
112
},
113
#[cfg(feature = "dtype-date")]
114
Date(buf) => {
115
let v = deserialize_datetime::<Int32Type>(
116
value,
117
"Date",
118
self.ignore_errors,
119
TimeUnit::Microseconds, // ignored
120
)?;
121
buf.append_option(v);
122
Ok(())
123
},
124
All(dtype, buf) => {
125
let av = deserialize_all(value, dtype, self.ignore_errors)?;
126
buf.push(av);
127
Ok(())
128
},
129
Null(builder) => {
130
if !(matches!(value, Value::Static(StaticNode::Null)) || self.ignore_errors) {
131
polars_bail!(ComputeError: "got non-null value for NULL-typed column: {}", value)
132
};
133
134
builder.append_null();
135
Ok(())
136
},
137
_ => panic!("unexpected dtype when deserializing ndjson"),
138
}
139
}
140
141
pub fn add_null(&mut self) {
142
self.buf.add(AnyValue::Null).expect("should not fail");
143
}
144
}
145
pub(crate) fn init_buffers(
146
schema: &Schema,
147
capacity: usize,
148
ignore_errors: bool,
149
) -> PolarsResult<PlIndexMap<BufferKey<'_>, Buffer<'_>>> {
150
schema
151
.iter()
152
.map(|(name, dtype)| {
153
let av_buf = (dtype, capacity).into();
154
let key = KnownKey::from(name.as_str());
155
Ok((
156
BufferKey(key),
157
Buffer {
158
name,
159
buf: av_buf,
160
ignore_errors,
161
},
162
))
163
})
164
.collect()
165
}
166
167
fn deserialize_numeric<T: PolarsNumericType>(
168
value: &Value,
169
n: Option<T::Native>,
170
ignore_errors: bool,
171
) -> PolarsResult<Option<T::Native>> {
172
match n {
173
Some(v) => Ok(Some(v)),
174
None if ignore_errors => Ok(None),
175
None => Err(
176
polars_err!(ComputeError: "cannot parse '{}' ({}) as {:?}", value, value.value_type(), T::get_static_dtype()),
177
),
178
}
179
}
180
181
/// Parses a JSON string into the physical value of a temporal column.
///
/// The format is inferred from the string itself (`infer_pattern_single`) and
/// parsed with a `DatetimeInfer<T>` configured for time unit `tu`. JSON `null`
/// yields `Ok(None)`.
///
/// Any other input produces `Ok(None)` when `ignore_errors` is set, and a
/// `ComputeError` naming `type_name` otherwise. This covers non-string values,
/// strings with no recognized pattern, and strings that fail to parse.
///
/// Callers use it for `Date` (`T = Int32Type`; they mark `tu` as ignored) and
/// for `Datetime` (`T = Int64Type`). It is therefore compiled when either
/// temporal feature is enabled, matching the `cfg` of the
/// `Pattern`/`DatetimeInfer` imports it uses. Previously it was gated on
/// `dtype-datetime` alone, so a `dtype-date`-only build failed to compile.
#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
fn deserialize_datetime<T>(
    value: &Value,
    type_name: &str,
    ignore_errors: bool,
    tu: TimeUnit,
) -> PolarsResult<Option<T::Native>>
where
    T: PolarsNumericType,
    DatetimeInfer<T>: TryFromWithUnit<Pattern>,
{
    match value {
        Value::String(val) => {
            if let Some(pattern) = infer_pattern_single(val) {
                if let Ok(mut infer) = DatetimeInfer::try_from_with_unit(pattern, Some(tu)) {
                    if let Some(v) = infer.parse(val) {
                        return Ok(Some(v));
                    }
                }
            }
        },
        Value::Static(StaticNode::Null) => return Ok(None),
        _ => {},
    };

    // Reached for anything that did not parse successfully above.
    if ignore_errors {
        return Ok(None);
    }

    polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", value, value.value_type(), type_name)
}
212
213
fn deserialize_all<'a>(
214
json: &Value,
215
dtype: &DataType,
216
ignore_errors: bool,
217
) -> PolarsResult<AnyValue<'a>> {
218
if let Value::Static(StaticNode::Null) = json {
219
return Ok(AnyValue::Null);
220
}
221
match dtype {
222
#[cfg(feature = "dtype-datetime")]
223
DataType::Date => {
224
let value = deserialize_datetime::<Int32Type>(
225
json,
226
"Date",
227
ignore_errors,
228
TimeUnit::Microseconds, // ignored
229
)?;
230
return Ok(if let Some(value) = value {
231
AnyValue::Date(value)
232
} else {
233
AnyValue::Null
234
});
235
},
236
#[cfg(feature = "dtype-datetime")]
237
DataType::Datetime(tu, tz) => {
238
let value = deserialize_datetime::<Int64Type>(json, "Datetime", ignore_errors, *tu)?;
239
return Ok(if let Some(value) = value {
240
AnyValue::DatetimeOwned(value, *tu, tz.as_ref().map(|s| Arc::from(s.clone())))
241
} else {
242
AnyValue::Null
243
});
244
},
245
#[cfg(feature = "dtype-f16")]
246
dt @ DataType::Float16 => {
247
use num_traits::AsPrimitive;
248
use polars_utils::float16::pf16;
249
250
return match json.cast_f64() {
251
Some(v) => Ok(AnyValue::Float16(AsPrimitive::<pf16>::as_(v))),
252
None if ignore_errors => Ok(AnyValue::Null),
253
None => {
254
polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", json, json.value_type(), dt)
255
},
256
};
257
},
258
dt @ DataType::Float32 => {
259
return match json.cast_f64() {
260
Some(v) => Ok(AnyValue::Float32(v as f32)),
261
None if ignore_errors => Ok(AnyValue::Null),
262
None => {
263
polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", json, json.value_type(), dt)
264
},
265
};
266
},
267
dt @ DataType::Float64 => {
268
return match json.cast_f64() {
269
Some(v) => Ok(AnyValue::Float64(v)),
270
None if ignore_errors => Ok(AnyValue::Null),
271
None => {
272
polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", json, json.value_type(), dt)
273
},
274
};
275
},
276
DataType::String => {
277
return Ok(match json {
278
Value::String(s) => AnyValue::StringOwned(s.as_ref().into()),
279
v => AnyValue::StringOwned(format_pl_smallstr!("{}", ValueDisplay(v))),
280
});
281
},
282
dt if dt.is_primitive_numeric() => {
283
return match json.as_i128() {
284
Some(v) => Ok(AnyValue::Int128(v).into_static()),
285
None if ignore_errors => Ok(AnyValue::Null),
286
None => {
287
polars_bail!(ComputeError: "cannot parse '{}' ({}) as {}", json, json.value_type(), dt)
288
},
289
};
290
},
291
_ => {},
292
}
293
294
let out = match json {
295
Value::Static(StaticNode::Bool(b)) => AnyValue::Boolean(*b),
296
Value::Static(StaticNode::I64(i)) => AnyValue::Int64(*i),
297
Value::Static(StaticNode::U64(u)) => AnyValue::UInt64(*u),
298
Value::Static(StaticNode::F64(f)) => AnyValue::Float64(*f),
299
Value::String(s) => AnyValue::StringOwned(s.as_ref().into()),
300
Value::Array(arr) => {
301
let Some(inner_dtype) = dtype.inner_dtype() else {
302
if ignore_errors {
303
return Ok(AnyValue::Null);
304
}
305
polars_bail!(ComputeError: "expected dtype '{}' in JSON value, got dtype: Array\n\nEncountered value: {}", dtype, json);
306
};
307
let vals: Vec<AnyValue> = arr
308
.iter()
309
.map(|val| deserialize_all(val, inner_dtype, ignore_errors))
310
.collect::<PolarsResult<_>>()?;
311
let strict = !ignore_errors;
312
let s =
313
Series::from_any_values_and_dtype(PlSmallStr::EMPTY, &vals, inner_dtype, strict)?;
314
AnyValue::List(s)
315
},
316
#[cfg(feature = "dtype-struct")]
317
Value::Object(doc) => {
318
if let DataType::Struct(fields) = dtype {
319
let document = &**doc;
320
321
let vals = fields
322
.iter()
323
.map(|field| {
324
if let Some(value) = document.get(field.name.as_str()) {
325
deserialize_all(value, &field.dtype, ignore_errors)
326
} else {
327
Ok(AnyValue::Null)
328
}
329
})
330
.collect::<PolarsResult<Vec<_>>>()?;
331
AnyValue::StructOwned(Box::new((vals, fields.clone())))
332
} else {
333
if ignore_errors {
334
return Ok(AnyValue::Null);
335
}
336
polars_bail!(
337
ComputeError: "expected {} in json value, got object", dtype,
338
);
339
}
340
},
341
val => AnyValue::StringOwned(format!("{val:#?}").into()),
342
};
343
Ok(out)
344
}
345
346
/// Wrapper for serde_json's `Value` with a human-friendly Display impl for nested types:
347
///
348
/// * Default: `{"x": Static(U64(1))}`
349
/// * ValueDisplay: `{x: 1}`
350
///
351
/// This intended for reading in arbitrary `Value` types into a String type. Note that the output
352
/// is not guaranteed to be valid JSON as we don't do any escaping of e.g. quote/newline values.
353
struct ValueDisplay<'a>(&'a Value<'a>);
354
355
impl std::fmt::Display for ValueDisplay<'_> {
356
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
357
use Value::*;
358
359
match self.0 {
360
Static(s) => write!(f, "{s}"),
361
String(s) => write!(f, r#""{s}""#),
362
Array(a) => {
363
write!(f, "[")?;
364
365
let mut iter = a.iter();
366
367
for v in (&mut iter).take(1) {
368
write!(f, "{}", ValueDisplay(v))?;
369
}
370
371
for v in iter {
372
write!(f, ", {}", ValueDisplay(v))?;
373
}
374
375
write!(f, "]")
376
},
377
Object(o) => {
378
write!(f, "{{")?;
379
380
let mut iter = o.iter();
381
382
for (k, v) in (&mut iter).take(1) {
383
write!(f, r#""{}": {}"#, k, ValueDisplay(v))?;
384
}
385
386
for (k, v) in iter {
387
write!(f, r#", "{}": {}"#, k, ValueDisplay(v))?;
388
}
389
390
write!(f, "}}")
391
},
392
}
393
}
394
}
395
396