Path: blob/main/crates/polars-stream/src/nodes/io_sources/ndjson/mod.rs
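//! Streaming NDJSON source node.
//!
//! The source is read (and decompressed if needed) into memory, split into line batches that are
//! parsed in parallel, with support for projection, row index, and positive/negative slices.
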
pub mod builder;

use std::cmp::Reverse;
use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
use chunk_reader::ChunkReader;
use line_batch_processor::{LineBatchProcessor, LineBatchProcessorOutputPort};
use negative_slice_pass::MorselStreamReverser;
use polars_core::schema::SchemaRef;
use polars_error::{PolarsResult, polars_bail, polars_err};
use polars_io::cloud::CloudOptions;
use polars_io::prelude::estimate_n_lines_in_file;
use polars_io::utils::compression::maybe_decompress_bytes;
use polars_plan::dsl::{NDJsonReadOptions, ScanSource};
use polars_utils::IdxSize;
use polars_utils::mem::prefetch::get_memory_prefetch_func;
use polars_utils::mmap::MemSlice;
use polars_utils::priority::Priority;
use polars_utils::slice_enum::Slice;
use row_index_limit_pass::ApplyRowIndexOrLimit;

use super::multi_scan::reader_interface::output::FileReaderOutputRecv;
use super::multi_scan::reader_interface::{BeginReadArgs, FileReader, FileReaderCallbacks};
use crate::async_executor::{AbortOnDropHandle, spawn};
use crate::async_primitives::distributor_channel::distributor_channel;
use crate::async_primitives::linearizer::Linearizer;
use crate::morsel::SourceToken;
use crate::nodes::compute_node_prelude::*;
use crate::nodes::io_sources::multi_scan::reader_interface::Projection;
use crate::nodes::io_sources::multi_scan::reader_interface::output::FileReaderOutputSend;
use crate::nodes::{MorselSeq, TaskPriority};
mod chunk_reader;
mod line_batch_distributor;
mod line_batch_processor;
mod negative_slice_pass;
mod row_index_limit_pass;

#[derive(Clone)]
pub struct NDJsonFileReader {
    scan_source: ScanSource,
    #[expect(unused)] // Will be used when implementing cloud streaming.
    cloud_options: Option<Arc<CloudOptions>>,
    options: Arc<NDJsonReadOptions>,
    verbose: bool,
    // Cached on first access - we may be called multiple times e.g. on negative slice.
    cached_bytes: Option<MemSlice>,
}

#[async_trait]
impl FileReader for NDJsonFileReader {
    async fn initialize(&mut self) -> PolarsResult<()> {
        Ok(())
    }

    fn begin_read(
        &mut self,
        args: BeginReadArgs,
    ) -> PolarsResult<(FileReaderOutputRecv, JoinHandle<PolarsResult<()>>)> {
        let verbose = self.verbose;

        let BeginReadArgs {
            projection: Projection::Plain(projected_schema),
            mut row_index,
            pre_slice,

            num_pipelines,
            callbacks:
                FileReaderCallbacks {
                    file_schema_tx,
                    n_rows_in_file_tx,
                    row_position_on_end_tx,
                },

            predicate: None,
            cast_columns_policy: _,
        } = args
        else {
            panic!("unsupported args: {:?}", &args)
        };

        // TODO: This currently downloads and decompresses everything upfront in a blocking manner.
        // Ideally we have a streaming download/decompression.
        let global_bytes = self.get_bytes_maybe_decompress()?;

        // NDJSON: We just use the projected schema - the parser will automatically append NULL if
        // the field is not found.
        //
        // TODO
        // We currently always use the projected dtype, but this may cause
        // issues e.g. with temporal types. This can be improved to better choose
        // between the 2 dtypes.
        let schema = projected_schema;

        if let Some(mut tx) = file_schema_tx {
            _ = tx.try_send(schema.clone())
        }

        let is_negative_slice = matches!(pre_slice, Some(Slice::Negative { .. }));

        // Convert (offset, len) to Range
        // Note: This is converted to right-to-left for negative slice (i.e. range.start is position
        // from end).
        let global_slice: Option<Range<usize>> = if let Some(slice) = pre_slice.clone() {
            match slice {
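                // e.g. offset: 2, len: 3 => 2..5 (left-to-right)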
                Slice::Positive { offset, len } => Some(offset..offset.saturating_add(len)),
                Slice::Negative {
                    offset_from_end,
                    len,
                } => {
                    // array: [_ _ _ _ _]
                    // slice:    [ _ _ ]
                    // in: offset_from_end: 3, len: 2
                    // out: 1..3 (right-to-left)
                    Some(offset_from_end.saturating_sub(len)..offset_from_end)
                },
            }
        } else {
            None
        };

        let (total_row_count_tx, total_row_count_rx) = if is_negative_slice && row_index.is_some() {
            let (tx, rx) = tokio::sync::oneshot::channel();
            (Some(tx), Some(rx))
        } else {
            (None, None)
        };

        let needs_total_row_count = total_row_count_tx.is_some()
            || n_rows_in_file_tx.is_some()
            || (row_position_on_end_tx.is_some()
                && matches!(pre_slice, Some(Slice::Negative { .. })));

        let chunk_size: usize = {
            let n_bytes_to_split = if let Some(x) = global_slice.as_ref() {
                if needs_total_row_count {
                    global_bytes.len()
                } else {
                    // There may be early stopping, try to heuristically use a smaller chunk size to stop faster.
                    let n_rows_to_sample = 8;
                    let n_lines_estimate =
                        estimate_n_lines_in_file(global_bytes.as_ref(), n_rows_to_sample);
                    let line_length_estimate = global_bytes.len().div_ceil(n_lines_estimate);

                    if verbose {
                        eprintln!(
                            "[NDJsonFileReader]: n_lines_estimate: {n_lines_estimate}, line_length_estimate: {line_length_estimate}"
                        );
                    }

                    // Estimated stopping point in the file
                    x.end.saturating_mul(line_length_estimate)
                }
            } else {
                global_bytes.len()
            };
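
            // Split the targeted byte range into roughly 16 chunks per pipeline,
            // e.g. ~128 MiB to split over 4 pipelines => 128 MiB / 64 = 2 MiB chunks
            // (clamped to [min_chunk_size, max_chunk_size] below).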
            let chunk_size = n_bytes_to_split.div_ceil(16 * num_pipelines);

            let max_chunk_size = 16 * 1024 * 1024;
            // Use a small min chunk size to catch failures in tests.
            #[cfg(debug_assertions)]
            let min_chunk_size = 64;
            #[cfg(not(debug_assertions))]
            let min_chunk_size = 1024 * 4;

            let chunk_size = chunk_size.clamp(min_chunk_size, max_chunk_size);

            std::env::var("POLARS_FORCE_NDJSON_CHUNK_SIZE").map_or(chunk_size, |x| {
                x.parse::<usize>()
                    .expect("expected `POLARS_FORCE_NDJSON_CHUNK_SIZE` to be an integer")
            })
        };

        if verbose {
            eprintln!(
                "[NDJsonFileReader]: \
                project: {}, \
                global_slice: {:?}, \
                row_index: {:?}, \
                chunk_size: {}, \
                n_chunks: {}, \
                is_negative_slice: {}",
                schema.len(),
                &global_slice,
                &row_index,
                chunk_size,
                global_bytes.len().div_ceil(chunk_size),
                is_negative_slice,
            );
        }

        // Note: This counts from the end of file for negative slice.
        let n_rows_to_skip = global_slice.as_ref().map_or(0, |x| x.start);
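
        // Pipeline layout: the LineBatchDistributor splits `global_bytes` into line batches for
        // the LineBatchProcessors, which parse them into DataFrames. Their output either goes
        // directly to the output port, or through a Linearizer into a post-processing task
        // (MorselStreamReverser for a negative slice, ApplyRowIndexOrLimit otherwise) when slice /
        // row-index handling is required.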
        let (opt_linearizer, mut linearizer_inserters) =
            if global_slice.is_some() || row_index.is_some() {
                let (a, b) =
                    Linearizer::<Priority<Reverse<MorselSeq>, DataFrame>>::new(num_pipelines, 1);
                (Some(a), b)
            } else {
                (None, vec![])
            };

        let output_to_linearizer = opt_linearizer.is_some();

        let mut output_port = None;

        let opt_post_process_handle = if is_negative_slice {
            // Note: This is right-to-left
            let negative_slice = global_slice.unwrap();

            if verbose {
                eprintln!("[NDJsonFileReader]: Initialize morsel stream reverser");
            }

            let (morsel_senders, rx) = FileReaderOutputSend::new_parallel(num_pipelines);
            output_port = Some(rx);

            Some(AbortOnDropHandle::new(spawn(
                TaskPriority::High,
                MorselStreamReverser {
                    morsel_receiver: opt_linearizer.unwrap(),
                    morsel_senders,
                    offset_len_rtl: (
                        negative_slice.start,
                        negative_slice.end - negative_slice.start,
                    ),
                    // The correct row index offset can only be known after total row count is
                    // available. This is handled by the MorselStreamReverser.
                    row_index: row_index.take().map(|x| (x, total_row_count_rx.unwrap())),
                    verbose,
                }
                .run(),
            )))
        } else if global_slice.is_some() || row_index.is_some() {
            let mut row_index = row_index.take();

            if verbose {
                eprintln!("[NDJsonFileReader]: Initialize ApplyRowIndexOrLimit");
            }

            if let Some(ri) = row_index.as_mut() {
                // Update the row index offset according to the slice start.
                let Some(v) = ri.offset.checked_add(n_rows_to_skip as IdxSize) else {
                    let offset = ri.offset;

                    polars_bail!(
                        ComputeError:
                        "row_index with offset {} overflows at {} rows",
                        offset, n_rows_to_skip
                    )
                };
                ri.offset = v;
            }

            let (morsel_tx, rx) = FileReaderOutputSend::new_serial();
            output_port = Some(rx);

            let limit = global_slice.as_ref().map(|x| x.len());

            let task = ApplyRowIndexOrLimit {
                morsel_receiver: opt_linearizer.unwrap(),
                morsel_tx,
                // Note: The line batch distributor handles skipping lines until the offset,
                // we only need to handle the limit here.
                limit,
                row_index,
                verbose,
            };

            if limit == Some(0) {
                None
            } else {
                Some(AbortOnDropHandle::new(spawn(
                    TaskPriority::High,
                    task.run(),
                )))
            }
        } else {
            None
        };

        let schema = Arc::new(schema);
        let chunk_reader = Arc::new(self.try_init_chunk_reader(&schema)?);

        if !is_negative_slice {
            get_memory_prefetch_func(verbose)(global_bytes.as_ref());
        }

        let (line_batch_distribute_tx, line_batch_distribute_receivers) =
            distributor_channel(num_pipelines, 1);

        let mut morsel_senders = if !output_to_linearizer {
            let (senders, outp) = FileReaderOutputSend::new_parallel(num_pipelines);
            assert!(output_port.is_none());
            output_port = Some(outp);
            senders
        } else {
            vec![]
        };

        // Initialize in reverse as we want to manually pop from either the linearizer or the phase receivers depending
        // on if we have negative slice.
        let line_batch_processor_handles = line_batch_distribute_receivers
            .into_iter()
            .enumerate()
            .rev()
            .map(|(worker_idx, line_batch_rx)| {
                let global_bytes = global_bytes.clone();
                let chunk_reader = chunk_reader.clone();
                // Note: We don't use this (it is handled by the bridge). But morsels require a source token.
                let source_token = SourceToken::new();

                AbortOnDropHandle::new(spawn(
                    TaskPriority::Low,
                    LineBatchProcessor {
                        worker_idx,

                        global_bytes,
                        chunk_reader,

                        line_batch_rx,
                        output_port: if output_to_linearizer {
                            LineBatchProcessorOutputPort::Linearize {
                                tx: linearizer_inserters.pop().unwrap(),
                            }
                        } else {
                            LineBatchProcessorOutputPort::Direct {
                                tx: morsel_senders.pop().unwrap(),
                                source_token,
                            }
                        },
                        needs_total_row_count,

                        // Only log from the last worker to prevent flooding output.
                        verbose: verbose && worker_idx == num_pipelines - 1,
                    }
                    .run(),
                ))
            })
            .collect::<Vec<_>>();

        let line_batch_distributor_task_handle = AbortOnDropHandle::new(spawn(
            TaskPriority::Low,
            line_batch_distributor::LineBatchDistributor {
                global_bytes,
                chunk_size,
                n_rows_to_skip,
                reverse: is_negative_slice,
                line_batch_distribute_tx,
            }
            .run(),
        ));
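
        // Joins the distributor and processor tasks, combines their row counts, and serves the
        // reader callbacks (row position / total row count) before joining the post-processing
        // task.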
        let finishing_handle = spawn(TaskPriority::Low, async move {
            // Number of rows skipped by the line batch distributor.
            let n_rows_skipped: usize = line_batch_distributor_task_handle.await?;
            // Number of rows processed by the line batch processors.
            let mut n_rows_processed: usize = 0;

            if verbose {
                eprintln!("[NDJsonFileReader]: line batch distributor handle returned");
            }

            for handle in line_batch_processor_handles {
                n_rows_processed = n_rows_processed.saturating_add(handle.await?);
            }

            let total_row_count =
                needs_total_row_count.then_some(n_rows_skipped.saturating_add(n_rows_processed));

            if verbose {
                eprintln!("[NDJsonFileReader]: line batch processor handles returned");
            }

            if let Some(mut row_position_on_end_tx) = row_position_on_end_tx {
                let n = match pre_slice {
                    None => n_rows_skipped.saturating_add(n_rows_processed),

                    Some(Slice::Positive { offset, len }) => n_rows_skipped
                        .saturating_add(n_rows_processed)
                        .min(offset.saturating_add(len)),

                    Some(Slice::Negative { .. }) => {
                        total_row_count.unwrap().saturating_sub(n_rows_skipped)
                    },
                };

                let n = IdxSize::try_from(n)
                    .map_err(|_| polars_err!(bigidx, ctx = "ndjson file", size = n))?;

                _ = row_position_on_end_tx.try_send(n);
            }

            if let Some(tx) = total_row_count_tx {
                let total_row_count = total_row_count.unwrap();

                if verbose {
                    eprintln!(
                        "[NDJsonFileReader]: \
                        send total row count: {total_row_count}"
                    )
                }
                _ = tx.send(total_row_count);
            }

            if let Some(mut n_rows_in_file_tx) = n_rows_in_file_tx {
                let total_row_count = total_row_count.unwrap();

                if verbose {
                    eprintln!("[NDJsonFileReader]: send n_rows_in_file: {total_row_count}");
                }

                let num_rows = total_row_count;
                let num_rows = IdxSize::try_from(num_rows)
                    .map_err(|_| polars_err!(bigidx, ctx = "ndjson file", size = num_rows))?;
                _ = n_rows_in_file_tx.try_send(num_rows);
            }

            if let Some(handle) = opt_post_process_handle {
                handle.await?;
            }

            if verbose {
                eprintln!("[NDJsonFileReader]: returning");
            }

            Ok(())
        });

        Ok((output_port.unwrap(), finishing_handle))
    }
}

impl NDJsonFileReader {
    fn try_init_chunk_reader(&self, schema: &SchemaRef) -> PolarsResult<ChunkReader> {
        ChunkReader::try_new(&self.options, schema)
    }

    fn get_bytes_maybe_decompress(&mut self) -> PolarsResult<MemSlice> {
        if self.cached_bytes.is_none() {
            let run_async = self.scan_source.run_async();
            let source = self
                .scan_source
                .as_scan_source_ref()
                .to_memslice_async_assume_latest(run_async)?;

            let memslice = {
                let mut out = vec![];
                maybe_decompress_bytes(&source, &mut out)?;

                if out.is_empty() {
                    source
                } else {
                    MemSlice::from_vec(out)
                }
            };

            self.cached_bytes = Some(memslice);
        }

        Ok(self.cached_bytes.clone().unwrap())
    }
}