Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-lazy/src/scan/parquet.rs
8506 views
1
use polars_buffer::Buffer;
2
use polars_core::prelude::*;
3
use polars_io::cloud::CloudOptions;
4
use polars_io::parquet::read::ParallelStrategy;
5
use polars_io::prelude::ParquetOptions;
6
use polars_io::{HiveOptions, RowIndex};
7
use polars_utils::pl_path::PlRefPath;
8
use polars_utils::slice_enum::Slice;
9
10
use crate::prelude::*;
11
12
#[derive(Clone)]
13
pub struct ScanArgsParquet {
14
pub n_rows: Option<usize>,
15
pub parallel: ParallelStrategy,
16
pub row_index: Option<RowIndex>,
17
pub cloud_options: Option<CloudOptions>,
18
pub hive_options: HiveOptions,
19
pub use_statistics: bool,
20
pub schema: Option<SchemaRef>,
21
pub low_memory: bool,
22
pub rechunk: bool,
23
pub cache: bool,
24
/// Expand path given via globbing rules.
25
pub glob: bool,
26
pub include_file_paths: Option<PlSmallStr>,
27
pub allow_missing_columns: bool,
28
}
29
30
impl Default for ScanArgsParquet {
31
fn default() -> Self {
32
Self {
33
n_rows: None,
34
parallel: Default::default(),
35
row_index: None,
36
cloud_options: None,
37
hive_options: Default::default(),
38
use_statistics: true,
39
schema: None,
40
rechunk: false,
41
low_memory: false,
42
cache: true,
43
glob: true,
44
include_file_paths: None,
45
allow_missing_columns: false,
46
}
47
}
48
}
49
50
#[derive(Clone)]
51
struct LazyParquetReader {
52
args: ScanArgsParquet,
53
sources: ScanSources,
54
}
55
56
impl LazyParquetReader {
57
fn new(args: ScanArgsParquet) -> Self {
58
Self {
59
args,
60
sources: ScanSources::default(),
61
}
62
}
63
}
64
65
impl LazyFileListReader for LazyParquetReader {
66
/// Get the final [LazyFrame].
67
fn finish(self) -> PolarsResult<LazyFrame> {
68
let parquet_options = ParquetOptions {
69
schema: self.args.schema,
70
parallel: self.args.parallel,
71
low_memory: self.args.low_memory,
72
use_statistics: self.args.use_statistics,
73
};
74
75
let unified_scan_args = UnifiedScanArgs {
76
schema: None,
77
cloud_options: self.args.cloud_options,
78
hive_options: self.args.hive_options,
79
rechunk: self.args.rechunk,
80
cache: self.args.cache,
81
glob: self.args.glob,
82
hidden_file_prefix: None,
83
projection: None,
84
column_mapping: None,
85
default_values: None,
86
// Note: We call `with_row_index()` on the LazyFrame below
87
row_index: None,
88
pre_slice: self
89
.args
90
.n_rows
91
.map(|len| Slice::Positive { offset: 0, len }),
92
cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
93
missing_columns_policy: if self.args.allow_missing_columns {
94
MissingColumnsPolicy::Insert
95
} else {
96
MissingColumnsPolicy::Raise
97
},
98
extra_columns_policy: ExtraColumnsPolicy::Raise,
99
include_file_paths: self.args.include_file_paths,
100
deletion_files: None,
101
table_statistics: None,
102
row_count: None,
103
};
104
105
let mut lf: LazyFrame =
106
DslBuilder::scan_parquet(self.sources, parquet_options, unified_scan_args)?
107
.build()
108
.into();
109
110
// It's a bit hacky, but this row_index function updates the schema.
111
if let Some(row_index) = self.args.row_index {
112
lf = lf.with_row_index(row_index.name, Some(row_index.offset))
113
}
114
115
Ok(lf)
116
}
117
118
fn glob(&self) -> bool {
119
self.args.glob
120
}
121
122
fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
123
unreachable!();
124
}
125
126
fn sources(&self) -> &ScanSources {
127
&self.sources
128
}
129
130
fn with_sources(mut self, sources: ScanSources) -> Self {
131
self.sources = sources;
132
self
133
}
134
135
fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
136
self.args.n_rows = n_rows.into();
137
self
138
}
139
140
fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
141
self.args.row_index = row_index.into();
142
self
143
}
144
145
fn rechunk(&self) -> bool {
146
self.args.rechunk
147
}
148
149
fn with_rechunk(mut self, toggle: bool) -> Self {
150
self.args.rechunk = toggle;
151
self
152
}
153
154
fn cloud_options(&self) -> Option<&CloudOptions> {
155
self.args.cloud_options.as_ref()
156
}
157
158
fn n_rows(&self) -> Option<usize> {
159
self.args.n_rows
160
}
161
162
fn row_index(&self) -> Option<&RowIndex> {
163
self.args.row_index.as_ref()
164
}
165
}
166
167
impl LazyFrame {
168
/// Create a LazyFrame directly from a parquet scan.
169
pub fn scan_parquet(path: PlRefPath, args: ScanArgsParquet) -> PolarsResult<Self> {
170
Self::scan_parquet_sources(ScanSources::Paths(Buffer::from_iter([path])), args)
171
}
172
173
/// Create a LazyFrame directly from a parquet scan.
174
pub fn scan_parquet_sources(sources: ScanSources, args: ScanArgsParquet) -> PolarsResult<Self> {
175
LazyParquetReader::new(args).with_sources(sources).finish()
176
}
177
178
/// Create a LazyFrame directly from a parquet scan.
179
pub fn scan_parquet_files(
180
paths: Buffer<PlRefPath>,
181
args: ScanArgsParquet,
182
) -> PolarsResult<Self> {
183
Self::scan_parquet_sources(ScanSources::Paths(paths), args)
184
}
185
}
186
187