Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-lazy/src/scan/parquet.rs
6939 views
1
use polars_core::prelude::*;
2
use polars_io::cloud::CloudOptions;
3
use polars_io::parquet::read::ParallelStrategy;
4
use polars_io::prelude::ParquetOptions;
5
use polars_io::{HiveOptions, RowIndex};
6
use polars_utils::plpath::PlPath;
7
use polars_utils::slice_enum::Slice;
8
9
use crate::prelude::*;
10
11
#[derive(Clone)]
12
pub struct ScanArgsParquet {
13
pub n_rows: Option<usize>,
14
pub parallel: ParallelStrategy,
15
pub row_index: Option<RowIndex>,
16
pub cloud_options: Option<CloudOptions>,
17
pub hive_options: HiveOptions,
18
pub use_statistics: bool,
19
pub schema: Option<SchemaRef>,
20
pub low_memory: bool,
21
pub rechunk: bool,
22
pub cache: bool,
23
/// Expand path given via globbing rules.
24
pub glob: bool,
25
pub include_file_paths: Option<PlSmallStr>,
26
pub allow_missing_columns: bool,
27
}
28
29
impl Default for ScanArgsParquet {
30
fn default() -> Self {
31
Self {
32
n_rows: None,
33
parallel: Default::default(),
34
row_index: None,
35
cloud_options: None,
36
hive_options: Default::default(),
37
use_statistics: true,
38
schema: None,
39
rechunk: false,
40
low_memory: false,
41
cache: true,
42
glob: true,
43
include_file_paths: None,
44
allow_missing_columns: false,
45
}
46
}
47
}
48
49
#[derive(Clone)]
50
struct LazyParquetReader {
51
args: ScanArgsParquet,
52
sources: ScanSources,
53
}
54
55
impl LazyParquetReader {
56
fn new(args: ScanArgsParquet) -> Self {
57
Self {
58
args,
59
sources: ScanSources::default(),
60
}
61
}
62
}
63
64
impl LazyFileListReader for LazyParquetReader {
65
/// Get the final [LazyFrame].
66
fn finish(self) -> PolarsResult<LazyFrame> {
67
let parquet_options = ParquetOptions {
68
schema: self.args.schema,
69
parallel: self.args.parallel,
70
low_memory: self.args.low_memory,
71
use_statistics: self.args.use_statistics,
72
};
73
74
let unified_scan_args = UnifiedScanArgs {
75
schema: None,
76
cloud_options: self.args.cloud_options,
77
hive_options: self.args.hive_options,
78
rechunk: self.args.rechunk,
79
cache: self.args.cache,
80
glob: self.args.glob,
81
projection: None,
82
column_mapping: None,
83
default_values: None,
84
// Note: We call `with_row_index()` on the LazyFrame below
85
row_index: None,
86
pre_slice: self
87
.args
88
.n_rows
89
.map(|len| Slice::Positive { offset: 0, len }),
90
cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
91
missing_columns_policy: if self.args.allow_missing_columns {
92
MissingColumnsPolicy::Insert
93
} else {
94
MissingColumnsPolicy::Raise
95
},
96
extra_columns_policy: ExtraColumnsPolicy::Raise,
97
include_file_paths: self.args.include_file_paths,
98
deletion_files: None,
99
};
100
101
let mut lf: LazyFrame =
102
DslBuilder::scan_parquet(self.sources, parquet_options, unified_scan_args)?
103
.build()
104
.into();
105
106
// It's a bit hacky, but this row_index function updates the schema.
107
if let Some(row_index) = self.args.row_index {
108
lf = lf.with_row_index(row_index.name, Some(row_index.offset))
109
}
110
111
Ok(lf)
112
}
113
114
fn glob(&self) -> bool {
115
self.args.glob
116
}
117
118
fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
119
unreachable!();
120
}
121
122
fn sources(&self) -> &ScanSources {
123
&self.sources
124
}
125
126
fn with_sources(mut self, sources: ScanSources) -> Self {
127
self.sources = sources;
128
self
129
}
130
131
fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
132
self.args.n_rows = n_rows.into();
133
self
134
}
135
136
fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
137
self.args.row_index = row_index.into();
138
self
139
}
140
141
fn rechunk(&self) -> bool {
142
self.args.rechunk
143
}
144
145
fn with_rechunk(mut self, toggle: bool) -> Self {
146
self.args.rechunk = toggle;
147
self
148
}
149
150
fn cloud_options(&self) -> Option<&CloudOptions> {
151
self.args.cloud_options.as_ref()
152
}
153
154
fn n_rows(&self) -> Option<usize> {
155
self.args.n_rows
156
}
157
158
fn row_index(&self) -> Option<&RowIndex> {
159
self.args.row_index.as_ref()
160
}
161
}
162
163
impl LazyFrame {
164
/// Create a LazyFrame directly from a parquet scan.
165
pub fn scan_parquet(path: PlPath, args: ScanArgsParquet) -> PolarsResult<Self> {
166
Self::scan_parquet_sources(ScanSources::Paths([path].into()), args)
167
}
168
169
/// Create a LazyFrame directly from a parquet scan.
170
pub fn scan_parquet_sources(sources: ScanSources, args: ScanArgsParquet) -> PolarsResult<Self> {
171
LazyParquetReader::new(args).with_sources(sources).finish()
172
}
173
174
/// Create a LazyFrame directly from a parquet scan.
175
pub fn scan_parquet_files(paths: Arc<[PlPath]>, args: ScanArgsParquet) -> PolarsResult<Self> {
176
Self::scan_parquet_sources(ScanSources::Paths(paths), args)
177
}
178
}
179
180