Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/ndjson/chunk_reader.rs
8446 views
1
use polars_core::schema::SchemaRef;
2
use polars_error::PolarsResult;
3
use polars_io::ndjson;
4
use polars_io::prelude::{is_json_line, parse_ndjson};
5
#[cfg(feature = "scan_lines")]
6
use polars_utils::pl_str::PlSmallStr;
7
8
use crate::nodes::compute_node_prelude::*;
9
10
/// Selects which kind of per-chunk reader to create before the projected schema is known.
///
/// Turn it into a [`ChunkReader`] with [`ChunkReaderBuilder::build`].
#[derive(Clone)]
pub enum ChunkReaderBuilder {
    /// Decode chunks as newline-delimited JSON.
    NDJson {
        /// Forwarded to the NDJSON parser's `ignore_errors` argument.
        ignore_errors: bool,
    },
    /// Decode chunks as plain lines, yielding at most one `String` column.
    #[cfg(feature = "scan_lines")]
    Lines,
}
18
19
#[derive(Clone)]
20
pub enum ChunkReader {
21
/// NDJSON chunk reader.
22
NDJson {
23
projected_schema: SchemaRef,
24
ignore_errors: bool,
25
},
26
#[cfg(feature = "scan_lines")]
27
Lines {
28
/// If this is `None` we are projecting 0-width morsels.
29
projection: Option<PlSmallStr>,
30
},
31
}
32
33
impl ChunkReaderBuilder {
34
pub(super) fn build(&self, projected_schema: SchemaRef) -> ChunkReader {
35
match self {
36
Self::NDJson { ignore_errors } => ChunkReader::NDJson {
37
projected_schema,
38
ignore_errors: *ignore_errors,
39
},
40
#[cfg(feature = "scan_lines")]
41
Self::Lines => {
42
use polars_core::prelude::DataType;
43
44
assert!(projected_schema.len() <= 1);
45
46
let projection = projected_schema
47
.get_at_index(0)
48
.map(|(projected_name, dtype)| {
49
assert!(matches!(dtype, DataType::String));
50
projected_name.clone()
51
});
52
53
ChunkReader::Lines { projection }
54
},
55
}
56
}
57
58
pub(super) fn is_line_fn(&self) -> fn(&[u8]) -> bool {
59
match self {
60
Self::NDJson { .. } => is_json_line,
61
#[cfg(feature = "scan_lines")]
62
Self::Lines { .. } => |_: &[u8]| true,
63
}
64
}
65
}
66
67
impl ChunkReader {
68
pub(super) fn read_chunk(&self, chunk: &[u8]) -> PolarsResult<DataFrame> {
69
match self {
70
Self::NDJson {
71
projected_schema,
72
ignore_errors,
73
} => {
74
if projected_schema.is_empty() {
75
Ok(DataFrame::empty_with_height(ndjson::count_rows(chunk)))
76
} else {
77
parse_ndjson(chunk, None, projected_schema, *ignore_errors)
78
}
79
},
80
#[cfg(feature = "scan_lines")]
81
Self::Lines { projection } => {
82
use polars_core::prelude::IntoColumn;
83
use polars_core::series::Series;
84
use polars_io::scan_lines;
85
86
let Some(name) = projection else {
87
return Ok(DataFrame::empty_with_height(scan_lines::count_lines(chunk)));
88
};
89
90
let out: Series = scan_lines::split_lines_to_rows(chunk)?.with_name(name.clone());
91
let out = unsafe { DataFrame::new_unchecked(out.len(), vec![out.into_column()]) };
92
93
Ok(out)
94
},
95
}
96
}
97
}
98
99