GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/parquet/read/reader.rs
use std::io::{Read, Seek};
use std::sync::Arc;

use arrow::datatypes::ArrowSchemaRef;
use polars_core::prelude::*;
use polars_parquet::read;

use super::read_impl::read_parquet;
use super::utils::{ensure_matching_dtypes_if_found, projected_arrow_schema_to_projection_indices};
use crate::RowIndex;
use crate::mmap::MmapBytesReader;
use crate::parquet::metadata::FileMetadataRef;
use crate::prelude::*;

/// Read Apache parquet format into a DataFrame.
#[must_use]
pub struct ParquetReader<R: Read + Seek> {
    reader: R,
    rechunk: bool,
    slice: (usize, usize),
    columns: Option<Vec<String>>,
    projection: Option<Vec<usize>>,
    parallel: ParallelStrategy,
    schema: Option<ArrowSchemaRef>,
    row_index: Option<RowIndex>,
    low_memory: bool,
    metadata: Option<FileMetadataRef>,
    hive_partition_columns: Option<Vec<Series>>,
    include_file_path: Option<(PlSmallStr, Arc<str>)>,
}
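
// A minimal usage sketch (illustrative, not part of the original file): read a
// local `data.parquet` with the builder API below. `new` and `finish` come
// from the `SerReader` impl at the bottom of this file; the file name is an
// assumption for the example.
//
//     use std::fs::File;
//
//     let file = File::open("data.parquet")?;
//     let df = ParquetReader::new(file)
//         .read_parallel(ParallelStrategy::Auto)
//         .finish()?;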

impl<R: MmapBytesReader> ParquetReader<R> {
    /// Try to reduce memory pressure at the expense of performance. If setting this does not reduce memory
    /// enough, turn off parallelization.
    pub fn set_low_memory(mut self, low_memory: bool) -> Self {
        self.low_memory = low_memory;
        self
    }

    /// Read the parquet file in parallel (default). The single-threaded reader consumes less memory.
    pub fn read_parallel(mut self, parallel: ParallelStrategy) -> Self {
        self.parallel = parallel;
        self
    }

    /// Limit the read to a slice of rows given as `(offset, length)`; `None` reads all rows.
    pub fn with_slice(mut self, slice: Option<(usize, usize)>) -> Self {
        self.slice = slice.unwrap_or((0, usize::MAX));
        self
    }

    /// Columns to select/project.
    pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
        self.columns = columns;
        self
    }

    /// Set the reader's column projection. This counts from 0, meaning that
    /// `vec![0, 4]` would select the 1st and 5th column.
    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    /// Add a row index column.
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }
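
    // Illustrative sketch (assumed usage, not part of the original file):
    // attach a row index column named "row_nr" that starts counting at 0.
    //
    //     let reader = reader.with_row_index(Some(RowIndex {
    //         name: "row_nr".into(),
    //         offset: 0,
    //     }));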

    /// Checks that the file contains all the columns in `projected_arrow_schema` with the same
    /// dtype, and sets the projection indices.
    pub fn with_arrow_schema_projection(
        mut self,
        first_schema: &Arc<ArrowSchema>,
        projected_arrow_schema: Option<&ArrowSchema>,
        allow_missing_columns: bool,
    ) -> PolarsResult<Self> {
        let slf_schema = self.schema()?;
        let slf_schema_width = slf_schema.len();

        if allow_missing_columns {
            // Must check the dtypes
            ensure_matching_dtypes_if_found(
                projected_arrow_schema.unwrap_or(first_schema.as_ref()),
                self.schema()?.as_ref(),
            )?;
            self.schema = Some(Arc::new(
                first_schema
                    .iter()
                    .map(|(name, field)| {
                        (name.clone(), slf_schema.get(name).unwrap_or(field).clone())
                    })
                    .collect(),
            ));
        }

        let schema = self.schema()?;

        (|| {
            if let Some(projected_arrow_schema) = projected_arrow_schema {
                self.projection = projected_arrow_schema_to_projection_indices(
                    schema.as_ref(),
                    projected_arrow_schema,
                )?;
            } else {
                if slf_schema_width > first_schema.len() {
                    polars_bail!(
                        SchemaMismatch:
                        "parquet file contained extra columns and no selection was given"
                    )
                }

                self.projection =
                    projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
            };
            Ok(())
        })()
        .map_err(|e| {
            if !allow_missing_columns && matches!(e, PolarsError::ColumnNotFound(_)) {
                e.wrap_msg(|s| {
                    format!(
                        "error with column selection, \
                        consider passing `missing_columns='insert'`: {s}"
                    )
                })
            } else {
                e
            }
        })?;

        Ok(self)
    }
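
    // Illustrative example (an assumption about the helper's behavior, not
    // part of the original file): for a file schema [a, b, c] and a projected
    // schema [c, a], `projected_arrow_schema_to_projection_indices` would
    // yield the projection indices [2, 0], i.e. the positions of the
    // projected columns within the file schema.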

    /// [`Schema`] of the file.
    pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
        self.schema = Some(match &self.schema {
            Some(schema) => schema.clone(),
            None => {
                let metadata = self.get_metadata()?;
                Arc::new(read::infer_schema(metadata)?)
            },
        });

        Ok(self.schema.clone().unwrap())
    }

    /// Number of rows in the parquet file.
    pub fn num_rows(&mut self) -> PolarsResult<usize> {
        let metadata = self.get_metadata()?;
        Ok(metadata.num_rows)
    }

    /// Set Hive-partition columns to append to the output.
    pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
        self.hive_partition_columns = columns;
        self
    }

    /// Include a column with the given name holding the source file path.
    pub fn with_include_file_path(
        mut self,
        include_file_path: Option<(PlSmallStr, Arc<str>)>,
    ) -> Self {
        self.include_file_path = include_file_path;
        self
    }

    /// Set already-parsed file metadata so it does not have to be read again.
    pub fn set_metadata(&mut self, metadata: FileMetadataRef) {
        self.metadata = Some(metadata);
    }

    /// Get the file metadata, reading and caching it on first access.
    pub fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
        if self.metadata.is_none() {
            self.metadata = Some(Arc::new(read::read_metadata(&mut self.reader)?));
        }
        Ok(self.metadata.as_ref().unwrap())
    }
}
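
// Illustrative sketch (assumed usage, not part of the original file): parse
// the metadata once and hand it to a second reader over the same file, so
// the Parquet footer is not read and decoded twice.
//
//     use std::fs::File;
//
//     let mut first = ParquetReader::new(File::open("data.parquet")?);
//     let md = first.get_metadata()?.clone();
//     let mut second = ParquetReader::new(File::open("data.parquet")?);
//     second.set_metadata(md);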

impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
    /// Create a new [`ParquetReader`] from an existing `Reader`.
    fn new(reader: R) -> Self {
        ParquetReader {
            reader,
            rechunk: false,
            slice: (0, usize::MAX),
            columns: None,
            projection: None,
            parallel: Default::default(),
            row_index: None,
            low_memory: false,
            metadata: None,
            schema: None,
            hive_partition_columns: None,
            include_file_path: None,
        }
    }

    fn set_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    fn finish(mut self) -> PolarsResult<DataFrame> {
        let schema = self.schema()?;
        let metadata = self.get_metadata()?.clone();
        // Upper bound on the number of rows, clamped by the requested slice.
        let n_rows = metadata.num_rows.min(self.slice.0 + self.slice.1);

        // Resolve selected column names to positional projection indices.
        if let Some(cols) = &self.columns {
            self.projection = Some(columns_to_projection(cols, schema.as_ref())?);
        }

        let mut df = read_parquet(
            self.reader,
            self.slice,
            self.projection.as_deref(),
            &schema,
            Some(metadata),
            self.parallel,
            self.row_index,
            self.hive_partition_columns.as_deref(),
        )?;

        // Combine all chunks into a single chunk, in parallel.
        if self.rechunk {
            df.as_single_chunk_par();
        };

        // Optionally append a constant String column holding the source file path.
        if let Some((col, value)) = &self.include_file_path {
            unsafe {
                df.with_column_unchecked(Column::new_scalar(
                    col.clone(),
                    Scalar::new(
                        DataType::String,
                        AnyValue::StringOwned(value.as_ref().into()),
                    ),
                    if df.width() > 0 { df.height() } else { n_rows },
                ))
            };
        }

        Ok(df)
    }
}
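
// Illustrative end-to-end sketch (assumed file name, not part of the original
// file): read rows 100..300 of columns "a" and "b" into a single-chunk
// DataFrame via the builder methods above.
//
//     use std::fs::File;
//
//     let df = ParquetReader::new(File::open("data.parquet")?)
//         .with_columns(Some(vec!["a".into(), "b".into()]))
//         .with_slice(Some((100, 200)))
//         .set_rechunk(true)
//         .finish()?;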