Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/ndjson/buffer.rs
6939 views
1
use std::hash::{Hash, Hasher};
2
3
use arrow::types::NativeType;
4
use num_traits::NumCast;
5
use polars_core::frame::row::AnyValueBuffer;
6
use polars_core::prelude::*;
7
#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
8
use polars_time::prelude::string::Pattern;
9
#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
10
use polars_time::prelude::string::infer::{DatetimeInfer, TryFromWithUnit, infer_pattern_single};
11
use polars_utils::format_pl_smallstr;
12
use simd_json::{BorrowedValue as Value, KnownKey, StaticNode};
13
14
#[derive(Debug, Clone, PartialEq)]
15
pub(crate) struct BufferKey<'a>(pub(crate) KnownKey<'a>);
16
impl Eq for BufferKey<'_> {}
17
18
impl Hash for BufferKey<'_> {
19
fn hash<H: Hasher>(&self, state: &mut H) {
20
self.0.key().hash(state)
21
}
22
}
23
24
pub(crate) struct Buffer<'a> {
25
name: &'a str,
26
ignore_errors: bool,
27
buf: AnyValueBuffer<'a>,
28
}
29
30
impl Buffer<'_> {
31
pub fn into_series(self) -> PolarsResult<Series> {
32
let mut buf = self.buf;
33
let mut s = buf.reset(0, !self.ignore_errors)?;
34
s.rename(PlSmallStr::from_str(self.name));
35
Ok(s)
36
}
37
38
#[inline]
39
pub(crate) fn add(&mut self, value: &Value) -> PolarsResult<()> {
40
use AnyValueBuffer::*;
41
match &mut self.buf {
42
Boolean(buf) => {
43
match value {
44
Value::Static(StaticNode::Bool(b)) => buf.append_value(*b),
45
Value::Static(StaticNode::Null) => buf.append_null(),
46
_ if self.ignore_errors => buf.append_null(),
47
v => polars_bail!(ComputeError: "cannot parse '{}' as Boolean", v),
48
}
49
Ok(())
50
},
51
Int32(buf) => {
52
let n = deserialize_number::<i32>(value, self.ignore_errors)?;
53
match n {
54
Some(v) => buf.append_value(v),
55
None => buf.append_null(),
56
}
57
Ok(())
58
},
59
Int64(buf) => {
60
let n = deserialize_number::<i64>(value, self.ignore_errors)?;
61
match n {
62
Some(v) => buf.append_value(v),
63
None => buf.append_null(),
64
}
65
Ok(())
66
},
67
UInt64(buf) => {
68
let n = deserialize_number::<u64>(value, self.ignore_errors)?;
69
match n {
70
Some(v) => buf.append_value(v),
71
None => buf.append_null(),
72
}
73
Ok(())
74
},
75
UInt32(buf) => {
76
let n = deserialize_number::<u32>(value, self.ignore_errors)?;
77
match n {
78
Some(v) => buf.append_value(v),
79
None => buf.append_null(),
80
}
81
Ok(())
82
},
83
Float32(buf) => {
84
let n = deserialize_number::<f32>(value, self.ignore_errors)?;
85
match n {
86
Some(v) => buf.append_value(v),
87
None => buf.append_null(),
88
}
89
Ok(())
90
},
91
Float64(buf) => {
92
let n = deserialize_number::<f64>(value, self.ignore_errors)?;
93
match n {
94
Some(v) => buf.append_value(v),
95
None => buf.append_null(),
96
}
97
Ok(())
98
},
99
100
String(buf) => {
101
match value {
102
Value::String(v) => buf.append_value(v),
103
Value::Static(StaticNode::Null) => buf.append_null(),
104
// Forcibly convert to String using the Display impl.
105
v => buf.append_value(format_pl_smallstr!("{}", ValueDisplay(v))),
106
}
107
Ok(())
108
},
109
#[cfg(feature = "dtype-datetime")]
110
Datetime(buf, tu, _) => {
111
let v =
112
deserialize_datetime::<Int64Type>(value, "Datetime", self.ignore_errors, *tu)?;
113
buf.append_option(v);
114
Ok(())
115
},
116
#[cfg(feature = "dtype-date")]
117
Date(buf) => {
118
let v = deserialize_datetime::<Int32Type>(
119
value,
120
"Date",
121
self.ignore_errors,
122
TimeUnit::Microseconds, // ignored
123
)?;
124
buf.append_option(v);
125
Ok(())
126
},
127
All(dtype, buf) => {
128
let av = deserialize_all(value, dtype, self.ignore_errors)?;
129
buf.push(av);
130
Ok(())
131
},
132
Null(builder) => {
133
if !(matches!(value, Value::Static(StaticNode::Null)) || self.ignore_errors) {
134
polars_bail!(ComputeError: "got non-null value for NULL-typed column: {}", value)
135
};
136
137
builder.append_null();
138
Ok(())
139
},
140
_ => panic!("unexpected dtype when deserializing ndjson"),
141
}
142
}
143
144
pub fn add_null(&mut self) {
145
self.buf.add(AnyValue::Null).expect("should not fail");
146
}
147
}
148
pub(crate) fn init_buffers(
149
schema: &Schema,
150
capacity: usize,
151
ignore_errors: bool,
152
) -> PolarsResult<PlIndexMap<BufferKey<'_>, Buffer<'_>>> {
153
schema
154
.iter()
155
.map(|(name, dtype)| {
156
let av_buf = (dtype, capacity).into();
157
let key = KnownKey::from(name.as_str());
158
Ok((
159
BufferKey(key),
160
Buffer {
161
name,
162
buf: av_buf,
163
ignore_errors,
164
},
165
))
166
})
167
.collect()
168
}
169
170
fn deserialize_number<T: NativeType + NumCast>(
171
value: &Value,
172
ignore_errors: bool,
173
) -> PolarsResult<Option<T>> {
174
let to_result = |x: Option<T>| {
175
let out = if ignore_errors {
176
x
177
} else {
178
Some(x.ok_or_else(|| {
179
polars_err!(ComputeError: "cannot parse '{}' as {:?}", value, T::PRIMITIVE
180
)
181
})?)
182
};
183
184
Ok(out)
185
};
186
187
match value {
188
Value::Static(StaticNode::F64(f)) => to_result(num_traits::cast(*f)),
189
Value::Static(StaticNode::I64(i)) => to_result(num_traits::cast(*i)),
190
Value::Static(StaticNode::U64(u)) => to_result(num_traits::cast(*u)),
191
Value::Static(StaticNode::Bool(b)) => to_result(num_traits::cast(*b as i32)),
192
Value::Static(StaticNode::Null) => Ok(None),
193
_ => to_result(None),
194
}
195
}
196
197
/// Parses a JSON string into the physical value of a temporal column.
///
/// The pattern is inferred from the string itself via
/// `infer_pattern_single`, then parsed with a [`DatetimeInfer`] built for
/// that pattern and the time unit `tu`.
///
/// * JSON `null` yields `Ok(None)`.
/// * A string that parses yields `Ok(Some(_))`.
/// * Anything else yields `Ok(None)` when `ignore_errors` is set, and a
///   `ComputeError` mentioning `type_name` otherwise.
///
/// Gated on either temporal feature, matching the imports it relies on: the
/// `Date` buffer (feature `dtype-date`) calls this function as well as the
/// `Datetime` one.
#[cfg(any(feature = "dtype-datetime", feature = "dtype-date"))]
fn deserialize_datetime<T>(
    value: &Value,
    type_name: &str,
    ignore_errors: bool,
    tu: TimeUnit,
) -> PolarsResult<Option<T::Native>>
where
    T: PolarsNumericType,
    DatetimeInfer<T>: TryFromWithUnit<Pattern>,
{
    match value {
        Value::Static(StaticNode::Null) => return Ok(None),
        Value::String(val) => {
            if let Some(pattern) = infer_pattern_single(val) {
                if let Ok(mut infer) = DatetimeInfer::try_from_with_unit(pattern, Some(tu)) {
                    if let Some(v) = infer.parse(val) {
                        return Ok(Some(v));
                    }
                }
            }
        },
        // Non-string, non-null values are handled as parse failures below.
        _ => {},
    };

    if ignore_errors {
        return Ok(None);
    }

    polars_bail!(ComputeError: "cannot parse '{}' as {}", value, type_name)
}
228
229
fn deserialize_all<'a>(
230
json: &Value,
231
dtype: &DataType,
232
ignore_errors: bool,
233
) -> PolarsResult<AnyValue<'a>> {
234
if let Value::Static(StaticNode::Null) = json {
235
return Ok(AnyValue::Null);
236
}
237
match dtype {
238
#[cfg(feature = "dtype-datetime")]
239
DataType::Date => {
240
let value = deserialize_datetime::<Int32Type>(
241
json,
242
"Date",
243
ignore_errors,
244
TimeUnit::Microseconds, // ignored
245
)?;
246
return Ok(if let Some(value) = value {
247
AnyValue::Date(value)
248
} else {
249
AnyValue::Null
250
});
251
},
252
#[cfg(feature = "dtype-datetime")]
253
DataType::Datetime(tu, tz) => {
254
let value = deserialize_datetime::<Int64Type>(json, "Datetime", ignore_errors, *tu)?;
255
return Ok(if let Some(value) = value {
256
AnyValue::DatetimeOwned(value, *tu, tz.as_ref().map(|s| Arc::from(s.clone())))
257
} else {
258
AnyValue::Null
259
});
260
},
261
DataType::Float32 => {
262
return Ok(
263
if let Some(v) = deserialize_number::<f32>(json, ignore_errors)? {
264
AnyValue::Float32(v)
265
} else {
266
AnyValue::Null
267
},
268
);
269
},
270
DataType::Float64 => {
271
return Ok(
272
if let Some(v) = deserialize_number::<f64>(json, ignore_errors)? {
273
AnyValue::Float64(v)
274
} else {
275
AnyValue::Null
276
},
277
);
278
},
279
DataType::String => {
280
return Ok(match json {
281
Value::String(s) => AnyValue::StringOwned(s.as_ref().into()),
282
v => AnyValue::StringOwned(format_pl_smallstr!("{}", ValueDisplay(v))),
283
});
284
},
285
dt if dt.is_primitive_numeric() => {
286
return Ok(
287
if let Some(v) = deserialize_number::<i128>(json, ignore_errors)? {
288
AnyValue::Int128(v).cast(dt).into_static()
289
} else {
290
AnyValue::Null
291
},
292
);
293
},
294
_ => {},
295
}
296
297
let out = match json {
298
Value::Static(StaticNode::Bool(b)) => AnyValue::Boolean(*b),
299
Value::Static(StaticNode::I64(i)) => AnyValue::Int64(*i),
300
Value::Static(StaticNode::U64(u)) => AnyValue::UInt64(*u),
301
Value::Static(StaticNode::F64(f)) => AnyValue::Float64(*f),
302
Value::String(s) => AnyValue::StringOwned(s.as_ref().into()),
303
Value::Array(arr) => {
304
let Some(inner_dtype) = dtype.inner_dtype() else {
305
if ignore_errors {
306
return Ok(AnyValue::Null);
307
}
308
polars_bail!(ComputeError: "expected dtype '{}' in JSON value, got dtype: Array\n\nEncountered value: {}", dtype, json);
309
};
310
let vals: Vec<AnyValue> = arr
311
.iter()
312
.map(|val| deserialize_all(val, inner_dtype, ignore_errors))
313
.collect::<PolarsResult<_>>()?;
314
let strict = !ignore_errors;
315
let s =
316
Series::from_any_values_and_dtype(PlSmallStr::EMPTY, &vals, inner_dtype, strict)?;
317
AnyValue::List(s)
318
},
319
#[cfg(feature = "dtype-struct")]
320
Value::Object(doc) => {
321
if let DataType::Struct(fields) = dtype {
322
let document = &**doc;
323
324
let vals = fields
325
.iter()
326
.map(|field| {
327
if let Some(value) = document.get(field.name.as_str()) {
328
deserialize_all(value, &field.dtype, ignore_errors)
329
} else {
330
Ok(AnyValue::Null)
331
}
332
})
333
.collect::<PolarsResult<Vec<_>>>()?;
334
AnyValue::StructOwned(Box::new((vals, fields.clone())))
335
} else {
336
if ignore_errors {
337
return Ok(AnyValue::Null);
338
}
339
polars_bail!(
340
ComputeError: "expected {} in json value, got object", dtype,
341
);
342
}
343
},
344
val => AnyValue::StringOwned(format!("{val:#?}").into()),
345
};
346
Ok(out)
347
}
348
349
/// Wrapper for serde_json's `Value` with a human-friendly Display impl for nested types:
350
///
351
/// * Default: `{"x": Static(U64(1))}`
352
/// * ValueDisplay: `{x: 1}`
353
///
354
/// This intended for reading in arbitrary `Value` types into a String type. Note that the output
355
/// is not guaranteed to be valid JSON as we don't do any escaping of e.g. quote/newline values.
356
struct ValueDisplay<'a>(&'a Value<'a>);
357
358
impl std::fmt::Display for ValueDisplay<'_> {
359
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
360
use Value::*;
361
362
match self.0 {
363
Static(s) => write!(f, "{s}"),
364
String(s) => write!(f, r#""{s}""#),
365
Array(a) => {
366
write!(f, "[")?;
367
368
let mut iter = a.iter();
369
370
for v in (&mut iter).take(1) {
371
write!(f, "{}", ValueDisplay(v))?;
372
}
373
374
for v in iter {
375
write!(f, ", {}", ValueDisplay(v))?;
376
}
377
378
write!(f, "]")
379
},
380
Object(o) => {
381
write!(f, "{{")?;
382
383
let mut iter = o.iter();
384
385
for (k, v) in (&mut iter).take(1) {
386
write!(f, r#""{}": {}"#, k, ValueDisplay(v))?;
387
}
388
389
for (k, v) in iter {
390
write!(f, r#", "{}": {}"#, k, ValueDisplay(v))?;
391
}
392
393
write!(f, "}}")
394
},
395
}
396
}
397
}
398
399