Book a Demo!
CoCalc Logo Icon
Store Features Docs Share Support News About Policies Sign Up Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-lazy/src/scan/ndjson.rs
6939 views
1
use std::num::NonZeroUsize;
2
use std::sync::Arc;
3
4
use polars_core::prelude::*;
5
use polars_io::cloud::CloudOptions;
6
use polars_io::{HiveOptions, RowIndex};
7
use polars_plan::dsl::{
8
CastColumnsPolicy, DslPlan, ExtraColumnsPolicy, FileScanDsl, MissingColumnsPolicy, ScanSources,
9
};
10
use polars_plan::prelude::{NDJsonReadOptions, UnifiedScanArgs};
11
use polars_utils::plpath::PlPath;
12
use polars_utils::slice_enum::Slice;
13
14
use crate::prelude::LazyFrame;
15
use crate::scan::file_list_reader::LazyFileListReader;
16
17
#[derive(Clone)]
18
pub struct LazyJsonLineReader {
19
pub(crate) sources: ScanSources,
20
pub(crate) batch_size: Option<NonZeroUsize>,
21
pub(crate) low_memory: bool,
22
pub(crate) rechunk: bool,
23
pub(crate) schema: Option<SchemaRef>,
24
pub(crate) schema_overwrite: Option<SchemaRef>,
25
pub(crate) row_index: Option<RowIndex>,
26
pub(crate) infer_schema_length: Option<NonZeroUsize>,
27
pub(crate) n_rows: Option<usize>,
28
pub(crate) ignore_errors: bool,
29
pub(crate) include_file_paths: Option<PlSmallStr>,
30
pub(crate) cloud_options: Option<CloudOptions>,
31
}
32
33
impl LazyJsonLineReader {
34
pub fn new_paths(paths: Arc<[PlPath]>) -> Self {
35
Self::new_with_sources(ScanSources::Paths(paths))
36
}
37
38
pub fn new_with_sources(sources: ScanSources) -> Self {
39
LazyJsonLineReader {
40
sources,
41
batch_size: None,
42
low_memory: false,
43
rechunk: false,
44
schema: None,
45
schema_overwrite: None,
46
row_index: None,
47
infer_schema_length: NonZeroUsize::new(100),
48
ignore_errors: false,
49
n_rows: None,
50
include_file_paths: None,
51
cloud_options: None,
52
}
53
}
54
55
pub fn new(path: PlPath) -> Self {
56
Self::new_with_sources(ScanSources::Paths([path].into()))
57
}
58
59
/// Add a row index column.
60
#[must_use]
61
pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
62
self.row_index = row_index;
63
self
64
}
65
66
/// Set values as `Null` if parsing fails because of schema mismatches.
67
#[must_use]
68
pub fn with_ignore_errors(mut self, ignore_errors: bool) -> Self {
69
self.ignore_errors = ignore_errors;
70
self
71
}
72
/// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
73
/// be guaranteed.
74
#[must_use]
75
pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
76
self.n_rows = num_rows;
77
self
78
}
79
/// Set the number of rows to use when inferring the json schema.
80
/// the default is 100 rows.
81
/// Ignored when the schema is specified explicitly using [`Self::with_schema`].
82
/// Setting to `None` will do a full table scan, very slow.
83
#[must_use]
84
pub fn with_infer_schema_length(mut self, num_rows: Option<NonZeroUsize>) -> Self {
85
self.infer_schema_length = num_rows;
86
self
87
}
88
/// Set the JSON file's schema
89
#[must_use]
90
pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
91
self.schema = schema;
92
self
93
}
94
95
/// Set the JSON file's schema
96
#[must_use]
97
pub fn with_schema_overwrite(mut self, schema_overwrite: Option<SchemaRef>) -> Self {
98
self.schema_overwrite = schema_overwrite;
99
self
100
}
101
102
/// Reduce memory usage at the expense of performance
103
#[must_use]
104
pub fn low_memory(mut self, toggle: bool) -> Self {
105
self.low_memory = toggle;
106
self
107
}
108
109
#[must_use]
110
pub fn with_batch_size(mut self, batch_size: Option<NonZeroUsize>) -> Self {
111
self.batch_size = batch_size;
112
self
113
}
114
115
pub fn with_cloud_options(mut self, cloud_options: Option<CloudOptions>) -> Self {
116
self.cloud_options = cloud_options;
117
self
118
}
119
120
pub fn with_include_file_paths(mut self, include_file_paths: Option<PlSmallStr>) -> Self {
121
self.include_file_paths = include_file_paths;
122
self
123
}
124
}
125
126
impl LazyFileListReader for LazyJsonLineReader {
127
fn finish(self) -> PolarsResult<LazyFrame> {
128
let unified_scan_args = UnifiedScanArgs {
129
schema: None,
130
cloud_options: self.cloud_options,
131
hive_options: HiveOptions::new_disabled(),
132
rechunk: self.rechunk,
133
cache: false,
134
glob: true,
135
projection: None,
136
column_mapping: None,
137
default_values: None,
138
row_index: self.row_index,
139
pre_slice: self.n_rows.map(|len| Slice::Positive { offset: 0, len }),
140
cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
141
missing_columns_policy: MissingColumnsPolicy::Raise,
142
extra_columns_policy: ExtraColumnsPolicy::Raise,
143
include_file_paths: self.include_file_paths,
144
deletion_files: None,
145
};
146
147
let options = NDJsonReadOptions {
148
n_threads: None,
149
infer_schema_length: self.infer_schema_length,
150
chunk_size: NonZeroUsize::new(1 << 18).unwrap(),
151
low_memory: self.low_memory,
152
ignore_errors: self.ignore_errors,
153
schema: self.schema,
154
schema_overwrite: self.schema_overwrite,
155
};
156
157
let scan_type = Box::new(FileScanDsl::NDJson { options });
158
159
Ok(LazyFrame::from(DslPlan::Scan {
160
sources: self.sources,
161
unified_scan_args: Box::new(unified_scan_args),
162
scan_type,
163
cached_ir: Default::default(),
164
}))
165
}
166
167
fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
168
unreachable!();
169
}
170
171
fn sources(&self) -> &ScanSources {
172
&self.sources
173
}
174
175
fn with_sources(mut self, sources: ScanSources) -> Self {
176
self.sources = sources;
177
self
178
}
179
180
fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
181
self.n_rows = n_rows.into();
182
self
183
}
184
185
fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
186
self.row_index = row_index.into();
187
self
188
}
189
190
fn rechunk(&self) -> bool {
191
self.rechunk
192
}
193
194
/// Rechunk the memory to contiguous chunks when parsing is done.
195
fn with_rechunk(mut self, toggle: bool) -> Self {
196
self.rechunk = toggle;
197
self
198
}
199
200
/// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
201
/// be guaranteed.
202
fn n_rows(&self) -> Option<usize> {
203
self.n_rows
204
}
205
206
/// Add a row index column.
207
fn row_index(&self) -> Option<&RowIndex> {
208
self.row_index.as_ref()
209
}
210
211
/// [CloudOptions] used to list files.
212
fn cloud_options(&self) -> Option<&CloudOptions> {
213
self.cloud_options.as_ref()
214
}
215
}
216
217