Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/ipc/ipc_reader_async.rs
8424 views
1
use std::sync::Arc;
2
3
use arrow::io::ipc::read::{FileMetadata, OutOfSpecKind, get_row_count};
4
use object_store::ObjectMeta;
5
use object_store::path::Path;
6
use polars_core::datatypes::IDX_DTYPE;
7
use polars_core::frame::DataFrame;
8
use polars_core::schema::{Schema, SchemaExt};
9
use polars_error::{PolarsResult, polars_bail, polars_err, to_compute_err};
10
use polars_utils::mmap::MMapSemaphore;
11
use polars_utils::pl_path::PlRefPath;
12
use polars_utils::pl_str::PlSmallStr;
13
14
use crate::RowIndex;
15
use crate::cloud::{
16
CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,
17
};
18
use crate::file_cache::{FileCacheEntry, init_entries_from_uri_list};
19
use crate::predicates::PhysicalIoExpr;
20
use crate::prelude::{IpcReader, materialize_projection};
21
use crate::shared::SerReader;
22
23
/// An Arrow IPC reader implemented on top of PolarsObjectStore.
24
pub struct IpcReaderAsync {
25
store: PolarsObjectStore,
26
cache_entry: Arc<FileCacheEntry>,
27
path: Path,
28
}
29
30
#[derive(Default, Clone)]
31
pub struct IpcReadOptions {
32
// Names of the columns to include in the output.
33
projection: Option<Arc<[PlSmallStr]>>,
34
35
// The maximum number of rows to include in the output.
36
row_limit: Option<usize>,
37
38
// Include a column with the row number under the provided name starting at the provided index.
39
row_index: Option<RowIndex>,
40
41
// Only include rows that pass this predicate.
42
predicate: Option<Arc<dyn PhysicalIoExpr>>,
43
}
44
45
impl IpcReadOptions {
46
pub fn with_projection(mut self, projection: Option<Arc<[PlSmallStr]>>) -> Self {
47
self.projection = projection;
48
self
49
}
50
51
pub fn with_row_limit(mut self, row_limit: impl Into<Option<usize>>) -> Self {
52
self.row_limit = row_limit.into();
53
self
54
}
55
56
pub fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
57
self.row_index = row_index.into();
58
self
59
}
60
61
pub fn with_predicate(mut self, predicate: impl Into<Option<Arc<dyn PhysicalIoExpr>>>) -> Self {
62
self.predicate = predicate.into();
63
self
64
}
65
}
66
67
impl IpcReaderAsync {
68
pub async fn from_uri(
69
uri: PlRefPath,
70
cloud_options: Option<&CloudOptions>,
71
) -> PolarsResult<IpcReaderAsync> {
72
let cache_entry =
73
init_entries_from_uri_list([uri.clone()].into_iter(), cloud_options).await?[0].clone();
74
let (CloudLocation { prefix, .. }, store) =
75
build_object_store(uri, cloud_options, false).await?;
76
77
let path = object_path_from_str(&prefix)?;
78
79
Ok(Self {
80
store,
81
cache_entry,
82
path,
83
})
84
}
85
86
async fn object_metadata(&self) -> PolarsResult<ObjectMeta> {
87
self.store.head(&self.path).await
88
}
89
90
async fn file_size(&self) -> PolarsResult<usize> {
91
Ok(self.object_metadata().await?.size as usize)
92
}
93
94
pub async fn metadata(&self) -> PolarsResult<FileMetadata> {
95
let file_size = self.file_size().await?;
96
97
// TODO: Do a larger request and hope that the entire footer is contained within it to save one round-trip.
98
let footer_metadata =
99
self.store
100
.get_range(
101
&self.path,
102
file_size.checked_sub(FOOTER_METADATA_SIZE).ok_or_else(|| {
103
to_compute_err("ipc file size is smaller than the minimum")
104
})?..file_size,
105
)
106
.await?;
107
108
let footer_size = deserialize_footer_metadata(
109
footer_metadata
110
.as_ref()
111
.try_into()
112
.map_err(to_compute_err)?,
113
)?;
114
115
let footer = self
116
.store
117
.get_range(
118
&self.path,
119
file_size
120
.checked_sub(FOOTER_METADATA_SIZE + footer_size)
121
.ok_or_else(|| {
122
to_compute_err("invalid ipc footer metadata: footer size too large")
123
})?..file_size,
124
)
125
.await?;
126
127
arrow::io::ipc::read::deserialize_footer(
128
footer.as_ref(),
129
footer_size.try_into().map_err(to_compute_err)?,
130
)
131
}
132
133
pub async fn data(
134
&self,
135
metadata: Option<&FileMetadata>,
136
options: IpcReadOptions,
137
verbose: bool,
138
) -> PolarsResult<DataFrame> {
139
// TODO: Only download what is needed rather than the entire file by
140
// making use of the projection, row limit, predicate and such.
141
let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;
142
let bytes = MMapSemaphore::new_from_file(&file).unwrap();
143
144
let projection = match options.projection.as_deref() {
145
Some(projection) => {
146
fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> Schema {
147
if let Some(rc) = row_index {
148
let _ = schema.insert_at_index(0, rc.name.clone(), IDX_DTYPE);
149
}
150
schema
151
}
152
153
// Retrieve the metadata for the schema so we can map column names to indices.
154
let fetched_metadata;
155
let metadata = if let Some(metadata) = metadata {
156
metadata
157
} else {
158
// This branch is happens when _metadata is None, which can happen if we Deserialize the execution plan.
159
fetched_metadata = self.metadata().await?;
160
&fetched_metadata
161
};
162
163
let schema = prepare_schema(
164
Schema::from_arrow_schema(metadata.schema.as_ref()),
165
options.row_index.as_ref(),
166
);
167
168
let hive_partitions = None;
169
170
materialize_projection(
171
Some(projection),
172
&schema,
173
hive_partitions,
174
options.row_index.is_some(),
175
)
176
},
177
None => None,
178
};
179
180
let reader = <IpcReader<_> as SerReader<_>>::new(std::io::Cursor::new(bytes.as_ref()))
181
.with_row_index(options.row_index)
182
.with_n_rows(options.row_limit)
183
.with_projection(projection);
184
reader.finish_with_scan_ops(options.predicate, verbose)
185
}
186
187
pub async fn count_rows(&self, _metadata: Option<&FileMetadata>) -> PolarsResult<i64> {
188
// TODO: Only download what is needed rather than the entire file by
189
// making use of the projection, row limit, predicate and such.
190
let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;
191
let bytes = MMapSemaphore::new_from_file(&file).unwrap();
192
get_row_count(&mut std::io::Cursor::new(bytes.as_ref()))
193
}
194
}
195
196
const FOOTER_METADATA_SIZE: usize = 10;
197
198
// TODO: Move to polars-arrow and deduplicate parsing of footer metadata in
199
// sync and async readers.
200
fn deserialize_footer_metadata(bytes: [u8; FOOTER_METADATA_SIZE]) -> PolarsResult<usize> {
201
let footer_size: usize =
202
i32::from_le_bytes(bytes[0..4].try_into().unwrap_or_else(|_| unreachable!()))
203
.try_into()
204
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
205
206
if &bytes[4..] != b"ARROW1" {
207
polars_bail!(oos = OutOfSpecKind::InvalidFooter);
208
}
209
210
Ok(footer_size)
211
}
212
213