Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/ndjson/mod.rs
6939 views
1
use core::{get_file_chunks_json, json_lines};
2
use std::num::NonZeroUsize;
3
4
use arrow::array::StructArray;
5
use polars_core::POOL;
6
use polars_core::prelude::*;
7
use rayon::iter::{IntoParallelIterator, ParallelIterator};
8
9
pub(crate) mod buffer;
10
pub mod core;
11
12
pub fn infer_schema<R: std::io::BufRead>(
13
reader: &mut R,
14
infer_schema_len: Option<NonZeroUsize>,
15
) -> PolarsResult<Schema> {
16
let dtypes = polars_json::ndjson::iter_unique_dtypes(reader, infer_schema_len)?;
17
let dtype =
18
crate::json::infer::dtypes_to_supertype(dtypes.map(|dt| DataType::from_arrow_dtype(&dt)))?;
19
let schema = StructArray::get_fields(&dtype.to_arrow(CompatLevel::newest()))
20
.iter()
21
.map(Into::<Field>::into)
22
.collect();
23
Ok(schema)
24
}
25
26
/// Count the number of rows. The slice passed must represent the entire file. This will
27
/// potentially parallelize using rayon.
28
///
29
/// This does not check if the lines are valid NDJSON - it assumes that is the case.
30
pub fn count_rows_par(full_bytes: &[u8], n_threads: Option<usize>) -> usize {
31
let n_threads = n_threads.unwrap_or(POOL.current_num_threads());
32
let file_chunks = get_file_chunks_json(full_bytes, n_threads);
33
34
if file_chunks.len() == 1 {
35
count_rows(full_bytes)
36
} else {
37
let iter = file_chunks
38
.into_par_iter()
39
.map(|(start_pos, stop_at_nbytes)| count_rows(&full_bytes[start_pos..stop_at_nbytes]));
40
41
POOL.install(|| iter.sum())
42
}
43
}
44
45
/// Count the number of rows. The slice passed must represent the entire file.
46
/// This does not check if the lines are valid NDJSON - it assumes that is the case.
47
pub fn count_rows(full_bytes: &[u8]) -> usize {
48
json_lines(full_bytes).count()
49
}
50
51