// Path: blob/main/crates/polars-stream/src/nodes/io_sources/ndjson/mod.rs
//! Streaming NDJSON reader implementing the multi-scan [`FileReader`] interface.

pub mod builder;

use std::cmp::Reverse;
use std::num::NonZeroUsize;
use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
use chunk_data_fetch::ChunkDataFetcher;
use line_batch_processor::{LineBatchProcessor, LineBatchProcessorOutputPort};
use negative_slice_pass::MorselStreamReverser;
use polars_error::{PolarsResult, polars_bail, polars_err};
use polars_io::cloud::CloudOptions;
use polars_io::metrics::OptIOMetrics;
use polars_io::pl_async;
use polars_io::utils::byte_source::{ByteSource, DynByteSource, DynByteSourceBuilder};
use polars_io::utils::compression::{ByteSourceReader, SupportedCompression};
use polars_io::utils::stream_buf_reader::{ReaderSource, StreamBufReader};
use polars_plan::dsl::ScanSource;
use polars_utils::IdxSize;
use polars_utils::mem::prefetch::get_memory_prefetch_func;
use polars_utils::priority::Priority;
use polars_utils::slice_enum::Slice;
use row_index_limit_pass::ApplyRowIndexOrLimit;

use super::multi_scan::reader_interface::output::FileReaderOutputRecv;
use super::multi_scan::reader_interface::{BeginReadArgs, FileReader, FileReaderCallbacks};
use crate::async_executor::{AbortOnDropHandle, spawn};
use crate::async_primitives::distributor_channel::distributor_channel;
use crate::async_primitives::linearizer::Linearizer;
use crate::async_primitives::oneshot_channel;
use crate::async_primitives::wait_group::{WaitGroup, WaitToken};
use crate::morsel::SourceToken;
use crate::nodes::compute_node_prelude::*;
use crate::nodes::io_sources::multi_scan::reader_interface::Projection;
use crate::nodes::io_sources::multi_scan::reader_interface::output::FileReaderOutputSend;
use crate::nodes::io_sources::ndjson::chunk_reader::ChunkReaderBuilder;
use crate::nodes::io_sources::ndjson::line_batch_distributor::RowSkipper;
use crate::nodes::{MorselSeq, TaskPriority};
use crate::utils::tokio_handle_ext;

pub mod chunk_data_fetch;
pub(super) mod chunk_reader;
mod line_batch_distributor;
mod line_batch_processor;
mod negative_slice_pass;
mod row_index_limit_pass;

/// NDJSON file reader for the streaming engine.
///
/// Used in three phases: [`initialize`](FileReader::initialize) resolves the
/// byte source, file size and compression; `prepare_read` chains this reader
/// into the cross-reader prefetch ordering; `begin_read` spawns the task graph
/// (optional prefetch -> line batch distributor -> N line batch processors ->
/// optional post-process pass) and returns the morsel output port.
pub struct NDJsonFileReader {
    pub scan_source: ScanSource,
    pub cloud_options: Option<Arc<CloudOptions>>,
    /// Builds the per-schema chunk parser handed to each line batch processor.
    pub chunk_reader_builder: ChunkReaderBuilder,
    /// Counts rows in a raw byte chunk; passed through to the line batch
    /// processors alongside the chunk reader.
    pub count_rows_fn: fn(&[u8]) -> usize,
    pub verbose: bool,
    pub byte_source_builder: DynByteSourceBuilder,
    /// Cross-reader state used to order/limit chunk prefetching.
    pub chunk_prefetch_sync: ChunkPrefetchSync,
    /// Populated by `initialize`; `None` until then.
    pub init_data: Option<InitializedState>,
    pub io_metrics: OptIOMetrics,
}

