Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/parquet/builder.rs
8509 views
1
use std::num::NonZeroUsize;
2
use std::sync::Arc;
3
4
use polars_core::config;
5
use polars_io::cloud::CloudOptions;
6
use polars_io::prelude::{FileMetadata, ParallelStrategy, ParquetOptions};
7
use polars_io::utils::byte_source::DynByteSourceBuilder;
8
use polars_plan::dsl::ScanSource;
9
use polars_utils::relaxed_cell::RelaxedCell;
10
11
use super::{FileReader, ParquetFileReader};
12
use crate::async_primitives::wait_group::WaitGroup;
13
use crate::metrics::{IOMetrics, OptIOMetrics};
14
use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
15
use crate::nodes::io_sources::multi_scan::reader_interface::capabilities::ReaderCapabilities;
16
17
#[derive(Clone)]
18
pub struct ParquetReaderBuilder {
19
pub first_metadata: Option<Arc<FileMetadata>>,
20
pub options: Arc<ParquetOptions>,
21
pub prefetch_limit: RelaxedCell<usize>,
22
pub prefetch_semaphore: std::sync::OnceLock<Arc<tokio::sync::Semaphore>>,
23
pub shared_prefetch_wait_group_slot: Arc<std::sync::Mutex<Option<WaitGroup>>>,
24
pub io_metrics: std::sync::OnceLock<Arc<IOMetrics>>,
25
}
26
27
impl std::fmt::Debug for ParquetReaderBuilder {
28
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29
f.debug_struct("ParquetReaderBuilder")
30
.field("first_metadata", &self.first_metadata)
31
.field("options", &self.options)
32
.field("prefetch_semaphore", &self.prefetch_semaphore)
33
.finish()
34
}
35
}
36
37
impl FileReaderBuilder for ParquetReaderBuilder {
38
fn reader_name(&self) -> &str {
39
"parquet"
40
}
41
42
fn reader_capabilities(&self) -> ReaderCapabilities {
43
use ReaderCapabilities as RC;
44
45
let mut capabilities = RC::ROW_INDEX
46
| RC::PRE_SLICE
47
| RC::NEGATIVE_PRE_SLICE
48
| RC::PARTIAL_FILTER
49
| RC::MAPPED_COLUMN_PROJECTION;
50
51
if matches!(
52
self.options.parallel,
53
ParallelStrategy::Auto | ParallelStrategy::Prefiltered
54
) {
55
capabilities |= RC::FULL_FILTER;
56
}
57
capabilities
58
}
59
60
fn set_execution_state(&self, execution_state: &crate::execute::StreamingExecutionState) {
61
let prefetch_limit = std::env::var("POLARS_ROW_GROUP_PREFETCH_SIZE")
62
.map(|x| {
63
x.parse::<NonZeroUsize>()
64
.unwrap_or_else(|_| {
65
panic!("invalid value for POLARS_ROW_GROUP_PREFETCH_SIZE: {x}")
66
})
67
.get()
68
})
69
.unwrap_or(
70
execution_state
71
.num_pipelines
72
.saturating_mul(2)
73
.clamp(8, 512),
74
)
75
.max(1);
76
77
self.prefetch_limit.store(prefetch_limit);
78
79
if config::verbose() {
80
eprintln!(
81
"[ParquetReaderBuilder]: prefetch_limit: {}",
82
self.prefetch_limit.load()
83
);
84
}
85
86
self.prefetch_semaphore
87
.set(Arc::new(tokio::sync::Semaphore::new(prefetch_limit)))
88
.unwrap()
89
}
90
91
fn set_io_metrics(&self, io_metrics: Arc<IOMetrics>) {
92
let _ = self.io_metrics.set(io_metrics);
93
}
94
95
fn build_file_reader(
96
&self,
97
source: ScanSource,
98
cloud_options: Option<Arc<CloudOptions>>,
99
scan_source_idx: usize,
100
) -> Box<dyn FileReader> {
101
use crate::nodes::io_sources::parquet::RowGroupPrefetchSync;
102
103
let scan_source = source;
104
let config = self.options.clone();
105
let verbose = config::verbose();
106
107
let byte_source_builder =
108
if scan_source.is_cloud_url() || polars_config::config().force_async() {
109
DynByteSourceBuilder::ObjectStore
110
} else {
111
DynByteSourceBuilder::Mmap
112
};
113
114
assert!(self.prefetch_limit.load() > 0);
115
116
let reader = ParquetFileReader {
117
scan_source,
118
cloud_options,
119
config,
120
metadata: if scan_source_idx == 0 {
121
self.first_metadata.clone()
122
} else {
123
None
124
},
125
byte_source_builder,
126
row_group_prefetch_sync: RowGroupPrefetchSync {
127
prefetch_limit: self.prefetch_limit.load(),
128
prefetch_semaphore: Arc::clone(self.prefetch_semaphore.get().unwrap()),
129
shared_prefetch_wait_group_slot: Arc::clone(&self.shared_prefetch_wait_group_slot),
130
prev_all_spawned: None,
131
current_all_spawned: None,
132
},
133
io_metrics: OptIOMetrics(self.io_metrics.get().cloned()),
134
verbose,
135
136
init_data: None,
137
};
138
139
Box::new(reader) as Box<dyn FileReader>
140
}
141
}
142
143