Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/parquet/read/async_impl.rs
6940 views
1
//! Read parquet files in parallel from the Object Store without a third party crate.
2
3
use arrow::datatypes::ArrowSchemaRef;
4
use object_store::path::Path as ObjectPath;
5
use polars_core::prelude::*;
6
use polars_parquet::write::FileMetadata;
7
8
use crate::cloud::{
9
CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,
10
};
11
use crate::parquet::metadata::FileMetadataRef;
12
13
pub struct ParquetObjectStore {
14
store: PolarsObjectStore,
15
path: ObjectPath,
16
length: Option<usize>,
17
metadata: Option<FileMetadataRef>,
18
schema: Option<ArrowSchemaRef>,
19
}
20
21
impl ParquetObjectStore {
22
pub async fn from_uri(
23
uri: &str,
24
options: Option<&CloudOptions>,
25
metadata: Option<FileMetadataRef>,
26
) -> PolarsResult<Self> {
27
let (CloudLocation { prefix, .. }, store) = build_object_store(uri, options, false).await?;
28
let path = object_path_from_str(&prefix)?;
29
30
Ok(ParquetObjectStore {
31
store,
32
path,
33
length: None,
34
metadata,
35
schema: None,
36
})
37
}
38
39
/// Initialize the length property of the object, unless it has already been fetched.
40
async fn length(&mut self) -> PolarsResult<usize> {
41
if self.length.is_none() {
42
self.length = Some(self.store.head(&self.path).await?.size as usize);
43
}
44
Ok(self.length.unwrap())
45
}
46
47
/// Number of rows in the parquet file.
48
pub async fn num_rows(&mut self) -> PolarsResult<usize> {
49
let metadata = self.get_metadata().await?;
50
Ok(metadata.num_rows)
51
}
52
53
/// Fetch the metadata of the parquet file, do not memoize it.
54
async fn fetch_metadata(&mut self) -> PolarsResult<FileMetadata> {
55
let length = self.length().await?;
56
fetch_metadata(&self.store, &self.path, length).await
57
}
58
59
/// Fetch and memoize the metadata of the parquet file.
60
pub async fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
61
if self.metadata.is_none() {
62
self.metadata = Some(Arc::new(self.fetch_metadata().await?));
63
}
64
Ok(self.metadata.as_ref().unwrap())
65
}
66
67
pub async fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
68
self.schema = Some(match self.schema.as_ref() {
69
Some(schema) => Arc::clone(schema),
70
None => {
71
let metadata = self.get_metadata().await?;
72
let arrow_schema = polars_parquet::arrow::read::infer_schema(metadata)?;
73
Arc::new(arrow_schema)
74
},
75
});
76
77
Ok(self.schema.clone().unwrap())
78
}
79
}
80
81
/// Take the first `N` bytes off the front of `reader`.
///
/// On success the bytes are returned as a fixed-size array and `reader` is
/// advanced past them. If fewer than `N` bytes remain, `None` is returned and
/// `reader` is left untouched.
fn read_n<const N: usize>(reader: &mut &[u8]) -> Option<[u8; N]> {
    let remaining: &[u8] = *reader;
    // `get(..N)` is `None` exactly when fewer than `N` bytes are available.
    let chunk: [u8; N] = remaining.get(..N)?.try_into().ok()?;
    *reader = &remaining[N..];
    Some(chunk)
}
90
91
fn read_i32le(reader: &mut &[u8]) -> Option<i32> {
92
read_n(reader).map(i32::from_le_bytes)
93
}
94
95
/// Asynchronously reads the files' metadata
96
pub async fn fetch_metadata(
97
store: &PolarsObjectStore,
98
path: &ObjectPath,
99
file_byte_length: usize,
100
) -> PolarsResult<FileMetadata> {
101
let footer_header_bytes = store
102
.get_range(
103
path,
104
file_byte_length
105
.checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize)
106
.ok_or_else(|| {
107
polars_parquet::parquet::error::ParquetError::OutOfSpec(
108
"not enough bytes to contain parquet footer".to_string(),
109
)
110
})?..file_byte_length,
111
)
112
.await?;
113
114
let footer_byte_length: usize = {
115
let reader = &mut footer_header_bytes.as_ref();
116
let footer_byte_size = read_i32le(reader).unwrap();
117
let magic = read_n(reader).unwrap();
118
debug_assert!(reader.is_empty());
119
if magic != polars_parquet::parquet::PARQUET_MAGIC {
120
return Err(polars_parquet::parquet::error::ParquetError::OutOfSpec(
121
"incorrect magic in parquet footer".to_string(),
122
)
123
.into());
124
}
125
footer_byte_size.try_into().map_err(|_| {
126
polars_parquet::parquet::error::ParquetError::OutOfSpec(
127
"negative footer byte length".to_string(),
128
)
129
})?
130
};
131
132
let footer_bytes = store
133
.get_range(
134
path,
135
file_byte_length
136
.checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize + footer_byte_length)
137
.ok_or_else(|| {
138
polars_parquet::parquet::error::ParquetError::OutOfSpec(
139
"not enough bytes to contain parquet footer".to_string(),
140
)
141
})?..file_byte_length,
142
)
143
.await?;
144
145
Ok(polars_parquet::parquet::read::deserialize_metadata(
146
std::io::Cursor::new(footer_bytes.as_ref()),
147
// TODO: Describe why this makes sense. Taken from the previous
148
// implementation which said "a highly nested but sparse struct could
149
// result in many allocations".
150
footer_bytes.as_ref().len() * 2 + 1024,
151
)?)
152
}
153
154