Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/read/mod.rs
6940 views
1
//! APIs to read from Parquet format.
2
#![allow(clippy::type_complexity)]
3
4
mod deserialize;
5
pub mod expr;
6
pub mod schema;
7
pub mod statistics;
8
9
use std::io::{Read, Seek};
10
11
use arrow::types::{NativeType, i256};
12
pub use deserialize::{
13
Filter, InitNested, NestedState, PredicateFilter, column_iter_to_arrays, create_list,
14
create_map, get_page_iterator, init_nested, n_columns,
15
};
16
#[cfg(feature = "async")]
17
use futures::{AsyncRead, AsyncSeek};
18
use polars_error::PolarsResult;
19
pub use schema::{FileMetadata, infer_schema};
20
21
#[cfg(feature = "async")]
22
pub use crate::parquet::read::{get_page_stream, read_metadata_async as _read_metadata_async};
23
// re-exports of crate::parquet's relevant APIs
24
pub use crate::parquet::{
25
FallibleStreamingIterator,
26
error::ParquetError,
27
fallible_streaming_iterator,
28
metadata::{ColumnChunkMetadata, ColumnDescriptor, RowGroupMetadata},
29
page::{CompressedDataPage, DataPageHeader, Page},
30
read::{
31
BasicDecompressor, MutStreamingIterator, PageReader, ReadColumnIterator, State, decompress,
32
get_column_iterator, read_metadata as _read_metadata,
33
},
34
schema::types::{
35
GroupLogicalType, ParquetType, PhysicalType, PrimitiveConvertedType, PrimitiveLogicalType,
36
TimeUnit as ParquetTimeUnit,
37
},
38
types::int96_to_i64_ns,
39
};
40
41
/// Returns all [`ColumnChunkMetadata`] associated to `field_name`.
42
/// For non-nested parquet types, this returns a single column
43
pub fn get_field_pages<'a, T>(
44
columns: &'a [ColumnChunkMetadata],
45
items: &'a [T],
46
field_name: &str,
47
) -> Vec<&'a T> {
48
columns
49
.iter()
50
.zip(items)
51
.filter(|(metadata, _)| metadata.descriptor().path_in_schema[0].as_str() == field_name)
52
.map(|(_, item)| item)
53
.collect()
54
}
55
56
/// Reads parquets' metadata synchronously.
57
pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> PolarsResult<FileMetadata> {
58
Ok(_read_metadata(reader)?)
59
}
60
61
/// Asynchronously reads the metadata of a Parquet file from `reader`.
///
/// Only available when the `async` feature is enabled.
///
/// # Errors
///
/// Any error reported by the underlying Parquet metadata reader is converted
/// into the error type of [`PolarsResult`] and returned.
#[cfg(feature = "async")]
pub async fn read_metadata_async<R: AsyncRead + AsyncSeek + Send + Unpin>(
    reader: &mut R,
) -> PolarsResult<FileMetadata> {
    _read_metadata_async(reader).await.map_err(Into::into)
}
68
69
/// Decodes the first four bytes of `value` as a little-endian `i32`.
///
/// Any bytes after the fourth are ignored.
/// NOTE(review): presumably `value` is a 12-byte Parquet INTERVAL and this is
/// its month component — confirm against the caller.
///
/// # Panics
///
/// Panics if `value` holds fewer than four bytes.
fn convert_year_month(value: &[u8]) -> i32 {
    let raw: [u8; 4] = value[..4].try_into().unwrap();
    i32::from_le_bytes(raw)
}
72
73
fn convert_days_ms(value: &[u8]) -> arrow::types::days_ms {
74
arrow::types::days_ms(
75
i32::from_le_bytes(value[4..8].try_into().unwrap()),
76
i32::from_le_bytes(value[8..12].try_into().unwrap()),
77
)
78
}
79
80
/// Sign-extends an `n`-byte big-endian two's-complement integer to an `i128`.
///
/// `value` must be exactly `n` bytes long, with `1 <= n <= 16`.
///
/// # Panics
///
/// Panics if `n > 16` or if `value.len() != n`. With `n == 0` the shift amount
/// equals the full bit width, which panics when overflow checks are enabled.
fn convert_i128(value: &[u8], n: usize) -> i128 {
    // Place the input in the most significant bytes of a zeroed 16-byte buffer.
    let mut buf = [0u8; 16];
    buf[..n].copy_from_slice(value);

    // An arithmetic right shift moves the value down to the least significant
    // bytes while replicating the sign bit, so negative inputs get leading 1s.
    let unused_bits = 8 * (16 - n);
    let widened = i128::from_be_bytes(buf);
    widened >> unused_bits
}
89
90
fn convert_i256(value: &[u8]) -> i256 {
91
if value[0] >= 128 {
92
let mut neg_bytes = [255u8; 32];
93
neg_bytes[32 - value.len()..].copy_from_slice(value);
94
i256::from_be_bytes(neg_bytes)
95
} else {
96
let mut bytes = [0u8; 32];
97
bytes[32 - value.len()..].copy_from_slice(value);
98
i256::from_be_bytes(bytes)
99
}
100
}
101
102