/// Synchronizes chunk prefetching across consecutive readers: wait groups are
/// chained through a shared slot so each reader's prefetch task waits for the
/// previous reader to finish spawning its prefetches.
pub(crate) struct ChunkPrefetchSync {
    /// Upper bound on in-flight prefetched chunks for this reader (clamped to
    /// the actual chunk count in `begin_read`).
    pub(crate) prefetch_limit: usize,
    /// Shared across readers (cloned via `Arc` into each prefetch task);
    /// presumably bounds concurrent prefetches globally — see `ChunkDataFetcher`.
    pub(crate) prefetch_semaphore: Arc<tokio::sync::Semaphore>,
    /// Slot holding the most recent reader's wait group. `prepare_read` swaps
    /// its own wait group in and takes the previous reader's out.
    pub(crate) shared_prefetch_wait_group_slot: Arc<std::sync::Mutex<Option<WaitGroup>>>,

    /// Waits for the previous reader to finish spawning prefetches.
    pub(crate) prev_all_spawned: Option<WaitGroup>,
    /// Dropped once the current reader has finished spawning prefetches.
    pub(crate) current_all_spawned: Option<WaitToken>,
}

/// State resolved once by [`NDJsonFileReader::initialize`] and cloned per read.
#[derive(Clone)]
pub struct InitializedState {
    file_size: usize,
    /// Detected from the first 4 bytes (magic number); `None` for files
    /// shorter than 4 bytes or when no supported compression is recognized.
    compression: Option<SupportedCompression>,
    byte_source: Arc<DynByteSource>,
}

#[async_trait]
impl FileReader for NDJsonFileReader {
    /// Resolves the byte source, file size and compression type.
    /// Idempotent: returns early if `init_data` is already populated.
    async fn initialize(&mut self) -> PolarsResult<()> {
        if self.init_data.is_some() {
            return Ok(());
        }

        let scan_source = self.scan_source.clone();
        let byte_source_builder = self.byte_source_builder.clone();
        let cloud_options = self.cloud_options.clone();
        let io_metrics = self.io_metrics.clone();

        // Note: `.await.unwrap()?` — the `unwrap` propagates panics from the
        // spawned task (join error); the `?` propagates the actual I/O result.
        let byte_source = pl_async::get_runtime()
            .spawn(async move {
                scan_source
                    .as_scan_source_ref()
                    .to_dyn_byte_source(
                        &byte_source_builder,
                        cloud_options.as_deref(),
                        io_metrics.0,
                    )
                    .await
            })
            .await
            .unwrap()?;
        let byte_source = Arc::new(byte_source);

        // @TODO: Refactor FileInfo so we can re-use the file_size value from the planning stage.
        let file_size = {
            let byte_source = byte_source.clone();
            pl_async::get_runtime()
                .spawn(async move { byte_source.get_size().await })
                .await
                .unwrap()?
        };

        // Sniff compression from the 4-byte magic number; files shorter than
        // 4 bytes cannot be compressed in any supported format.
        let compression = if file_size >= 4 {
            let byte_source = byte_source.clone();
            let magic_range = 0..4;
            let magic_bytes = pl_async::get_runtime()
                .spawn(async move { byte_source.get_range(magic_range).await })
                .await
                .unwrap()?;
            SupportedCompression::check(&magic_bytes)
        } else {
            None
        };

        self.init_data = Some(InitializedState {
            file_size,
            compression,
            byte_source,
        });

        Ok(())
    }

    /// Chains this reader into the global prefetch order: installs this
    /// reader's wait group into the shared slot and takes over the previous
    /// reader's wait group, which the prefetch task spawned in `begin_read`
    /// awaits before fetching.
    fn prepare_read(&mut self) -> PolarsResult<()> {
        let wait_group_this_reader = WaitGroup::default();
        let prefetch_all_spawned_token = wait_group_this_reader.token();

        // NOTE(review): `try_lock().unwrap()` assumes no concurrent
        // `prepare_read` races on this slot — confirm with the caller.
        let prev_wait_group: Option<WaitGroup> = self
            .chunk_prefetch_sync
            .shared_prefetch_wait_group_slot
            .try_lock()
            .unwrap()
            .replace(wait_group_this_reader);

        self.chunk_prefetch_sync.prev_all_spawned = prev_wait_group;
        self.chunk_prefetch_sync.current_all_spawned = Some(prefetch_all_spawned_token);

        Ok(())
    }

