Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-lazy/src/scan/file_list_reader.rs
6939 views
1
use std::sync::Arc;
2
3
use polars_core::prelude::*;
4
use polars_io::RowIndex;
5
use polars_io::cloud::CloudOptions;
6
use polars_plan::prelude::UnionArgs;
7
use polars_utils::plpath::PlPath;
8
9
use crate::prelude::*;
10
11
/// Reads [LazyFrame] from a filesystem or a cloud storage.
12
/// Supports glob patterns.
13
///
14
/// Use [LazyFileListReader::finish] to get the final [LazyFrame].
15
pub trait LazyFileListReader: Clone {
16
/// Get the final [LazyFrame].
17
fn finish(self) -> PolarsResult<LazyFrame> {
18
if !self.glob() {
19
return self.finish_no_glob();
20
}
21
22
let ScanSources::Paths(paths) = self.sources() else {
23
unreachable!("opened-files or in-memory buffers should never be globbed");
24
};
25
26
let lfs = paths
27
.iter()
28
.map(|path| {
29
self.clone()
30
// Each individual reader should not apply a row limit.
31
.with_n_rows(None)
32
// Each individual reader should not apply a row index.
33
.with_row_index(None)
34
.with_paths([path.clone()].into())
35
.with_rechunk(false)
36
.finish_no_glob()
37
.map_err(|e| {
38
polars_err!(
39
ComputeError: "error while reading {}: {}", path.display(), e
40
)
41
})
42
})
43
.collect::<PolarsResult<Vec<_>>>()?;
44
45
polars_ensure!(
46
!lfs.is_empty(),
47
ComputeError: "no matching files found in {:?}", paths.iter().map(|x| x.to_str()).collect::<Vec<_>>()
48
);
49
50
let mut lf = self.concat_impl(lfs)?;
51
if let Some(n_rows) = self.n_rows() {
52
lf = lf.slice(0, n_rows as IdxSize)
53
};
54
if let Some(rc) = self.row_index() {
55
lf = lf.with_row_index(rc.name.clone(), Some(rc.offset))
56
};
57
58
Ok(lf)
59
}
60
61
/// Recommended concatenation of [LazyFrame]s from many input files.
62
///
63
/// This method should not take into consideration [LazyFileListReader::n_rows]
64
/// nor [LazyFileListReader::row_index].
65
fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
66
let args = UnionArgs {
67
rechunk: self.rechunk(),
68
parallel: true,
69
to_supertypes: false,
70
from_partitioned_ds: true,
71
..Default::default()
72
};
73
concat_impl(&lfs, args)
74
}
75
76
/// Get the final [LazyFrame].
77
/// This method assumes, that path is *not* a glob.
78
///
79
/// It is recommended to always use [LazyFileListReader::finish] method.
80
fn finish_no_glob(self) -> PolarsResult<LazyFrame>;
81
82
fn glob(&self) -> bool {
83
true
84
}
85
86
/// Get the sources for this reader.
87
fn sources(&self) -> &ScanSources;
88
89
/// Set sources of the scanned files.
90
#[must_use]
91
fn with_sources(self, source: ScanSources) -> Self;
92
93
/// Set paths of the scanned files.
94
#[must_use]
95
fn with_paths(self, paths: Arc<[PlPath]>) -> Self {
96
self.with_sources(ScanSources::Paths(paths))
97
}
98
99
/// Configure the row limit.
100
fn with_n_rows(self, n_rows: impl Into<Option<usize>>) -> Self;
101
102
/// Configure the row index.
103
fn with_row_index(self, row_index: impl Into<Option<RowIndex>>) -> Self;
104
105
/// Rechunk the memory to contiguous chunks when parsing is done.
106
fn rechunk(&self) -> bool;
107
108
/// Rechunk the memory to contiguous chunks when parsing is done.
109
#[must_use]
110
fn with_rechunk(self, toggle: bool) -> Self;
111
112
/// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
113
/// be guaranteed.
114
fn n_rows(&self) -> Option<usize>;
115
116
/// Add a row index column.
117
fn row_index(&self) -> Option<&RowIndex>;
118
119
/// [CloudOptions] used to list files.
120
fn cloud_options(&self) -> Option<&CloudOptions> {
121
None
122
}
123
}
124
125