// Path: blob/main/crates/polars-stream/src/nodes/io_sources/ndjson/builder.rs
// 8446 views
use std::num::NonZeroUsize;1use std::sync::Arc;23use polars_core::config;4use polars_io::cloud::CloudOptions;5#[cfg(feature = "json")]6use polars_io::metrics::IOMetrics;7use polars_plan::dsl::{NDJsonReadOptions, ScanSource};8use polars_utils::relaxed_cell::RelaxedCell;910use super::{DynByteSourceBuilder, FileReader, NDJsonFileReader};11use crate::async_primitives::wait_group::WaitGroup;12use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;13use crate::nodes::io_sources::multi_scan::reader_interface::capabilities::ReaderCapabilities;14use crate::nodes::io_sources::ndjson::chunk_reader::ChunkReaderBuilder;1516pub struct NDJsonReaderBuilder {17pub options: Arc<NDJsonReadOptions>,18pub prefetch_limit: RelaxedCell<usize>,19pub prefetch_semaphore: std::sync::OnceLock<Arc<tokio::sync::Semaphore>>,20pub shared_prefetch_wait_group_slot: Arc<std::sync::Mutex<Option<WaitGroup>>>,21pub io_metrics: std::sync::OnceLock<Arc<IOMetrics>>,22}2324impl std::fmt::Debug for NDJsonReaderBuilder {25fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {26f.debug_struct("NDJsonReaderBuilder")27.field("ignore_errors", &self.options.ignore_errors)28.field("prefetch_limit", &self.prefetch_limit)29.field("prefetch_semaphore", &self.prefetch_semaphore)30.finish()31}32}3334pub fn ndjson_reader_capabilities() -> ReaderCapabilities {35use ReaderCapabilities as RC;3637RC::ROW_INDEX | RC::PRE_SLICE | RC::NEGATIVE_PRE_SLICE38}3940#[cfg(feature = "json")]41impl FileReaderBuilder for NDJsonReaderBuilder {42fn reader_name(&self) -> &str {43"ndjson"44}4546fn reader_capabilities(&self) -> ReaderCapabilities {47ndjson_reader_capabilities()48}4950fn set_execution_state(&self, execution_state: &crate::execute::StreamingExecutionState) {51// The maximum number of chunks actively being prefetched at any given point in time.52let prefetch_limit = std::env::var("POLARS_NDJSON_CHUNK_PREFETCH_LIMIT")53.map(|x| {54x.parse::<NonZeroUsize>()55.ok()56.unwrap_or_else(|| 
{57panic!("invalid value for POLARS_NDJSON_CHUNK_PREFETCH_LIMIT: {x}")58})59.get()60})61.unwrap_or(execution_state.num_pipelines.saturating_mul(2))62.max(1);6364self.prefetch_limit.store(prefetch_limit);6566if config::verbose() {67eprintln!(68"[NDJsonReaderBuilder]: prefetch_limit: {}",69self.prefetch_limit.load()70);71}7273self.prefetch_semaphore74.set(Arc::new(tokio::sync::Semaphore::new(prefetch_limit)))75.unwrap()76}7778fn set_io_metrics(&self, io_metrics: Arc<IOMetrics>) {79self.io_metrics.set(io_metrics).ok().unwrap()80}8182fn build_file_reader(83&self,84source: ScanSource,85cloud_options: Option<Arc<CloudOptions>>,86_scan_source_idx: usize,87) -> Box<dyn FileReader> {88use crate::metrics::OptIOMetrics;89use crate::nodes::io_sources::ndjson::ChunkPrefetchSync;9091let scan_source = source;92let chunk_reader_builder = ChunkReaderBuilder::NDJson {93ignore_errors: self.options.ignore_errors,94};95let verbose = config::verbose();9697let byte_source_builder =98if scan_source.is_cloud_url() || polars_config::config().force_async() {99DynByteSourceBuilder::ObjectStore100} else {101DynByteSourceBuilder::Mmap102};103104let reader = NDJsonFileReader {105scan_source,106cloud_options,107chunk_reader_builder,108count_rows_fn: polars_io::ndjson::count_rows,109verbose,110byte_source_builder,111chunk_prefetch_sync: ChunkPrefetchSync {112prefetch_limit: self.prefetch_limit.load(),113prefetch_semaphore: Arc::clone(self.prefetch_semaphore.get().unwrap()),114shared_prefetch_wait_group_slot: Arc::clone(&self.shared_prefetch_wait_group_slot),115prev_all_spawned: None,116current_all_spawned: None,117},118init_data: None,119io_metrics: OptIOMetrics(self.io_metrics.get().cloned()),120};121122Box::new(reader) as _123}124}125126127