Path: blob/main/crates/polars-stream/src/nodes/io_sources/ndjson/chunk_reader.rs
8446 views
use polars_core::schema::SchemaRef;1use polars_error::PolarsResult;2use polars_io::ndjson;3use polars_io::prelude::{is_json_line, parse_ndjson};4#[cfg(feature = "scan_lines")]5use polars_utils::pl_str::PlSmallStr;67use crate::nodes::compute_node_prelude::*;89#[derive(Clone)]10pub enum ChunkReaderBuilder {11NDJson {12ignore_errors: bool,13},14#[cfg(feature = "scan_lines")]15Lines,16}1718#[derive(Clone)]19pub enum ChunkReader {20/// NDJSON chunk reader.21NDJson {22projected_schema: SchemaRef,23ignore_errors: bool,24},25#[cfg(feature = "scan_lines")]26Lines {27/// If this is `None` we are projecting 0-width morsels.28projection: Option<PlSmallStr>,29},30}3132impl ChunkReaderBuilder {33pub(super) fn build(&self, projected_schema: SchemaRef) -> ChunkReader {34match self {35Self::NDJson { ignore_errors } => ChunkReader::NDJson {36projected_schema,37ignore_errors: *ignore_errors,38},39#[cfg(feature = "scan_lines")]40Self::Lines => {41use polars_core::prelude::DataType;4243assert!(projected_schema.len() <= 1);4445let projection = projected_schema46.get_at_index(0)47.map(|(projected_name, dtype)| {48assert!(matches!(dtype, DataType::String));49projected_name.clone()50});5152ChunkReader::Lines { projection }53},54}55}5657pub(super) fn is_line_fn(&self) -> fn(&[u8]) -> bool {58match self {59Self::NDJson { .. } => is_json_line,60#[cfg(feature = "scan_lines")]61Self::Lines { .. } => |_: &[u8]| true,62}63}64}6566impl ChunkReader {67pub(super) fn read_chunk(&self, chunk: &[u8]) -> PolarsResult<DataFrame> {68match self {69Self::NDJson {70projected_schema,71ignore_errors,72} => {73if projected_schema.is_empty() {74Ok(DataFrame::empty_with_height(ndjson::count_rows(chunk)))75} else {76parse_ndjson(chunk, None, projected_schema, *ignore_errors)77}78},79#[cfg(feature = "scan_lines")]80Self::Lines { projection } => {81use polars_core::prelude::IntoColumn;82use polars_core::series::Series;83use polars_io::scan_lines;8485let Some(name) = projection else {86return Ok(DataFrame::empty_with_height(scan_lines::count_lines(chunk)));87};8889let out: Series = scan_lines::split_lines_to_rows(chunk)?.with_name(name.clone());90let out = unsafe { DataFrame::new_unchecked(out.len(), vec![out.into_column()]) };9192Ok(out)93},94}95}96}979899