Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-lazy/src/scan/ipc.rs
6939 views
1
use polars_core::prelude::*;
2
use polars_io::cloud::CloudOptions;
3
use polars_io::ipc::IpcScanOptions;
4
use polars_io::{HiveOptions, RowIndex};
5
use polars_utils::plpath::PlPath;
6
use polars_utils::slice_enum::Slice;
7
8
use crate::prelude::*;
9
10
#[derive(Clone)]
11
pub struct ScanArgsIpc {
12
pub n_rows: Option<usize>,
13
pub cache: bool,
14
pub rechunk: bool,
15
pub row_index: Option<RowIndex>,
16
pub cloud_options: Option<CloudOptions>,
17
pub hive_options: HiveOptions,
18
pub include_file_paths: Option<PlSmallStr>,
19
}
20
21
impl Default for ScanArgsIpc {
22
fn default() -> Self {
23
Self {
24
n_rows: None,
25
cache: true,
26
rechunk: false,
27
row_index: None,
28
cloud_options: Default::default(),
29
hive_options: Default::default(),
30
include_file_paths: None,
31
}
32
}
33
}
34
35
#[derive(Clone)]
36
struct LazyIpcReader {
37
args: ScanArgsIpc,
38
sources: ScanSources,
39
}
40
41
impl LazyIpcReader {
42
fn new(args: ScanArgsIpc) -> Self {
43
Self {
44
args,
45
sources: ScanSources::default(),
46
}
47
}
48
}
49
50
impl LazyFileListReader for LazyIpcReader {
51
fn finish(self) -> PolarsResult<LazyFrame> {
52
let args = self.args;
53
54
let options = IpcScanOptions {};
55
let pre_slice = args.n_rows.map(|len| Slice::Positive { offset: 0, len });
56
57
let cloud_options = args.cloud_options;
58
let hive_options = args.hive_options;
59
let rechunk = args.rechunk;
60
let cache = args.cache;
61
let row_index = args.row_index;
62
let include_file_paths = args.include_file_paths;
63
64
let lf: LazyFrame = DslBuilder::scan_ipc(
65
self.sources,
66
options,
67
UnifiedScanArgs {
68
schema: None,
69
cloud_options,
70
hive_options,
71
rechunk,
72
cache,
73
glob: true,
74
projection: None,
75
column_mapping: None,
76
default_values: None,
77
row_index,
78
pre_slice,
79
cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
80
missing_columns_policy: MissingColumnsPolicy::Raise,
81
extra_columns_policy: ExtraColumnsPolicy::Raise,
82
include_file_paths,
83
deletion_files: None,
84
},
85
)?
86
.build()
87
.into();
88
89
Ok(lf)
90
}
91
92
fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
93
unreachable!()
94
}
95
96
fn sources(&self) -> &ScanSources {
97
&self.sources
98
}
99
100
fn with_sources(mut self, sources: ScanSources) -> Self {
101
self.sources = sources;
102
self
103
}
104
105
fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
106
self.args.n_rows = n_rows.into();
107
self
108
}
109
110
fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
111
self.args.row_index = row_index.into();
112
self
113
}
114
115
fn rechunk(&self) -> bool {
116
self.args.rechunk
117
}
118
119
fn with_rechunk(mut self, toggle: bool) -> Self {
120
self.args.rechunk = toggle;
121
self
122
}
123
124
fn n_rows(&self) -> Option<usize> {
125
self.args.n_rows
126
}
127
128
fn row_index(&self) -> Option<&RowIndex> {
129
self.args.row_index.as_ref()
130
}
131
132
/// [CloudOptions] used to list files.
133
fn cloud_options(&self) -> Option<&CloudOptions> {
134
self.args.cloud_options.as_ref()
135
}
136
}
137
138
impl LazyFrame {
139
/// Create a LazyFrame directly from a ipc scan.
140
pub fn scan_ipc(path: PlPath, args: ScanArgsIpc) -> PolarsResult<Self> {
141
Self::scan_ipc_sources(ScanSources::Paths([path].into()), args)
142
}
143
144
pub fn scan_ipc_files(paths: Arc<[PlPath]>, args: ScanArgsIpc) -> PolarsResult<Self> {
145
Self::scan_ipc_sources(ScanSources::Paths(paths), args)
146
}
147
148
pub fn scan_ipc_sources(sources: ScanSources, args: ScanArgsIpc) -> PolarsResult<Self> {
149
LazyIpcReader::new(args).with_sources(sources).finish()
150
}
151
}
152
153