    /// Spawns the read task graph and returns the morsel output port plus a
    /// finishing handle that resolves once all tasks have completed.
    ///
    /// Task layout: optional async prefetch (on the I/O runtime) -> line batch
    /// distributor -> `num_pipelines` line batch processors -> optional
    /// post-process pass (morsel stream reverser for negative slices, or
    /// row-index/limit applier).
    fn begin_read(
        &mut self,
        args: BeginReadArgs,
    ) -> PolarsResult<(FileReaderOutputRecv, JoinHandle<PolarsResult<()>>)> {
        let verbose = self.verbose;

        // Initialize.
        let InitializedState {
            file_size,
            compression,
            byte_source,
        } = self.init_data.clone().unwrap();

        let BeginReadArgs {
            projection: Projection::Plain(projected_schema),
            mut row_index,
            pre_slice,

            num_pipelines,
            disable_morsel_split: _,
            callbacks:
                FileReaderCallbacks {
                    file_schema_tx,
                    n_rows_in_file_tx,
                    row_position_on_end_tx,
                },

            predicate: None,
            cast_columns_policy: _,
        } = args
        else {
            panic!("unsupported args: {:?}", &args)
        };

        let is_empty_slice = pre_slice.as_ref().is_some_and(|x| x.len() == 0);
        let is_negative_slice = matches!(pre_slice, Some(Slice::Negative { .. }));

        // There are two byte sourcing strategies `ReaderSource`: (a) async parallel prefetch using a
        // streaming pipeline, or (b) memory-mapped, only to be used for uncompressed local files.
        // The `compressed_reader` (of type `ByteSourceReader`) abstracts these source types.
        // The `use_async_prefetch` flag controls the optional pipeline startup behavior.
        let use_async_prefetch =
            !(matches!(byte_source.as_ref(), &DynByteSource::Buffer(_)) && compression.is_none());

        // NDJSON: We just use the projected schema - the parser will automatically append NULL if
        // the field is not found.
        //
        // TODO
        // We currently always use the projected dtype, but this may cause
        // issues e.g. with temporal types. This can be improved to better choose
        // between the 2 dtypes.
        let schema = projected_schema;

        if let Some(tx) = file_schema_tx {
            _ = tx.send(schema.clone())
        }

        // Convert (offset, len) to Range
        // Note: This is converted to right-to-left for negative slice (i.e. range.start is position
        // from end).
        let global_slice: Option<Range<usize>> = if let Some(slice) = pre_slice.clone() {
            match slice {
                Slice::Positive { offset, len } => Some(offset..offset.saturating_add(len)),
                Slice::Negative {
                    offset_from_end,
                    len,
                } => {
                    // array: [_ _ _ _ _]
                    // slice:     [_ _]
                    // in: offset_from_end: 3, len: 2
                    // out: 1..3 (right-to-left)
                    Some(offset_from_end.saturating_sub(len)..offset_from_end)
                },
            }
        } else {
            None
        };

        // Channel used to forward the total row count to the stream reverser,
        // which needs it to resolve row index offsets for negative slices.
        let (total_row_count_tx, total_row_count_rx) = if is_negative_slice && row_index.is_some() {
            let (tx, rx) = oneshot_channel::channel();
            (Some(tx), Some(rx))
        } else {
            (None, None)
        };

        // Total row count must be tracked for: the reverser's row index offset,
        // the n_rows_in_file callback, and row position reporting under a
        // negative slice.
        let needs_total_row_count = total_row_count_tx.is_some()
            || n_rows_in_file_tx.is_some()
            || (row_position_on_end_tx.is_some()
                && matches!(pre_slice, Some(Slice::Negative { .. })));

        if verbose {
            eprintln!(
                "[NDJsonFileReader]: \
                project: {}, \
                global_slice: {:?}, \
                row_index: {:?}, \
                is_negative_slice: {}, \
                use_async_prefetch: {}",
                schema.len(),
                &global_slice,
                &row_index,
                is_negative_slice,
                use_async_prefetch
            );
        }

        // Note: This counts from the end of file for negative slice.
        let n_rows_to_skip = global_slice.as_ref().map_or(0, |x| x.start);

        // A linearizer is only needed when morsel order matters downstream,
        // i.e. when a slice or row index must be applied.
        let (opt_linearizer, mut linearizer_inserters) =
            if global_slice.is_some() || row_index.is_some() {
                let (a, b) =
                    Linearizer::<Priority<Reverse<MorselSeq>, DataFrame>>::new(num_pipelines, 1);
                (Some(a), b)
            } else {
                (None, vec![])
            };

        let output_to_linearizer = opt_linearizer.is_some();
        let mut output_port = None;

        let opt_post_process_handle = if is_negative_slice {
            // Note: This is right-to-left
            let negative_slice = global_slice.unwrap();

            if verbose {
                eprintln!("[NDJsonFileReader]: Initialize morsel stream reverser");
            }

            let (morsel_senders, rx) = FileReaderOutputSend::new_parallel(num_pipelines);
            output_port = Some(rx);

            Some(AbortOnDropHandle::new(spawn(
                TaskPriority::High,
                MorselStreamReverser {
                    morsel_receiver: opt_linearizer.unwrap(),
                    morsel_senders,
                    offset_len_rtl: (
                        negative_slice.start,
                        negative_slice.end - negative_slice.start,
                    ),
                    // The correct row index offset can only be known after total row count is
                    // available. This is handled by the MorselStreamReverser.
                    row_index: row_index.take().map(|x| (x, total_row_count_rx.unwrap())),
                    verbose,
                }
                .run(),
            )))
        } else if global_slice.is_some() || row_index.is_some() {
            let mut row_index = row_index.take();

            if verbose {
                eprintln!("[NDJsonFileReader]: Initialize ApplyRowIndexOrLimit");
            }

            if let Some(ri) = row_index.as_mut() {
                // Update the row index offset according to the slice start.
                let Some(v) = ri.offset.checked_add(n_rows_to_skip as IdxSize) else {
                    let offset = ri.offset;

                    polars_bail!(
                        ComputeError:
                        "row_index with offset {} overflows at {} rows",
                        offset, n_rows_to_skip
                    )
                };
                ri.offset = v;
            }

            let (morsel_tx, rx) = FileReaderOutputSend::new_serial();
            output_port = Some(rx);

            let limit = global_slice.as_ref().map(|x| x.len());

            let task = ApplyRowIndexOrLimit {
                morsel_receiver: opt_linearizer.unwrap(),
                morsel_tx,
                // Note: The line batch distributor handles skipping lines until the offset,
                // we only need to handle the limit here.
                limit,
                row_index,
                verbose,
            };

            if is_empty_slice {
                None
            } else {
                Some(AbortOnDropHandle::new(spawn(
                    TaskPriority::High,
                    task.run(),
                )))
            }
        } else {
            None
        };

        let chunk_reader = self.chunk_reader_builder.build(schema);

        let (line_batch_distribute_tx, line_batch_distribute_receivers) =
            distributor_channel(num_pipelines, 1);

        // No post-process pass: the processors send directly to a parallel
        // output port instead of going through the linearizer.
        let mut morsel_senders = if !output_to_linearizer {
            let (senders, outp) = FileReaderOutputSend::new_parallel(num_pipelines);
            assert!(output_port.is_none());
            output_port = Some(outp);
            senders
        } else {
            vec![]
        };

        // Initialize in reverse as we want to manually pop from either the linearizer or the phase receivers depending
        // on if we have negative slice.
        let line_batch_processor_handles = line_batch_distribute_receivers
            .into_iter()
            .enumerate()
            .rev()
            .map(|(worker_idx, line_batch_rx)| {
                let chunk_reader = chunk_reader.clone();
                let count_rows_fn = self.count_rows_fn;
                // Note: We don't use this (it is handled by the bridge). But morsels require a source token.
                let source_token = SourceToken::new();

                AbortOnDropHandle::new(spawn(
                    TaskPriority::Low,
                    LineBatchProcessor {
                        worker_idx,

                        chunk_reader,
                        count_rows_fn,

                        line_batch_rx,
                        output_port: if is_empty_slice {
                            LineBatchProcessorOutputPort::Closed
                        } else if output_to_linearizer {
                            LineBatchProcessorOutputPort::Linearize {
                                tx: linearizer_inserters.pop().unwrap(),
                            }
                        } else {
                            LineBatchProcessorOutputPort::Direct {
                                tx: morsel_senders.pop().unwrap(),
                                source_token,
                            }
                        },
                        needs_total_row_count,

                        // Only log from the last worker to prevent flooding output.
                        verbose: verbose && worker_idx == num_pipelines - 1,
                    }
                    .run(),
                ))
            })
            .collect::<Vec<_>>();

        // Row skipping up to the slice offset happens at distribution time
        // (from the end of the file when the slice is negative).
        let row_skipper = RowSkipper {
            cfg_n_rows_to_skip: n_rows_to_skip,
            n_rows_skipped: 0,
            is_line: self.chunk_reader_builder.is_line_fn(),
            reverse: is_negative_slice,
        };

        // Unify the two source options (uncompressed local file mmapp'ed, or streaming async with transparent
        // decompression), into one unified reader object.
        let byte_source_reader: ByteSourceReader<ReaderSource> = if use_async_prefetch {
            // Prepare parameters for Prefetch task.
            const DEFAULT_NDJSON_CHUNK_SIZE: usize = 32 * 1024 * 1024;
            let memory_prefetch_func = get_memory_prefetch_func(verbose);
            let chunk_size = std::env::var("POLARS_NDJSON_CHUNK_SIZE")
                .map(|x| {
                    x.parse::<NonZeroUsize>()
                        .unwrap_or_else(|_| {
                            panic!("invalid value for POLARS_NDJSON_CHUNK_SIZE: {x}")
                        })
                        .get()
                })
                .unwrap_or(DEFAULT_NDJSON_CHUNK_SIZE);

            // Clamp to the number of chunks actually in the file, but keep at
            // least 1 so the channel below has non-zero capacity.
            let prefetch_limit = self
                .chunk_prefetch_sync
                .prefetch_limit
                .min(file_size.div_ceil(chunk_size))
                .max(1);

            let (prefetch_send, prefetch_recv) = tokio::sync::mpsc::channel(prefetch_limit);

            // Task: Prefetch.
            // Initiate parallel downloads of raw data chunks.
            let byte_source = byte_source.clone();
            let prefetch_task = {
                let io_runtime = polars_io::pl_async::get_runtime();

                let prefetch_semaphore = Arc::clone(&self.chunk_prefetch_sync.prefetch_semaphore);
                let prefetch_prev_all_spawned =
                    Option::take(&mut self.chunk_prefetch_sync.prev_all_spawned);
                let prefetch_current_all_spawned =
                    Option::take(&mut self.chunk_prefetch_sync.current_all_spawned);

                tokio_handle_ext::AbortOnDropHandle(io_runtime.spawn(async move {
                    let mut chunk_data_fetcher = ChunkDataFetcher {
                        memory_prefetch_func,
                        byte_source,
                        file_size,
                        chunk_size,
                        prefetch_send,
                        prefetch_semaphore,
                        prefetch_current_all_spawned,
                    };

                    // Wait until the previous reader has spawned all of its
                    // prefetches before starting ours (see ChunkPrefetchSync).
                    if let Some(prefetch_prev_all_spawned) = prefetch_prev_all_spawned {
                        prefetch_prev_all_spawned.wait().await;
                    }

                    chunk_data_fetcher.run().await?;

                    Ok(())
                }))
            };

            // Wrap into ByteSourceReader to enable sync `BufRead` access.
            let stream_buf_reader = StreamBufReader::new(prefetch_recv, prefetch_task);
            ByteSourceReader::try_new(ReaderSource::Streaming(stream_buf_reader), compression)?
        } else {
            let memslice = self
                .scan_source
                .as_scan_source_ref()
                .to_buffer_async_assume_latest(self.scan_source.run_async())?;

            ByteSourceReader::from_memory(memslice, compression)?
        };

        // Heuristic: compressed inputs are assumed to expand ~4x.
        const ASSUMED_COMPRESSION_RATIO: usize = 4;
        let uncompressed_file_size_hint = Some(match compression {
            Some(_) => file_size * ASSUMED_COMPRESSION_RATIO,
            None => file_size,
        });

        let line_batch_distributor_task_handle = AbortOnDropHandle::new(spawn(
            TaskPriority::Low,
            line_batch_distributor::LineBatchDistributor {
                reader: byte_source_reader,
                reverse: is_negative_slice,
                row_skipper,
                line_batch_distribute_tx,
                uncompressed_file_size_hint,
            }
            .run(),
        ));

        // Task. Finishing handle.
        // Joins all tasks in dependency order and dispatches the row-count
        // callbacks once the final counts are known.
        let finishing_handle = spawn(TaskPriority::Low, async move {
            // Number of rows skipped by the line batch distributor.
            let n_rows_skipped: usize = line_batch_distributor_task_handle.await?;

            // Number of rows processed by the line batch processors.
            let mut n_rows_processed: usize = 0;

            if verbose {
                eprintln!("[NDJsonFileReader]: line batch distributor handle returned");
            }

            for handle in line_batch_processor_handles {
                n_rows_processed = n_rows_processed.saturating_add(handle.await?);
            }

            let total_row_count =
                needs_total_row_count.then_some(n_rows_skipped.saturating_add(n_rows_processed));

            if verbose {
                eprintln!("[NDJsonFileReader]: line batch processor handles returned");
            }

            if let Some(row_position_on_end_tx) = row_position_on_end_tx {
                let n = match pre_slice {
                    None => n_rows_skipped.saturating_add(n_rows_processed),

                    Some(Slice::Positive { offset, len }) => n_rows_skipped
                        .saturating_add(n_rows_processed)
                        .min(offset.saturating_add(len)),

                    // Negative slice: n_rows_skipped was counted from the end
                    // of the file, so subtract it from the total.
                    Some(Slice::Negative { .. }) => {
                        total_row_count.unwrap().saturating_sub(n_rows_skipped)
                    },
                };

                let n = IdxSize::try_from(n)
                    .map_err(|_| polars_err!(bigidx, ctx = "ndjson file", size = n))?;

                _ = row_position_on_end_tx.send(n);
            }

            if let Some(tx) = total_row_count_tx {
                let total_row_count = total_row_count.unwrap();

                if verbose {
                    eprintln!(
                        "[NDJsonFileReader]: \
                        send total row count: {total_row_count}"
                    )
                }
                _ = tx.send(total_row_count);
            }

            if let Some(n_rows_in_file_tx) = n_rows_in_file_tx {
                let total_row_count = total_row_count.unwrap();

                if verbose {
                    eprintln!("[NDJsonFileReader]: send n_rows_in_file: {total_row_count}");
                }

                let num_rows = total_row_count;
                let num_rows = IdxSize::try_from(num_rows)
                    .map_err(|_| polars_err!(bigidx, ctx = "ndjson file", size = num_rows))?;
                _ = n_rows_in_file_tx.send(num_rows);
            }

            if let Some(handle) = opt_post_process_handle {
                handle.await?;
            }

            if verbose {
                eprintln!("[NDJsonFileReader]: returning");
            }

            Ok(())
        });

        Ok((output_port.unwrap(), finishing_handle))
    }
}