// Source: crates/polars-plan/src/plans/functions/count.rs
//! Fast row-counting for scan sources (CSV / Parquet / IPC / NDJSON) without
//! materializing the scanned data. Each format-specific counter is compiled in
//! only when its feature flag is enabled.

#[cfg(feature = "ipc")]
use arrow::io::ipc::read::get_row_count as count_rows_ipc_sync;
#[cfg(any(
    feature = "parquet",
    feature = "ipc",
    feature = "json",
    feature = "csv"
))]
use polars_core::error::feature_gated;
#[cfg(any(feature = "json", feature = "parquet"))]
use polars_io::SerReader;
#[cfg(any(feature = "parquet", feature = "json"))]
use polars_io::cloud::CloudOptions;
#[cfg(feature = "parquet")]
use polars_io::parquet::read::ParquetReader;
#[cfg(all(feature = "parquet", feature = "async"))]
use polars_io::pl_async::{get_runtime, with_concurrency_budget};
use polars_utils::plpath::PlPath;

use super::*;

/// Count the total number of rows across all `sources` for the given scan
/// type, returning a single-row `DataFrame` with one `IdxSize` column.
///
/// The column is named `alias` if provided, otherwise the shared LEN constant.
/// Errors if the total count does not fit in `IdxSize`.
///
/// `#[allow(unused_variables)]`: with no scan features enabled, every
/// parameter is unused and the body is a bare `unreachable!()`.
#[allow(unused_variables)]
pub fn count_rows(
    sources: &ScanSources,
    scan_type: &FileScanIR,
    cloud_options: Option<&CloudOptions>,
    alias: Option<PlSmallStr>,
) -> PolarsResult<DataFrame> {
    // No scan format compiled in: this function should never be reached.
    #[cfg(not(any(
        feature = "parquet",
        feature = "ipc",
        feature = "json",
        feature = "csv"
    )))]
    {
        unreachable!()
    }

    #[cfg(any(
        feature = "parquet",
        feature = "ipc",
        feature = "json",
        feature = "csv"
    ))]
    {
        // Dispatch to the format-specific counter; each arm only exists when
        // its feature is enabled, so the match shrinks with the feature set.
        let count: PolarsResult<usize> = match scan_type {
            #[cfg(feature = "csv")]
            FileScanIR::Csv { options } => count_all_rows_csv(sources, options),
            #[cfg(feature = "parquet")]
            FileScanIR::Parquet { .. } => count_rows_parquet(sources, cloud_options),
            #[cfg(feature = "ipc")]
            FileScanIR::Ipc { options, metadata } => count_rows_ipc(
                sources,
                // The `cloud_options` parameter of `count_rows_ipc` itself is
                // cfg-gated, so the argument must be gated identically.
                #[cfg(feature = "cloud")]
                cloud_options,
                metadata.as_deref(),
            ),
            #[cfg(feature = "json")]
            FileScanIR::NDJson { options } => count_rows_ndjson(sources, cloud_options),
            #[cfg(feature = "python")]
            FileScanIR::PythonDataset { .. } => unreachable!(),
            FileScanIR::Anonymous { .. } => {
                unreachable!()
            },
        };
        let count = count?;
        // Narrow usize -> IdxSize; overflow is reported rather than truncated.
        let count: IdxSize = count.try_into().map_err(
            |_| polars_err!(ComputeError: "count of {} exceeded maximum row size", count),
        )?;
        let column_name = alias.unwrap_or(PlSmallStr::from_static(crate::constants::LEN));
        DataFrame::new(vec![Column::new(column_name, [count])])
    }
}

/// Sum the row counts of every CSV source, honoring the scan's parse options
/// (separator, quoting, comments, header/skip settings).
#[cfg(feature = "csv")]
fn count_all_rows_csv(
    sources: &ScanSources,
    options: &polars_io::prelude::CsvReadOptions,
) -> PolarsResult<usize> {
    let parse_options = options.get_parse_options();

    sources
        .iter()
        .map(|source| match source {
            // Plain paths go through the path-based counter.
            ScanSourceRef::Path(addr) => polars_io::csv::read::count_rows(
                addr,
                parse_options.separator,
                parse_options.quote_char,
                parse_options.comment_prefix.as_ref(),
                parse_options.eol_char,
                options.has_header,
                options.skip_lines,
                options.skip_rows,
                options.skip_rows_after_header,
            ),
            // Buffers / open files: materialize to a memory slice and use the
            // parallel slice-based counter.
            _ => {
                let memslice = source.to_memslice()?;

                polars_io::csv::read::count_rows_from_slice_par(
                    &memslice[..],
                    parse_options.separator,
                    parse_options.quote_char,
                    parse_options.comment_prefix.as_ref(),
                    parse_options.eol_char,
                    options.has_header,
                    options.skip_lines,
                    options.skip_rows,
                    options.skip_rows_after_header,
                )
            },
        })
        // Summing PolarsResult<usize> short-circuits on the first error.
        .sum()
}

