Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/parquet/read/async_impl.rs
8448 views
1
//! Read parquet files in parallel from the Object Store without a third party crate.
2
3
use arrow::datatypes::ArrowSchemaRef;
4
use object_store::path::Path as ObjectPath;
5
use polars_core::prelude::*;
6
use polars_parquet::write::FileMetadata;
7
use polars_utils::pl_path::PlRefPath;
8
9
use crate::cloud::{
10
CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,
11
};
12
use crate::parquet::metadata::FileMetadataRef;
13
14
pub struct ParquetObjectStore {
15
store: PolarsObjectStore,
16
path: ObjectPath,
17
length: Option<usize>,
18
metadata: Option<FileMetadataRef>,
19
schema: Option<ArrowSchemaRef>,
20
}
21
22
impl ParquetObjectStore {
23
pub async fn from_uri(
24
uri: PlRefPath,
25
options: Option<&CloudOptions>,
26
metadata: Option<FileMetadataRef>,
27
) -> PolarsResult<Self> {
28
let (CloudLocation { prefix, .. }, store) = build_object_store(uri, options, false).await?;
29
let path = object_path_from_str(&prefix)?;
30
31
Ok(ParquetObjectStore {
32
store,
33
path,
34
length: None,
35
metadata,
36
schema: None,
37
})
38
}
39
40
/// Initialize the length property of the object, unless it has already been fetched.
41
async fn length(&mut self) -> PolarsResult<usize> {
42
if self.length.is_none() {
43
self.length = Some(self.store.head(&self.path).await?.size as usize);
44
}
45
Ok(self.length.unwrap())
46
}
47
48
/// Number of rows in the parquet file.
49
pub async fn num_rows(&mut self) -> PolarsResult<usize> {
50
let metadata = self.get_metadata().await?;
51
Ok(metadata.num_rows)
52
}
53
54
/// Fetch the metadata of the parquet file, do not memoize it.
55
async fn fetch_metadata(&mut self) -> PolarsResult<FileMetadata> {
56
let length = self.length().await?;
57
fetch_metadata(&self.store, &self.path, length).await
58
}
59
60
/// Fetch and memoize the metadata of the parquet file.
61
pub async fn get_metadata(&mut self) -> PolarsResult<&FileMetadataRef> {
62
if self.metadata.is_none() {
63
self.metadata = Some(Arc::new(self.fetch_metadata().await?));
64
}
65
Ok(self.metadata.as_ref().unwrap())
66
}
67
68
pub async fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
69
self.schema = Some(match self.schema.as_ref() {
70
Some(schema) => Arc::clone(schema),
71
None => {
72
let metadata = self.get_metadata().await?;
73
let arrow_schema = polars_parquet::arrow::read::infer_schema(metadata)?;
74
Arc::new(arrow_schema)
75
},
76
});
77
78
Ok(self.schema.clone().unwrap())
79
}
80
}
81
82
/// Pops the first `N` bytes off the front of `reader` as a fixed-size array.
///
/// On success `reader` is advanced past the returned bytes. When fewer than
/// `N` bytes remain, `None` is returned and `reader` is left untouched.
fn read_n<const N: usize>(reader: &mut &[u8]) -> Option<[u8; N]> {
    let prefix: [u8; N] = reader.get(..N)?.try_into().ok()?;
    *reader = &reader[N..];
    Some(prefix)
}
91
92
fn read_i32le(reader: &mut &[u8]) -> Option<i32> {
93
read_n(reader).map(i32::from_le_bytes)
94
}
95
96
/// Asynchronously reads the files' metadata
97
pub async fn fetch_metadata(
98
store: &PolarsObjectStore,
99
path: &ObjectPath,
100
file_byte_length: usize,
101
) -> PolarsResult<FileMetadata> {
102
let footer_header_bytes = store
103
.get_range(
104
path,
105
file_byte_length
106
.checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize)
107
.ok_or_else(|| {
108
polars_parquet::parquet::error::ParquetError::OutOfSpec(
109
"not enough bytes to contain parquet footer".to_string(),
110
)
111
})?..file_byte_length,
112
)
113
.await?;
114
115
let footer_byte_length: usize = {
116
let reader = &mut footer_header_bytes.as_ref();
117
let footer_byte_size = read_i32le(reader).unwrap();
118
let magic = read_n(reader).unwrap();
119
debug_assert!(reader.is_empty());
120
if magic != polars_parquet::parquet::PARQUET_MAGIC {
121
return Err(polars_parquet::parquet::error::ParquetError::OutOfSpec(
122
"incorrect magic in parquet footer".to_string(),
123
)
124
.into());
125
}
126
footer_byte_size.try_into().map_err(|_| {
127
polars_parquet::parquet::error::ParquetError::OutOfSpec(
128
"negative footer byte length".to_string(),
129
)
130
})?
131
};
132
133
let footer_bytes = store
134
.get_range(
135
path,
136
file_byte_length
137
.checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize + footer_byte_length)
138
.ok_or_else(|| {
139
polars_parquet::parquet::error::ParquetError::OutOfSpec(
140
"not enough bytes to contain parquet footer".to_string(),
141
)
142
})?..file_byte_length,
143
)
144
.await?;
145
146
Ok(polars_parquet::parquet::read::deserialize_metadata(
147
std::io::Cursor::new(footer_bytes.as_ref()),
148
// TODO: Describe why this makes sense. Taken from the previous
149
// implementation which said "a highly nested but sparse struct could
150
// result in many allocations".
151
footer_bytes.as_ref().len() * 2 + 1024,
152
)?)
153
}
154
155