Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/ipc/builder.rs
8446 views
1
use std::num::NonZeroUsize;
2
use std::sync::Arc;
3
4
use arrow::io::ipc::read::FileMetadata;
5
use polars_core::config;
6
use polars_io::cloud::CloudOptions;
7
use polars_io::ipc::IpcScanOptions;
8
use polars_plan::dsl::ScanSource;
9
use polars_utils::relaxed_cell::RelaxedCell;
10
11
use super::{DynByteSourceBuilder, IpcFileReader};
12
use crate::async_primitives::wait_group::WaitGroup;
13
#[cfg(feature = "ipc")]
14
use crate::metrics::IOMetrics;
15
use crate::nodes::io_sources::multi_scan::reader_interface::FileReader;
16
use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
17
use crate::nodes::io_sources::multi_scan::reader_interface::capabilities::ReaderCapabilities;
18
19
pub struct IpcReaderBuilder {
20
pub first_metadata: Option<Arc<FileMetadata>>,
21
pub options: Arc<IpcScanOptions>,
22
pub prefetch_limit: RelaxedCell<usize>,
23
pub prefetch_semaphore: std::sync::OnceLock<Arc<tokio::sync::Semaphore>>,
24
pub shared_prefetch_wait_group_slot: Arc<std::sync::Mutex<Option<WaitGroup>>>,
25
pub io_metrics: std::sync::OnceLock<Arc<IOMetrics>>,
26
}
27
28
impl std::fmt::Debug for IpcReaderBuilder {
29
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30
f.debug_struct("IpcBuilder")
31
.field("first_metadata", &self.first_metadata)
32
.field("options", &self.options)
33
.field("prefetch_semaphore", &self.prefetch_semaphore)
34
.finish()
35
}
36
}
37
38
#[cfg(feature = "ipc")]
impl FileReaderBuilder for IpcReaderBuilder {
    /// Returns the fixed reader name, `"ipc"`.
    fn reader_name(&self) -> &str {
        "ipc"
    }

    /// Returns the capabilities this reader advertises: `ROW_INDEX` and `PRE_SLICE`.
    fn reader_capabilities(&self) -> ReaderCapabilities {
        use ReaderCapabilities as RC;

        RC::ROW_INDEX | RC::PRE_SLICE
    }

    /// Resolves the record-batch prefetch limit and creates the shared prefetch semaphore.
    ///
    /// The limit comes from the `POLARS_RECORD_BATCH_PREFETCH_SIZE` environment variable when
    /// it can be read; otherwise it defaults to twice `execution_state.num_pipelines`. The
    /// result is clamped to at least 1.
    ///
    /// # Panics
    ///
    /// Panics if the environment variable is present but is not a non-zero `usize`, or if the
    /// semaphore was already initialized by an earlier call on this builder.
    fn set_execution_state(&self, execution_state: &crate::execute::StreamingExecutionState) {
        let prefetch_limit = std::env::var("POLARS_RECORD_BATCH_PREFETCH_SIZE")
            .map(|x| {
                x.parse::<NonZeroUsize>()
                    .unwrap_or_else(|_| {
                        panic!("invalid value for POLARS_RECORD_BATCH_PREFETCH_SIZE: {x}")
                    })
                    .get()
            })
            .unwrap_or(execution_state.num_pipelines.saturating_mul(2))
            // The default can be 0 when `num_pipelines` is 0; never go below one.
            .max(1);

        self.prefetch_limit.store(prefetch_limit);

        if config::verbose() {
            eprintln!(
                "[IpcReaderBuilder]: prefetch_limit: {}",
                self.prefetch_limit.load()
            );
        }

        self.prefetch_semaphore
            .set(Arc::new(tokio::sync::Semaphore::new(prefetch_limit)))
            .expect("IpcReaderBuilder::set_execution_state must only be called once")
    }

    /// Stores the I/O metrics sink that is handed to every reader built afterwards.
    ///
    /// # Panics
    ///
    /// Panics if the metrics were already set on this builder.
    fn set_io_metrics(&self, io_metrics: Arc<IOMetrics>) {
        // `.ok()` drops the rejected value so the panic does not need to format it.
        self.io_metrics
            .set(io_metrics)
            .ok()
            .expect("IpcReaderBuilder::set_io_metrics must only be called once")
    }

    /// Builds a boxed [`IpcFileReader`] for `source`.
    ///
    /// Only the reader for `scan_source_idx == 0` receives `first_metadata`; all others are
    /// built without metadata. The byte source is `ObjectStore` when the source is a cloud URL
    /// or the global config forces async, and `Mmap` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `set_execution_state` has not been called yet, because the prefetch semaphore
    /// is created there.
    fn build_file_reader(
        &self,
        source: ScanSource,
        cloud_options: Option<Arc<CloudOptions>>,
        scan_source_idx: usize,
    ) -> Box<dyn FileReader> {
        use crate::metrics::OptIOMetrics;
        use crate::nodes::io_sources::ipc::RecordBatchPrefetchSync;

        let scan_source = source;
        let config = self.options.clone();
        let verbose = config::verbose();

        let metadata = if scan_source_idx == 0 {
            self.first_metadata.clone()
        } else {
            None
        };

        let byte_source_builder =
            if scan_source.is_cloud_url() || polars_config::config().force_async() {
                DynByteSourceBuilder::ObjectStore
            } else {
                DynByteSourceBuilder::Mmap
            };

        let prefetch_semaphore = Arc::clone(
            self.prefetch_semaphore
                .get()
                .expect("IpcReaderBuilder::set_execution_state must be called before build_file_reader"),
        );

        let reader = IpcFileReader {
            scan_source,
            cloud_options,
            config,
            metadata,
            byte_source_builder,
            record_batch_prefetch_sync: RecordBatchPrefetchSync {
                prefetch_limit: self.prefetch_limit.load(),
                prefetch_semaphore,
                shared_prefetch_wait_group_slot: Arc::clone(&self.shared_prefetch_wait_group_slot),
                prev_all_spawned: None,
                current_all_spawned: None,
            },
            io_metrics: OptIOMetrics(self.io_metrics.get().cloned()),
            verbose,
            init_data: None,
            checked: self.options.checked,
        };

        Box::new(reader) as Box<dyn FileReader>
    }
}
128
129