/// Sum Parquet row counts from metadata. Cloud URLs require the "cloud"
/// feature (enforced by `feature_gated!`); local sources are read in-memory.
#[cfg(feature = "parquet")]
pub(super) fn count_rows_parquet(
    sources: &ScanSources,
    #[allow(unused)] cloud_options: Option<&CloudOptions>,
) -> PolarsResult<usize> {
    if sources.is_empty() {
        return Ok(0);
    };

    if sources.is_cloud_url() {
        feature_gated!("cloud", {
            // Block on the async cloud path from this sync context.
            get_runtime().block_on(count_rows_cloud_parquet(
                sources.as_paths().unwrap(),
                cloud_options,
            ))
        })
    } else {
        sources
            .iter()
            .map(|source| {
                // num_rows comes from the Parquet footer; no data decoding.
                ParquetReader::new(std::io::Cursor::new(source.to_memslice()?)).num_rows()
            })
            .sum::<PolarsResult<usize>>()
    }
}

/// Async: fetch per-file row counts from cloud Parquet objects and sum them.
#[cfg(all(feature = "parquet", feature = "async"))]
async fn count_rows_cloud_parquet(
    addrs: &[PlPath],
    cloud_options: Option<&CloudOptions>,
) -> PolarsResult<usize> {
    use polars_io::prelude::ParquetObjectStore;

    // Budget of 1 per file: each future holds one slot of the global
    // concurrency budget while it talks to the object store.
    let collection = addrs.iter().map(|path| {
        with_concurrency_budget(1, || async {
            let mut reader =
                ParquetObjectStore::from_uri(path.to_str(), cloud_options, None).await?;
            reader.num_rows().await
        })
    });
    // try_join_all fails fast on the first error, otherwise yields all counts.
    futures::future::try_join_all(collection)
        .await
        .map(|rows| rows.iter().sum())
}

/// Sum IPC row counts. `metadata`, when already known, lets the cloud reader
/// skip re-fetching the file footer. The `cloud_options` parameter only
/// exists when the "cloud" feature is compiled in.
#[cfg(feature = "ipc")]
pub(super) fn count_rows_ipc(
    sources: &ScanSources,
    #[cfg(feature = "cloud")] cloud_options: Option<&CloudOptions>,
    metadata: Option<&arrow::io::ipc::read::FileMetadata>,
) -> PolarsResult<usize> {
    if sources.is_empty() {
        return Ok(0);
    };
    let is_cloud = sources.is_cloud_url();

    if is_cloud {
        feature_gated!("cloud", {
            get_runtime().block_on(count_rows_cloud_ipc(
                sources.as_paths().unwrap(),
                cloud_options,
                metadata,
            ))
        })
    } else {
        sources
            .iter()
            .map(|source| {
                let memslice = source.to_memslice()?;
                // Synchronous footer-based count (aliased arrow get_row_count);
                // returns i64, widened to usize here.
                count_rows_ipc_sync(&mut std::io::Cursor::new(memslice)).map(|v| v as usize)
            })
            .sum::<PolarsResult<usize>>()
    }
}

/// Async: fetch per-file row counts from cloud IPC objects and sum them.
#[cfg(all(feature = "ipc", feature = "async"))]
async fn count_rows_cloud_ipc(
    addrs: &[PlPath],
    cloud_options: Option<&CloudOptions>,
    metadata: Option<&arrow::io::ipc::read::FileMetadata>,
) -> PolarsResult<usize> {
    use polars_io::ipc::IpcReaderAsync;

    // Same one-slot-per-file budgeting as the Parquet cloud path.
    let collection = addrs.iter().map(|path| {
        with_concurrency_budget(1, || async {
            let reader = IpcReaderAsync::from_uri(path.to_str(), cloud_options).await?;
            reader.count_rows(metadata).await
        })
    });
    futures::future::try_join_all(collection)
        .await
        .map(|rows| rows.iter().map(|v| *v as usize).sum())
}

/// Sum NDJSON row counts, transparently decompressing each source.
/// Cloud URLs (or forced-async local paths) are staged through the file cache.
#[cfg(feature = "json")]
pub(super) fn count_rows_ndjson(
    sources: &ScanSources,
    cloud_options: Option<&CloudOptions>,
) -> PolarsResult<usize> {
    use polars_core::config;
    use polars_io::utils::compression::maybe_decompress_bytes;

    if sources.is_empty() {
        return Ok(0);
    }

    let is_cloud_url = sources.is_cloud_url();
    // Async path also used for local paths when POLARS force-async is set.
    let run_async = is_cloud_url || (sources.is_paths() && config::force_async());

    // Pre-register every URI with the file cache so the per-source reads
    // below can resolve through it; only needed on the async path.
    let cache_entries = {
        if run_async {
            feature_gated!("cloud", {
                Some(polars_io::file_cache::init_entries_from_uri_list(
                    sources
                        .as_paths()
                        .unwrap()
                        .iter()
                        .map(|path| Arc::from(path.to_str())),
                    cloud_options,
                )?)
            })
        } else {
            None
        }
    };

    sources
        .iter()
        .map(|source| {
            let memslice =
                source.to_memslice_possibly_async(run_async, cache_entries.as_ref(), 0)?;

            // `owned` is scratch space for decompression; when the bytes are
            // not compressed, `maybe_decompress_bytes` hands back the slice.
            let owned = &mut vec![];
            let reader = polars_io::ndjson::core::JsonLineReader::new(std::io::Cursor::new(
                maybe_decompress_bytes(&memslice[..], owned)?,
            ));
            reader.count()
        })
        .sum()
}