Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-lazy/src/scan/file_list_reader.rs
8430 views
1
use polars_buffer::Buffer;
2
use polars_core::prelude::*;
3
use polars_io::RowIndex;
4
use polars_io::cloud::CloudOptions;
5
use polars_plan::prelude::UnionArgs;
6
use polars_utils::pl_path::PlRefPath;
7
8
use crate::prelude::*;
9
10
/// Reads [LazyFrame] from a filesystem or a cloud storage.
11
/// Supports glob patterns.
12
///
13
/// Use [LazyFileListReader::finish] to get the final [LazyFrame].
14
pub trait LazyFileListReader: Clone {
15
/// Get the final [LazyFrame].
16
fn finish(self) -> PolarsResult<LazyFrame> {
17
if !self.glob() {
18
return self.finish_no_glob();
19
}
20
21
let ScanSources::Paths(paths) = self.sources() else {
22
unreachable!("opened-files or in-memory buffers should never be globbed");
23
};
24
25
let lfs = paths
26
.iter()
27
.map(|path| {
28
self.clone()
29
// Each individual reader should not apply a row limit.
30
.with_n_rows(None)
31
// Each individual reader should not apply a row index.
32
.with_row_index(None)
33
.with_paths(Buffer::from_iter([path.clone()]))
34
.with_rechunk(false)
35
.finish_no_glob()
36
.map_err(|e| {
37
polars_err!(
38
ComputeError: "error while reading {}: {}", path, e
39
)
40
})
41
})
42
.collect::<PolarsResult<Vec<_>>>()?;
43
44
polars_ensure!(
45
!lfs.is_empty(),
46
ComputeError: "no matching files found in {:?}", paths.iter().map(|x| x.as_str()).collect::<Vec<_>>()
47
);
48
49
let mut lf = self.concat_impl(lfs)?;
50
if let Some(n_rows) = self.n_rows() {
51
lf = lf.slice(0, n_rows as IdxSize)
52
};
53
if let Some(rc) = self.row_index() {
54
lf = lf.with_row_index(rc.name.clone(), Some(rc.offset))
55
};
56
57
Ok(lf)
58
}
59
60
/// Recommended concatenation of [LazyFrame]s from many input files.
61
///
62
/// This method should not take into consideration [LazyFileListReader::n_rows]
63
/// nor [LazyFileListReader::row_index].
64
fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
65
let args = UnionArgs {
66
rechunk: self.rechunk(),
67
parallel: true,
68
to_supertypes: false,
69
from_partitioned_ds: true,
70
..Default::default()
71
};
72
concat_impl(&lfs, args)
73
}
74
75
/// Get the final [LazyFrame].
76
/// This method assumes, that path is *not* a glob.
77
///
78
/// It is recommended to always use [LazyFileListReader::finish] method.
79
fn finish_no_glob(self) -> PolarsResult<LazyFrame>;
80
81
fn glob(&self) -> bool {
82
true
83
}
84
85
/// Get the sources for this reader.
86
fn sources(&self) -> &ScanSources;
87
88
/// Set sources of the scanned files.
89
#[must_use]
90
fn with_sources(self, source: ScanSources) -> Self;
91
92
/// Set paths of the scanned files.
93
#[must_use]
94
fn with_paths(self, paths: Buffer<PlRefPath>) -> Self {
95
self.with_sources(ScanSources::Paths(paths))
96
}
97
98
/// Configure the row limit.
99
fn with_n_rows(self, n_rows: impl Into<Option<usize>>) -> Self;
100
101
/// Configure the row index.
102
fn with_row_index(self, row_index: impl Into<Option<RowIndex>>) -> Self;
103
104
/// Rechunk the memory to contiguous chunks when parsing is done.
105
fn rechunk(&self) -> bool;
106
107
/// Rechunk the memory to contiguous chunks when parsing is done.
108
#[must_use]
109
fn with_rechunk(self, toggle: bool) -> Self;
110
111
/// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
112
/// be guaranteed.
113
fn n_rows(&self) -> Option<usize>;
114
115
/// Add a row index column.
116
fn row_index(&self) -> Option<&RowIndex>;
117
118
/// [CloudOptions] used to list files.
119
fn cloud_options(&self) -> Option<&CloudOptions> {
120
None
121
}
122
}
123
124