Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/lines.rs
8448 views
1
use std::num::NonZeroUsize;
2
use std::sync::Arc;
3
4
use polars_core::config;
5
use polars_io::cloud::CloudOptions;
6
use polars_io::metrics::IOMetrics;
7
use polars_io::utils::byte_source::DynByteSourceBuilder;
8
use polars_plan::dsl::ScanSource;
9
use polars_utils::relaxed_cell::RelaxedCell;
10
11
use crate::async_primitives::wait_group::WaitGroup;
12
use crate::nodes::io_sources::multi_scan::reader_interface::FileReader;
13
use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
14
use crate::nodes::io_sources::multi_scan::reader_interface::capabilities::ReaderCapabilities;
15
use crate::nodes::io_sources::ndjson::NDJsonFileReader;
16
use crate::nodes::io_sources::ndjson::builder::ndjson_reader_capabilities;
17
use crate::nodes::io_sources::ndjson::chunk_reader::ChunkReaderBuilder;
18
19
pub struct LineReaderBuilder {
20
pub prefetch_limit: RelaxedCell<usize>,
21
pub prefetch_semaphore: std::sync::OnceLock<Arc<tokio::sync::Semaphore>>,
22
pub shared_prefetch_wait_group_slot: Arc<std::sync::Mutex<Option<WaitGroup>>>,
23
pub io_metrics: std::sync::OnceLock<Arc<IOMetrics>>,
24
}
25
26
impl std::fmt::Debug for LineReaderBuilder {
27
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28
f.debug_struct("LineReaderBuilder")
29
.field("prefetch_limit", &self.prefetch_limit)
30
.field("prefetch_semaphore", &self.prefetch_semaphore)
31
.finish()
32
}
33
}
34
35
impl FileReaderBuilder for LineReaderBuilder {
36
fn reader_name(&self) -> &str {
37
"line"
38
}
39
40
fn reader_capabilities(&self) -> ReaderCapabilities {
41
ndjson_reader_capabilities()
42
}
43
44
fn set_execution_state(&self, execution_state: &crate::execute::StreamingExecutionState) {
45
// The maximum number of chunks actively being prefetched at any point in time.
46
let prefetch_limit = std::env::var("POLARS_LINES_CHUNK_PREFETCH_LIMIT")
47
.map(|x| {
48
x.parse::<NonZeroUsize>()
49
.ok()
50
.unwrap_or_else(|| {
51
panic!("invalid value for POLARS_LINES_CHUNK_PREFETCH_LIMIT: {x}")
52
})
53
.get()
54
})
55
.unwrap_or(execution_state.num_pipelines.saturating_mul(2))
56
.max(1);
57
58
self.prefetch_limit.store(prefetch_limit);
59
60
if config::verbose() {
61
eprintln!(
62
"[LineReaderBuilder]: prefetch_limit: {}",
63
self.prefetch_limit.load()
64
);
65
}
66
67
self.prefetch_semaphore
68
.set(Arc::new(tokio::sync::Semaphore::new(prefetch_limit)))
69
.unwrap()
70
}
71
72
fn set_io_metrics(&self, io_metrics: Arc<IOMetrics>) {
73
self.io_metrics.set(io_metrics).ok().unwrap()
74
}
75
76
fn build_file_reader(
77
&self,
78
source: ScanSource,
79
cloud_options: Option<Arc<CloudOptions>>,
80
_scan_source_idx: usize,
81
) -> Box<dyn FileReader> {
82
use crate::metrics::OptIOMetrics;
83
use crate::nodes::io_sources::ndjson::ChunkPrefetchSync;
84
85
let scan_source = source;
86
let chunk_reader_builder = ChunkReaderBuilder::Lines;
87
let verbose = config::verbose();
88
89
let byte_source_builder =
90
if scan_source.is_cloud_url() || polars_config::config().force_async() {
91
DynByteSourceBuilder::ObjectStore
92
} else {
93
DynByteSourceBuilder::Mmap
94
};
95
96
// Leverage the existing NDJson code path and line counting functionality.
97
let reader = NDJsonFileReader {
98
scan_source,
99
cloud_options,
100
chunk_reader_builder,
101
count_rows_fn: polars_io::scan_lines::count_lines,
102
verbose,
103
byte_source_builder,
104
chunk_prefetch_sync: ChunkPrefetchSync {
105
prefetch_limit: self.prefetch_limit.load(),
106
prefetch_semaphore: Arc::clone(self.prefetch_semaphore.get().unwrap()),
107
shared_prefetch_wait_group_slot: Arc::clone(&self.shared_prefetch_wait_group_slot),
108
prev_all_spawned: None,
109
current_all_spawned: None,
110
},
111
init_data: None,
112
io_metrics: OptIOMetrics(self.io_metrics.get().cloned()),
113
};
114
115
Box::new(reader) as _
116
}
117
}
118
119