Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-json/src/ndjson/file.rs
6939 views
1
use std::io::BufRead;
2
use std::num::NonZeroUsize;
3
4
use arrow::datatypes::ArrowDataType;
5
use fallible_streaming_iterator::FallibleStreamingIterator;
6
use indexmap::IndexSet;
7
use polars_error::*;
8
use polars_utils::aliases::{PlIndexSet, PlRandomState};
9
use simd_json::BorrowedValue;
10
11
/// Reads up to a number of lines from `reader` into `rows` bounded by `limit`.
12
fn read_rows<R: BufRead>(reader: &mut R, rows: &mut [String], limit: usize) -> PolarsResult<usize> {
13
if limit == 0 {
14
return Ok(0);
15
}
16
let mut row_number = 0;
17
for row in rows.iter_mut() {
18
loop {
19
row.clear();
20
let _ = reader.read_line(row).map_err(|e| {
21
PolarsError::ComputeError(format!("{e} at line {row_number}").into())
22
})?;
23
if row.is_empty() {
24
break;
25
}
26
if !row.trim().is_empty() {
27
break;
28
}
29
}
30
if row.is_empty() {
31
break;
32
}
33
row_number += 1;
34
if row_number == limit {
35
break;
36
}
37
}
38
Ok(row_number)
39
}
40
41
/// A [`FallibleStreamingIterator`] over batches of NDJSON rows.
///
/// This iterator is used to read an NDJSON source in chunks. Each advance refills the internal
/// `rows` buffer with up to `rows.len()` non-blank lines from the reader. Lines containing only
/// whitespace are skipped. Filling stops early at end of input, or once the total row limit given
/// to [`FileReader::new`] has been reached.
///
/// Every batch that is yielded contains at least one row. The iterator may, however, yield no
/// batches at all. This happens for an empty reader, a reader containing only blank lines, a limit
/// of zero, or an empty `rows` buffer.
///
/// # Implementation
/// Advancing this iterator is IO-bound, but does require parsing each byte to find end of lines.
/// # Errors
/// Advancing this iterator errors iff reading from the reader errors.
pub struct FileReader<R: BufRead> {
    reader: R,
    // Reusable line buffers; their count is the batch size.
    rows: Vec<String>,
    // Number of leading entries of `rows` filled by the most recent advance.
    number_of_rows: usize,
    // How many more rows may still be produced before the limit is reached.
    remaining: usize,
}
55
56
impl<R: BufRead> FileReader<R> {
57
/// Creates a new [`FileReader`] from a reader and `rows`.
58
///
59
/// The number of items in `rows` denotes the batch size.
60
pub fn new(reader: R, rows: Vec<String>, limit: Option<usize>) -> Self {
61
Self {
62
reader,
63
rows,
64
remaining: limit.unwrap_or(usize::MAX),
65
number_of_rows: 0,
66
}
67
}
68
}
69
70
impl<R: BufRead> FallibleStreamingIterator for FileReader<R> {
71
type Error = PolarsError;
72
type Item = [String];
73
74
fn advance(&mut self) -> PolarsResult<()> {
75
self.number_of_rows = read_rows(&mut self.reader, &mut self.rows, self.remaining)?;
76
self.remaining -= self.number_of_rows;
77
Ok(())
78
}
79
80
fn get(&self) -> Option<&Self::Item> {
81
if self.number_of_rows > 0 {
82
Some(&self.rows[..self.number_of_rows])
83
} else {
84
None
85
}
86
}
87
}
88
89
fn parse_value<'a>(scratch: &'a mut Vec<u8>, val: &[u8]) -> PolarsResult<BorrowedValue<'a>> {
90
scratch.clear();
91
scratch.extend_from_slice(val);
92
// 0 because it is row by row
93
94
simd_json::to_borrowed_value(scratch)
95
.map_err(|e| PolarsError::ComputeError(format!("{e}").into()))
96
}
97
98
/// Infers the [`ArrowDataType`] from an NDJSON file, optionally only using `number_of_rows` rows.
99
///
100
/// # Implementation
101
/// This implementation reads the file line by line and infers the type of each line.
102
/// It performs both `O(N)` IO and CPU-bounded operations where `N` is the number of rows.
103
pub fn iter_unique_dtypes<R: std::io::BufRead>(
104
reader: &mut R,
105
number_of_rows: Option<NonZeroUsize>,
106
) -> PolarsResult<impl Iterator<Item = ArrowDataType>> {
107
if reader.fill_buf().map(|b| b.is_empty())? {
108
return Err(PolarsError::ComputeError(
109
"Cannot infer NDJSON types on empty reader because empty string is not a valid JSON value".into(),
110
));
111
}
112
113
let rows = vec!["".to_string(); 1]; // 1 <=> read row by row
114
let mut reader = FileReader::new(reader, rows, number_of_rows.map(|v| v.into()));
115
116
let mut dtypes = PlIndexSet::default();
117
let mut buf = vec![];
118
while let Some(rows) = reader.next()? {
119
// 0 because it is row by row
120
let value = parse_value(&mut buf, rows[0].as_bytes())?;
121
let dtype = crate::json::infer(&value)?;
122
dtypes.insert(dtype);
123
}
124
Ok(dtypes.into_iter())
125
}
126
127
/// Infers the [`ArrowDataType`] from an iterator of JSON strings. A limited number of
128
/// rows can be used by passing `rows.take(number_of_rows)` as an input.
129
///
130
/// # Implementation
131
/// This implementation infers each row by going through the entire iterator.
132
pub fn infer_iter<A: AsRef<str>>(rows: impl Iterator<Item = A>) -> PolarsResult<ArrowDataType> {
133
let mut dtypes = IndexSet::<_, PlRandomState>::default();
134
135
let mut buf = vec![];
136
for row in rows {
137
let v = parse_value(&mut buf, row.as_ref().as_bytes())?;
138
let dtype = crate::json::infer(&v)?;
139
if dtype != ArrowDataType::Null {
140
dtypes.insert(dtype);
141
}
142
}
143
144
let v: Vec<&ArrowDataType> = dtypes.iter().collect();
145
Ok(crate::json::infer_schema::coerce_dtype(&v))
146
}
147
148