Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/ndjson/builder.rs
8446 views
1
use std::num::NonZeroUsize;
2
use std::sync::Arc;
3
4
use polars_core::config;
5
use polars_io::cloud::CloudOptions;
6
#[cfg(feature = "json")]
7
use polars_io::metrics::IOMetrics;
8
use polars_plan::dsl::{NDJsonReadOptions, ScanSource};
9
use polars_utils::relaxed_cell::RelaxedCell;
10
11
use super::{DynByteSourceBuilder, FileReader, NDJsonFileReader};
12
use crate::async_primitives::wait_group::WaitGroup;
13
use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
14
use crate::nodes::io_sources::multi_scan::reader_interface::capabilities::ReaderCapabilities;
15
use crate::nodes::io_sources::ndjson::chunk_reader::ChunkReaderBuilder;
16
17
/// Builder that creates `NDJsonFileReader`s for NDJSON scan sources.
///
/// Every reader produced by `build_file_reader` receives clones of the same prefetch
/// semaphore and wait-group slot, so prefetching is coordinated across all of them.
pub struct NDJsonReaderBuilder {
    /// NDJSON read options; this builder reads `ignore_errors` from them.
    pub options: Arc<NDJsonReadOptions>,
    /// Maximum number of chunks prefetched concurrently. It is written by
    /// `set_execution_state` and copied into each reader's `ChunkPrefetchSync`.
    pub prefetch_limit: RelaxedCell<usize>,
    /// Semaphore with `prefetch_limit` permits, initialised once by `set_execution_state`
    /// and shared (via `Arc`) with every reader built afterwards.
    pub prefetch_semaphore: std::sync::OnceLock<Arc<tokio::sync::Semaphore>>,
    /// Slot shared with every reader's `ChunkPrefetchSync`.
    // NOTE(review): presumably lets consecutive readers hand over an in-flight prefetch
    // `WaitGroup`; confirm against `ChunkPrefetchSync`.
    pub shared_prefetch_wait_group_slot: Arc<std::sync::Mutex<Option<WaitGroup>>>,
    /// I/O metrics set once via `set_io_metrics`. Readers built while this is unset
    /// receive `OptIOMetrics(None)`.
    pub io_metrics: std::sync::OnceLock<Arc<IOMetrics>>,
}
24
25
impl std::fmt::Debug for NDJsonReaderBuilder {
26
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27
f.debug_struct("NDJsonReaderBuilder")
28
.field("ignore_errors", &self.options.ignore_errors)
29
.field("prefetch_limit", &self.prefetch_limit)
30
.field("prefetch_semaphore", &self.prefetch_semaphore)
31
.finish()
32
}
33
}
34
35
pub fn ndjson_reader_capabilities() -> ReaderCapabilities {
36
use ReaderCapabilities as RC;
37
38
RC::ROW_INDEX | RC::PRE_SLICE | RC::NEGATIVE_PRE_SLICE
39
}
40
41
#[cfg(feature = "json")]
impl FileReaderBuilder for NDJsonReaderBuilder {
    fn reader_name(&self) -> &str {
        "ndjson"
    }

    fn reader_capabilities(&self) -> ReaderCapabilities {
        ndjson_reader_capabilities()
    }

    /// Computes the chunk prefetch limit and creates the prefetch semaphore shared by every
    /// reader this builder produces.
    ///
    /// The limit comes from `POLARS_NDJSON_CHUNK_PREFETCH_LIMIT` when that variable is set.
    /// Otherwise it defaults to twice `num_pipelines`. The result is clamped to
    /// `1..=Semaphore::MAX_PERMITS`. The lower bound covers `num_pipelines == 0`, and the upper
    /// bound keeps `Semaphore::new` from panicking on very large (e.g. saturated) values.
    ///
    /// # Panics
    ///
    /// Panics if the environment variable is set to something other than a positive integer,
    /// or if this method is called more than once on the same builder.
    fn set_execution_state(&self, execution_state: &crate::execute::StreamingExecutionState) {
        // The maximum number of chunks actively being prefetched at any given point in time.
        let prefetch_limit = std::env::var("POLARS_NDJSON_CHUNK_PREFETCH_LIMIT")
            .map(|x| {
                x.parse::<NonZeroUsize>()
                    .unwrap_or_else(|_| {
                        panic!("invalid value for POLARS_NDJSON_CHUNK_PREFETCH_LIMIT: {x}")
                    })
                    .get()
            })
            .unwrap_or(execution_state.num_pipelines.saturating_mul(2))
            // `tokio::sync::Semaphore::new` panics when asked for more than `MAX_PERMITS`.
            .clamp(1, tokio::sync::Semaphore::MAX_PERMITS);

        self.prefetch_limit.store(prefetch_limit);

        if config::verbose() {
            eprintln!("[NDJsonReaderBuilder]: prefetch_limit: {prefetch_limit}");
        }

        self.prefetch_semaphore
            .set(Arc::new(tokio::sync::Semaphore::new(prefetch_limit)))
            .expect("NDJsonReaderBuilder::set_execution_state must only be called once");
    }

    /// Stores the I/O metrics that are handed to every reader built afterwards.
    ///
    /// # Panics
    ///
    /// Panics if I/O metrics have already been set on this builder.
    fn set_io_metrics(&self, io_metrics: Arc<IOMetrics>) {
        // `Arc<IOMetrics>` is not required to be `Debug`, so test the result instead of
        // calling `expect` on it.
        if self.io_metrics.set(io_metrics).is_err() {
            panic!("NDJsonReaderBuilder::set_io_metrics must only be called once");
        }
    }

    /// Creates an `NDJsonFileReader` for a single `source`.
    ///
    /// Cloud URLs, or any source when `force_async` is configured, use the `ObjectStore`
    /// byte source. Everything else uses the `Mmap` byte source. All readers share this
    /// builder's prefetch semaphore and wait-group slot.
    ///
    /// # Panics
    ///
    /// Panics if `set_execution_state` has not been called on this builder yet.
    fn build_file_reader(
        &self,
        source: ScanSource,
        cloud_options: Option<Arc<CloudOptions>>,
        _scan_source_idx: usize,
    ) -> Box<dyn FileReader> {
        use crate::metrics::OptIOMetrics;
        use crate::nodes::io_sources::ndjson::ChunkPrefetchSync;

        let chunk_reader_builder = ChunkReaderBuilder::NDJson {
            ignore_errors: self.options.ignore_errors,
        };
        let verbose = config::verbose();

        let byte_source_builder =
            if source.is_cloud_url() || polars_config::config().force_async() {
                DynByteSourceBuilder::ObjectStore
            } else {
                DynByteSourceBuilder::Mmap
            };

        let reader = NDJsonFileReader {
            scan_source: source,
            cloud_options,
            chunk_reader_builder,
            count_rows_fn: polars_io::ndjson::count_rows,
            verbose,
            byte_source_builder,
            chunk_prefetch_sync: ChunkPrefetchSync {
                prefetch_limit: self.prefetch_limit.load(),
                prefetch_semaphore: Arc::clone(self.prefetch_semaphore.get().expect(
                    "NDJsonReaderBuilder::set_execution_state must be called before build_file_reader",
                )),
                shared_prefetch_wait_group_slot: Arc::clone(&self.shared_prefetch_wait_group_slot),
                prev_all_spawned: None,
                current_all_spawned: None,
            },
            init_data: None,
            io_metrics: OptIOMetrics(self.io_metrics.get().cloned()),
        };

        Box::new(reader) as _
    }
}
126
127