Path: blob/main/crates/polars-stream/src/nodes/io_sources/lines.rs
8448 views
use std::num::NonZeroUsize;1use std::sync::Arc;23use polars_core::config;4use polars_io::cloud::CloudOptions;5use polars_io::metrics::IOMetrics;6use polars_io::utils::byte_source::DynByteSourceBuilder;7use polars_plan::dsl::ScanSource;8use polars_utils::relaxed_cell::RelaxedCell;910use crate::async_primitives::wait_group::WaitGroup;11use crate::nodes::io_sources::multi_scan::reader_interface::FileReader;12use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;13use crate::nodes::io_sources::multi_scan::reader_interface::capabilities::ReaderCapabilities;14use crate::nodes::io_sources::ndjson::NDJsonFileReader;15use crate::nodes::io_sources::ndjson::builder::ndjson_reader_capabilities;16use crate::nodes::io_sources::ndjson::chunk_reader::ChunkReaderBuilder;1718pub struct LineReaderBuilder {19pub prefetch_limit: RelaxedCell<usize>,20pub prefetch_semaphore: std::sync::OnceLock<Arc<tokio::sync::Semaphore>>,21pub shared_prefetch_wait_group_slot: Arc<std::sync::Mutex<Option<WaitGroup>>>,22pub io_metrics: std::sync::OnceLock<Arc<IOMetrics>>,23}2425impl std::fmt::Debug for LineReaderBuilder {26fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {27f.debug_struct("LineReaderBuilder")28.field("prefetch_limit", &self.prefetch_limit)29.field("prefetch_semaphore", &self.prefetch_semaphore)30.finish()31}32}3334impl FileReaderBuilder for LineReaderBuilder {35fn reader_name(&self) -> &str {36"line"37}3839fn reader_capabilities(&self) -> ReaderCapabilities {40ndjson_reader_capabilities()41}4243fn set_execution_state(&self, execution_state: &crate::execute::StreamingExecutionState) {44// The maximum number of chunks actively being prefetched at any point in time.45let prefetch_limit = std::env::var("POLARS_LINES_CHUNK_PREFETCH_LIMIT")46.map(|x| {47x.parse::<NonZeroUsize>()48.ok()49.unwrap_or_else(|| {50panic!("invalid value for POLARS_LINES_CHUNK_PREFETCH_LIMIT: {x}")51})52.get()53})54.unwrap_or(execution_state.num_pipelines.saturating_mul(2))55.max(1);5657self.prefetch_limit.store(prefetch_limit);5859if config::verbose() {60eprintln!(61"[LineReaderBuilder]: prefetch_limit: {}",62self.prefetch_limit.load()63);64}6566self.prefetch_semaphore67.set(Arc::new(tokio::sync::Semaphore::new(prefetch_limit)))68.unwrap()69}7071fn set_io_metrics(&self, io_metrics: Arc<IOMetrics>) {72self.io_metrics.set(io_metrics).ok().unwrap()73}7475fn build_file_reader(76&self,77source: ScanSource,78cloud_options: Option<Arc<CloudOptions>>,79_scan_source_idx: usize,80) -> Box<dyn FileReader> {81use crate::metrics::OptIOMetrics;82use crate::nodes::io_sources::ndjson::ChunkPrefetchSync;8384let scan_source = source;85let chunk_reader_builder = ChunkReaderBuilder::Lines;86let verbose = config::verbose();8788let byte_source_builder =89if scan_source.is_cloud_url() || polars_config::config().force_async() {90DynByteSourceBuilder::ObjectStore91} else {92DynByteSourceBuilder::Mmap93};9495// Leverage the existing NDJson code path and line counting functionality.96let reader = NDJsonFileReader {97scan_source,98cloud_options,99chunk_reader_builder,100count_rows_fn: polars_io::scan_lines::count_lines,101verbose,102byte_source_builder,103chunk_prefetch_sync: ChunkPrefetchSync {104prefetch_limit: self.prefetch_limit.load(),105prefetch_semaphore: Arc::clone(self.prefetch_semaphore.get().unwrap()),106shared_prefetch_wait_group_slot: Arc::clone(&self.shared_prefetch_wait_group_slot),107prev_all_spawned: None,108current_all_spawned: None,109},110init_data: None,111io_metrics: OptIOMetrics(self.io_metrics.get().cloned()),112};113114Box::new(reader) as _115}116}117118119