GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/parquet/read/reader.rs
use std::io::{Read, Seek};
use std::sync::Arc;

use arrow::datatypes::ArrowSchemaRef;
use polars_core::prelude::*;
use polars_parquet::read;
use polars_utils::pl_str::PlSmallStr;

use super::read_impl::read_parquet;
use super::utils::{ensure_matching_dtypes_if_found, projected_arrow_schema_to_projection_indices};
use crate::RowIndex;
use crate::mmap::MmapBytesReader;
use crate::parquet::metadata::FileMetadataRef;
use crate::prelude::*;

/// Read Apache Parquet format into a [`DataFrame`].
#[must_use]
pub struct ParquetReader<R: Read + Seek> {
    reader: R,
    rechunk: bool,
    slice: (usize, usize),
    columns: Option<Vec<String>>,
    projection: Option<Vec<usize>>,
    parallel: ParallelStrategy,
    schema: Option<ArrowSchemaRef>,
    row_index: Option<RowIndex>,
    low_memory: bool,
    metadata: Option<FileMetadataRef>,
    hive_partition_columns: Option<Vec<Series>>,
    include_file_path: Option<(PlSmallStr, PlSmallStr)>,
}
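
// A minimal usage sketch (not part of the original source): read a local
// Parquet file into a `DataFrame` through the `SerReader` interface
// implemented below. The file name `data.parquet` and the column names are
// assumptions for illustration, and the import paths assume the `parquet`
// feature of `polars-io` is enabled.
//
// ```ignore
// use std::fs::File;
//
// use polars_core::prelude::*;
// use polars_io::SerReader;
// use polars_io::prelude::ParquetReader;
//
// fn example() -> PolarsResult<DataFrame> {
//     let file = File::open("data.parquet")?;
//     ParquetReader::new(file)
//         .with_columns(Some(vec!["a".to_string(), "b".to_string()]))
//         .set_rechunk(true)
//         .finish()
// }
// ```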

impl<R: MmapBytesReader> ParquetReader<R> {
    /// Try to reduce memory pressure at the expense of performance. If setting this does not reduce memory
    /// enough, turn off parallelization.
    pub fn set_low_memory(mut self, low_memory: bool) -> Self {
        self.low_memory = low_memory;
        self
    }

    /// Read the Parquet file in parallel (default). The single-threaded reader consumes less memory.
    pub fn read_parallel(mut self, parallel: ParallelStrategy) -> Self {
        self.parallel = parallel;
        self
    }

    /// Set a slice of the file to read, as `(offset, length)` in rows.
    pub fn with_slice(mut self, slice: Option<(usize, usize)>) -> Self {
        self.slice = slice.unwrap_or((0, usize::MAX));
        self
    }

    /// Columns to select/project.
    pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
        self.columns = columns;
        self
    }

    /// Set the reader's column projection. This counts from 0, meaning that
    /// `vec![0, 4]` would select the 1st and 5th columns.
    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }
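
    // A hedged sketch (not from the original source) of the two selection
    // styles above; the column names are illustrative assumptions.
    //
    // ```ignore
    // // By name: keeps only "a" and "b".
    // let reader = ParquetReader::new(file)
    //     .with_columns(Some(vec!["a".to_string(), "b".to_string()]));
    //
    // // By position: 0-based, so this keeps the 1st and 5th columns.
    // let reader = ParquetReader::new(file).with_projection(Some(vec![0, 4]));
    // ```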

    /// Add a row index column.
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }
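
    // A minimal sketch (not from the original source; assumes `RowIndex`
    // carries a column name and a starting offset, as defined in this
    // crate's options):
    //
    // ```ignore
    // let reader = ParquetReader::new(file).with_row_index(Some(RowIndex {
    //     name: "row_nr".into(),
    //     offset: 0,
    // }));
    // ```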

    /// Checks that the file contains all the columns in `projected_arrow_schema` with the same
    /// dtype, and sets the projection indices.
    pub fn with_arrow_schema_projection(
        mut self,
        first_schema: &Arc<ArrowSchema>,
        projected_arrow_schema: Option<&ArrowSchema>,
        allow_missing_columns: bool,
    ) -> PolarsResult<Self> {
        let slf_schema = self.schema()?;
        let slf_schema_width = slf_schema.len();

        if allow_missing_columns {
            // Must check the dtypes
            ensure_matching_dtypes_if_found(
                projected_arrow_schema.unwrap_or(first_schema.as_ref()),
                self.schema()?.as_ref(),
            )?;
            self.schema = Some(Arc::new(
                first_schema
                    .iter()
                    .map(|(name, field)| {
                        (name.clone(), slf_schema.get(name).unwrap_or(field).clone())
                    })
                    .collect(),
            ));
        }

        let schema = self.schema()?;

        // Resolve the projection indices inside an immediately-invoked
        // closure so any error can be annotated with a hint below.
        (|| {
            if let Some(projected_arrow_schema) = projected_arrow_schema {
                self.projection = projected_arrow_schema_to_projection_indices(
                    schema.as_ref(),
                    projected_arrow_schema,
                )?;
            } else {
                if slf_schema_width > first_schema.len() {
                    polars_bail!(
                        SchemaMismatch:
                        "parquet file contained extra columns and no selection was given"
                    )
                }

                self.projection =
                    projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
            };
            Ok(())
        })()
        .map_err(|e| {
            if !allow_missing_columns && matches!(e, PolarsError::ColumnNotFound(_)) {
                e.wrap_msg(|s| {
                    format!(
                        "error with column selection, \
                        consider passing `missing_columns='insert'`: {s}"
                    )
                })
            } else {
                e
            }
        })?;

        Ok(self)
    }

    /// [`Schema`] of the file.
    pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
        // Lazily infer the schema from the file metadata and cache it.
        self.schema = Some(match &self.schema {
            Some(schema) => schema.clone(),
            None => {
                let metadata = self.get_metadata()?;
                Arc::new(read::infer_schema(metadata)?)
            },
        });

        Ok(self.schema.clone().unwrap())
    }

    /// Number of rows in the Parquet file.
    pub fn num_rows(&mut self) -> PolarsResult<usize> {
        let metadata = self.get_metadata()?;
        Ok(metadata.num_rows)
    }

    /// Set hive partition columns to include in the output.
    pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
        self.hive_partition_columns = columns;
        self
    }

    /// Include a column with the given name holding the given file-path value.
    pub fn with_include_file_path(
        mut self,
        include_file_path: Option<(PlSmallStr, PlSmallStr)>,
    ) -> Self {
        self.include_file_path = include_file_path;
        self
    }

    /// Set already-known file metadata so it does not have to be read again.
    pub fn set_metadata(&mut self, metadata: FileMetadataRef) {
        self.metadata = Some(metadata);
    }

    pub fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
        // Read and cache the file metadata on first access.
        if self.metadata.is_none() {
            self.metadata = Some(Arc::new(read::read_metadata(&mut self.reader)?));
        }
        Ok(self.metadata.as_ref().unwrap())
    }
}
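
// A hedged sketch (not from the original source): `set_metadata` lets a
// caller reuse metadata that was already read, e.g. when opening the same
// file a second time, so `get_metadata` does not parse the footer again.
//
// ```ignore
// let mut first = ParquetReader::new(File::open("data.parquet")?);
// let metadata = first.get_metadata()?.clone();
//
// let mut second = ParquetReader::new(File::open("data.parquet")?);
// second.set_metadata(metadata);
// ```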

impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
    /// Create a new [`ParquetReader`] from an existing `Reader`.
    fn new(reader: R) -> Self {
        ParquetReader {
            reader,
            rechunk: false,
            slice: (0, usize::MAX),
            columns: None,
            projection: None,
            parallel: Default::default(),
            row_index: None,
            low_memory: false,
            metadata: None,
            schema: None,
            hive_partition_columns: None,
            include_file_path: None,
        }
    }

    fn set_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    fn finish(mut self) -> PolarsResult<DataFrame> {
        let schema = self.schema()?;
        let metadata = self.get_metadata()?.clone();
        // The slice is `(offset, length)`, so the last row to read is capped
        // by the file's total row count.
        let n_rows = metadata.num_rows.min(self.slice.0 + self.slice.1);

        // Resolve a by-name column selection to positional indices.
        if let Some(cols) = &self.columns {
            self.projection = Some(columns_to_projection(cols, schema.as_ref())?);
        }

        let mut df = read_parquet(
            self.reader,
            self.slice,
            self.projection.as_deref(),
            &schema,
            Some(metadata),
            self.parallel,
            self.row_index,
            self.hive_partition_columns.as_deref(),
        )?;

        if self.rechunk {
            df.rechunk_mut_par();
        };

        if let Some((col, value)) = &self.include_file_path {
            unsafe {
                df.with_column_unchecked(Column::new_scalar(
                    col.clone(),
                    Scalar::new(
                        DataType::String,
                        AnyValue::StringOwned(value.as_str().into()),
                    ),
                    if df.width() > 0 { df.height() } else { n_rows },
                ))
            };
        }

        Ok(df)
    }
}
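
// A hedged end-to-end sketch (not from the original source; the file name and
// indices are assumptions): combine slicing, projection, and a parallelism
// strategy. `with_slice(Some((100, 50)))` reads 50 rows starting at row 100.
//
// ```ignore
// let df = ParquetReader::new(File::open("data.parquet")?)
//     .with_slice(Some((100, 50)))
//     .with_projection(Some(vec![0, 4]))
//     .read_parallel(ParallelStrategy::Auto)
//     .finish()?;
// ```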