GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/functions/count.rs
#[cfg(feature = "ipc")]
use arrow::io::ipc::read::get_row_count as count_rows_ipc_sync;
#[cfg(any(
    feature = "parquet",
    feature = "ipc",
    feature = "json",
    feature = "csv"
))]
use polars_core::error::feature_gated;
#[cfg(any(feature = "json", feature = "parquet"))]
use polars_io::SerReader;
#[cfg(any(feature = "parquet", feature = "json"))]
use polars_io::cloud::CloudOptions;
#[cfg(feature = "parquet")]
use polars_io::parquet::read::ParquetReader;
#[cfg(all(feature = "parquet", feature = "async"))]
use polars_io::pl_async::{get_runtime, with_concurrency_budget};
use polars_utils::plpath::PlPath;

use super::*;
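
/// Count the total number of rows across all scan sources and return the
/// result as a one-row DataFrame whose single `IdxSize` column is named
/// `alias`, or the `LEN` constant when no alias is given.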
#[allow(unused_variables)]
pub fn count_rows(
    sources: &ScanSources,
    scan_type: &FileScanIR,
    cloud_options: Option<&CloudOptions>,
    alias: Option<PlSmallStr>,
) -> PolarsResult<DataFrame> {
    #[cfg(not(any(
        feature = "parquet",
        feature = "ipc",
        feature = "json",
        feature = "csv"
    )))]
    {
        unreachable!()
    }

    #[cfg(any(
        feature = "parquet",
        feature = "ipc",
        feature = "json",
        feature = "csv"
    ))]
    {
        let count: PolarsResult<usize> = match scan_type {
            #[cfg(feature = "csv")]
            FileScanIR::Csv { options } => count_all_rows_csv(sources, options),
            #[cfg(feature = "parquet")]
            FileScanIR::Parquet { .. } => count_rows_parquet(sources, cloud_options),
            #[cfg(feature = "ipc")]
            FileScanIR::Ipc { options, metadata } => count_rows_ipc(
                sources,
                #[cfg(feature = "cloud")]
                cloud_options,
                metadata.as_deref(),
            ),
            #[cfg(feature = "json")]
            FileScanIR::NDJson { options } => count_rows_ndjson(sources, cloud_options),
            #[cfg(feature = "python")]
            FileScanIR::PythonDataset { .. } => unreachable!(),
            FileScanIR::Anonymous { .. } => {
                unreachable!()
            },
        };
        let count = count?;
        let count: IdxSize = count.try_into().map_err(
            |_| polars_err!(ComputeError: "count of {} exceeded maximum row size", count),
        )?;
        let column_name = alias.unwrap_or(PlSmallStr::from_static(crate::constants::LEN));
        DataFrame::new(vec![Column::new(column_name, [count])])
    }
}
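
/// Sum the row counts of all CSV sources: paths are counted with the
/// file-based counter, while in-memory sources go through the parallel
/// slice-based counter.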
#[cfg(feature = "csv")]
fn count_all_rows_csv(
    sources: &ScanSources,
    options: &polars_io::prelude::CsvReadOptions,
) -> PolarsResult<usize> {
    let parse_options = options.get_parse_options();

    sources
        .iter()
        .map(|source| match source {
            ScanSourceRef::Path(addr) => polars_io::csv::read::count_rows(
                addr,
                parse_options.separator,
                parse_options.quote_char,
                parse_options.comment_prefix.as_ref(),
                parse_options.eol_char,
                options.has_header,
                options.skip_lines,
                options.skip_rows,
                options.skip_rows_after_header,
            ),
            _ => {
                let memslice = source.to_memslice()?;

                polars_io::csv::read::count_rows_from_slice_par(
                    &memslice[..],
                    parse_options.separator,
                    parse_options.quote_char,
                    parse_options.comment_prefix.as_ref(),
                    parse_options.eol_char,
                    options.has_header,
                    options.skip_lines,
                    options.skip_rows,
                    options.skip_rows_after_header,
                )
            },
        })
        .sum()
}
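
/// Sum the row counts of all Parquet sources, asynchronously via the object
/// store for cloud URLs (requires the `cloud` feature) and from in-memory
/// slices otherwise.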
#[cfg(feature = "parquet")]
pub(super) fn count_rows_parquet(
    sources: &ScanSources,
    #[allow(unused)] cloud_options: Option<&CloudOptions>,
) -> PolarsResult<usize> {
    if sources.is_empty() {
        return Ok(0);
    };

    if sources.is_cloud_url() {
        feature_gated!("cloud", {
            get_runtime().block_on(count_rows_cloud_parquet(
                sources.as_paths().unwrap(),
                cloud_options,
            ))
        })
    } else {
        sources
            .iter()
            .map(|source| {
                ParquetReader::new(std::io::Cursor::new(source.to_memslice()?)).num_rows()
            })
            .sum::<PolarsResult<usize>>()
    }
}
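
/// Fetch the row counts of remote Parquet files concurrently, with each
/// request held to a concurrency budget of one.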
#[cfg(all(feature = "parquet", feature = "async"))]
async fn count_rows_cloud_parquet(
    addrs: &[PlPath],
    cloud_options: Option<&CloudOptions>,
) -> PolarsResult<usize> {
    use polars_io::prelude::ParquetObjectStore;

    let collection = addrs.iter().map(|path| {
        with_concurrency_budget(1, || async {
            let mut reader =
                ParquetObjectStore::from_uri(path.to_str(), cloud_options, None).await?;
            reader.num_rows().await
        })
    });
    futures::future::try_join_all(collection)
        .await
        .map(|rows| rows.iter().sum())
}
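
/// Sum the row counts of all IPC sources, delegating cloud URLs to the async
/// reader and counting local or in-memory sources synchronously.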
#[cfg(feature = "ipc")]
pub(super) fn count_rows_ipc(
    sources: &ScanSources,
    #[cfg(feature = "cloud")] cloud_options: Option<&CloudOptions>,
    metadata: Option<&arrow::io::ipc::read::FileMetadata>,
) -> PolarsResult<usize> {
    if sources.is_empty() {
        return Ok(0);
    };
    let is_cloud = sources.is_cloud_url();

    if is_cloud {
        feature_gated!("cloud", {
            get_runtime().block_on(count_rows_cloud_ipc(
                sources.as_paths().unwrap(),
                cloud_options,
                metadata,
            ))
        })
    } else {
        sources
            .iter()
            .map(|source| {
                let memslice = source.to_memslice()?;
                count_rows_ipc_sync(&mut std::io::Cursor::new(memslice)).map(|v| v as usize)
            })
            .sum::<PolarsResult<usize>>()
    }
}
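
/// Fetch the row counts of remote IPC files concurrently, reusing the cached
/// file metadata when it is provided.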
#[cfg(all(feature = "ipc", feature = "async"))]
async fn count_rows_cloud_ipc(
    addrs: &[PlPath],
    cloud_options: Option<&CloudOptions>,
    metadata: Option<&arrow::io::ipc::read::FileMetadata>,
) -> PolarsResult<usize> {
    use polars_io::ipc::IpcReaderAsync;

    let collection = addrs.iter().map(|path| {
        with_concurrency_budget(1, || async {
            let reader = IpcReaderAsync::from_uri(path.to_str(), cloud_options).await?;
            reader.count_rows(metadata).await
        })
    });
    futures::future::try_join_all(collection)
        .await
        .map(|rows| rows.iter().map(|v| *v as usize).sum())
}
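
/// Sum the row counts of all NDJSON sources, decompressing each source if
/// needed and counting lines with `JsonLineReader`.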
#[cfg(feature = "json")]
pub(super) fn count_rows_ndjson(
    sources: &ScanSources,
    cloud_options: Option<&CloudOptions>,
) -> PolarsResult<usize> {
    use polars_core::config;
    use polars_io::utils::compression::maybe_decompress_bytes;

    if sources.is_empty() {
        return Ok(0);
    }

    let is_cloud_url = sources.is_cloud_url();
    let run_async = is_cloud_url || (sources.is_paths() && config::force_async());

    let cache_entries = {
        if run_async {
            feature_gated!("cloud", {
                Some(polars_io::file_cache::init_entries_from_uri_list(
                    sources
                        .as_paths()
                        .unwrap()
                        .iter()
                        .map(|path| Arc::from(path.to_str())),
                    cloud_options,
                )?)
            })
        } else {
            None
        }
    };

    sources
        .iter()
        .map(|source| {
            let memslice =
                source.to_memslice_possibly_async(run_async, cache_entries.as_ref(), 0)?;

            let owned = &mut vec![];
            let reader = polars_io::ndjson::core::JsonLineReader::new(std::io::Cursor::new(
                maybe_decompress_bytes(&memslice[..], owned)?,
            ));
            reader.count()
        })
        .sum()
}