Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/ipc/ipc_reader_async.rs
6939 views
1
use std::sync::Arc;
2
3
use arrow::io::ipc::read::{FileMetadata, OutOfSpecKind, get_row_count};
4
use object_store::ObjectMeta;
5
use object_store::path::Path;
6
use polars_core::datatypes::IDX_DTYPE;
7
use polars_core::frame::DataFrame;
8
use polars_core::schema::{Schema, SchemaExt};
9
use polars_error::{PolarsResult, polars_bail, polars_err, to_compute_err};
10
use polars_utils::mmap::MMapSemaphore;
11
use polars_utils::pl_str::PlSmallStr;
12
13
use crate::RowIndex;
14
use crate::cloud::{
15
CloudLocation, CloudOptions, PolarsObjectStore, build_object_store, object_path_from_str,
16
};
17
use crate::file_cache::{FileCacheEntry, init_entries_from_uri_list};
18
use crate::predicates::PhysicalIoExpr;
19
use crate::prelude::{IpcReader, materialize_projection};
20
use crate::shared::SerReader;
21
22
/// An Arrow IPC reader implemented on top of PolarsObjectStore.
23
pub struct IpcReaderAsync {
24
store: PolarsObjectStore,
25
cache_entry: Arc<FileCacheEntry>,
26
path: Path,
27
}
28
29
#[derive(Default, Clone)]
30
pub struct IpcReadOptions {
31
// Names of the columns to include in the output.
32
projection: Option<Arc<[PlSmallStr]>>,
33
34
// The maximum number of rows to include in the output.
35
row_limit: Option<usize>,
36
37
// Include a column with the row number under the provided name starting at the provided index.
38
row_index: Option<RowIndex>,
39
40
// Only include rows that pass this predicate.
41
predicate: Option<Arc<dyn PhysicalIoExpr>>,
42
}
43
44
impl IpcReadOptions {
45
pub fn with_projection(mut self, projection: Option<Arc<[PlSmallStr]>>) -> Self {
46
self.projection = projection;
47
self
48
}
49
50
pub fn with_row_limit(mut self, row_limit: impl Into<Option<usize>>) -> Self {
51
self.row_limit = row_limit.into();
52
self
53
}
54
55
pub fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
56
self.row_index = row_index.into();
57
self
58
}
59
60
pub fn with_predicate(mut self, predicate: impl Into<Option<Arc<dyn PhysicalIoExpr>>>) -> Self {
61
self.predicate = predicate.into();
62
self
63
}
64
}
65
66
impl IpcReaderAsync {
67
pub async fn from_uri(
68
uri: &str,
69
cloud_options: Option<&CloudOptions>,
70
) -> PolarsResult<IpcReaderAsync> {
71
let cache_entry =
72
init_entries_from_uri_list([Arc::from(uri)].into_iter(), cloud_options)?[0].clone();
73
let (CloudLocation { prefix, .. }, store) =
74
build_object_store(uri, cloud_options, false).await?;
75
76
let path = object_path_from_str(&prefix)?;
77
78
Ok(Self {
79
store,
80
cache_entry,
81
path,
82
})
83
}
84
85
async fn object_metadata(&self) -> PolarsResult<ObjectMeta> {
86
self.store.head(&self.path).await
87
}
88
89
async fn file_size(&self) -> PolarsResult<usize> {
90
Ok(self.object_metadata().await?.size as usize)
91
}
92
93
pub async fn metadata(&self) -> PolarsResult<FileMetadata> {
94
let file_size = self.file_size().await?;
95
96
// TODO: Do a larger request and hope that the entire footer is contained within it to save one round-trip.
97
let footer_metadata =
98
self.store
99
.get_range(
100
&self.path,
101
file_size.checked_sub(FOOTER_METADATA_SIZE).ok_or_else(|| {
102
to_compute_err("ipc file size is smaller than the minimum")
103
})?..file_size,
104
)
105
.await?;
106
107
let footer_size = deserialize_footer_metadata(
108
footer_metadata
109
.as_ref()
110
.try_into()
111
.map_err(to_compute_err)?,
112
)?;
113
114
let footer = self
115
.store
116
.get_range(
117
&self.path,
118
file_size
119
.checked_sub(FOOTER_METADATA_SIZE + footer_size)
120
.ok_or_else(|| {
121
to_compute_err("invalid ipc footer metadata: footer size too large")
122
})?..file_size,
123
)
124
.await?;
125
126
arrow::io::ipc::read::deserialize_footer(
127
footer.as_ref(),
128
footer_size.try_into().map_err(to_compute_err)?,
129
)
130
}
131
132
pub async fn data(
133
&self,
134
metadata: Option<&FileMetadata>,
135
options: IpcReadOptions,
136
verbose: bool,
137
) -> PolarsResult<DataFrame> {
138
// TODO: Only download what is needed rather than the entire file by
139
// making use of the projection, row limit, predicate and such.
140
let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;
141
let bytes = MMapSemaphore::new_from_file(&file).unwrap();
142
143
let projection = match options.projection.as_deref() {
144
Some(projection) => {
145
fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> Schema {
146
if let Some(rc) = row_index {
147
let _ = schema.insert_at_index(0, rc.name.clone(), IDX_DTYPE);
148
}
149
schema
150
}
151
152
// Retrieve the metadata for the schema so we can map column names to indices.
153
let fetched_metadata;
154
let metadata = if let Some(metadata) = metadata {
155
metadata
156
} else {
157
// This branch is happens when _metadata is None, which can happen if we Deserialize the execution plan.
158
fetched_metadata = self.metadata().await?;
159
&fetched_metadata
160
};
161
162
let schema = prepare_schema(
163
Schema::from_arrow_schema(metadata.schema.as_ref()),
164
options.row_index.as_ref(),
165
);
166
167
let hive_partitions = None;
168
169
materialize_projection(
170
Some(projection),
171
&schema,
172
hive_partitions,
173
options.row_index.is_some(),
174
)
175
},
176
None => None,
177
};
178
179
let reader = <IpcReader<_> as SerReader<_>>::new(std::io::Cursor::new(bytes.as_ref()))
180
.with_row_index(options.row_index)
181
.with_n_rows(options.row_limit)
182
.with_projection(projection);
183
reader.finish_with_scan_ops(options.predicate, verbose)
184
}
185
186
pub async fn count_rows(&self, _metadata: Option<&FileMetadata>) -> PolarsResult<i64> {
187
// TODO: Only download what is needed rather than the entire file by
188
// making use of the projection, row limit, predicate and such.
189
let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;
190
let bytes = MMapSemaphore::new_from_file(&file).unwrap();
191
get_row_count(&mut std::io::Cursor::new(bytes.as_ref()))
192
}
193
}
194
195
/// Size in bytes of the footer metadata at the very end of an IPC file: a 4-byte
/// little-endian footer length followed by the 6-byte `ARROW1` magic.
const FOOTER_METADATA_SIZE: usize = 10;
196
197
// TODO: Move to polars-arrow and deduplicate parsing of footer metadata in
198
// sync and async readers.
199
fn deserialize_footer_metadata(bytes: [u8; FOOTER_METADATA_SIZE]) -> PolarsResult<usize> {
200
let footer_size: usize =
201
i32::from_le_bytes(bytes[0..4].try_into().unwrap_or_else(|_| unreachable!()))
202
.try_into()
203
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
204
205
if &bytes[4..] != b"ARROW1" {
206
polars_bail!(oos = OutOfSpecKind::InvalidFooter);
207
}
208
209
Ok(footer_size)
210
}
211
212