Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/mmap/mod.rs
6939 views
1
#![allow(unsafe_op_in_unsafe_fn)]
2
//! Memory maps regions defined on the IPC format into [`Array`].
3
use std::collections::VecDeque;
4
use std::sync::Arc;
5
6
mod array;
7
8
use arrow_format::ipc::planus::ReadAsRoot;
9
use arrow_format::ipc::{Block, DictionaryBatchRef, MessageRef, RecordBatchRef};
10
use polars_error::{PolarsResult, polars_bail, polars_err, to_compute_err};
11
use polars_utils::pl_str::PlSmallStr;
12
13
use crate::array::Array;
14
use crate::datatypes::{ArrowDataType, ArrowSchema, Field};
15
use crate::io::ipc::read::file::{get_dictionary_batch, get_record_batch};
16
use crate::io::ipc::read::{
17
Dictionaries, FileMetadata, IpcBuffer, Node, OutOfSpecKind, first_dict_field,
18
};
19
use crate::io::ipc::{CONTINUATION_MARKER, IpcField};
20
use crate::record_batch::RecordBatchT;
21
22
fn read_message(
23
mut bytes: &[u8],
24
block: arrow_format::ipc::Block,
25
) -> PolarsResult<(MessageRef<'_>, usize)> {
26
let offset: usize = block.offset.try_into().map_err(
27
|_err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength),
28
)?;
29
30
let block_length: usize = block.meta_data_length.try_into().map_err(
31
|_err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength),
32
)?;
33
34
bytes = &bytes[offset..];
35
let mut message_length = bytes[..4].try_into().unwrap();
36
bytes = &bytes[4..];
37
38
if message_length == CONTINUATION_MARKER {
39
// continuation marker encountered, read message next
40
message_length = bytes[..4].try_into().unwrap();
41
bytes = &bytes[4..];
42
};
43
44
let message_length: usize = i32::from_le_bytes(message_length).try_into().map_err(
45
|_err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength),
46
)?;
47
48
let message = arrow_format::ipc::MessageRef::read_as_root(&bytes[..message_length])
49
.map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferMessage(err)))?;
50
51
Ok((message, offset + block_length))
52
}
53
54
fn get_buffers_nodes(batch: RecordBatchRef) -> PolarsResult<(VecDeque<IpcBuffer>, VecDeque<Node>)> {
55
let compression = batch.compression().map_err(to_compute_err)?;
56
if compression.is_some() {
57
polars_bail!(ComputeError: "memory_map can only be done on uncompressed IPC files")
58
}
59
60
let buffers = batch
61
.buffers()
62
.map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferBuffers(err)))?
63
.ok_or_else(|| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::MissingMessageBuffers))?;
64
let buffers = buffers.iter().collect::<VecDeque<_>>();
65
66
let field_nodes = batch
67
.nodes()
68
.map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferNodes(err)))?
69
.ok_or_else(|| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::MissingMessageNodes))?;
70
let field_nodes = field_nodes.iter().collect::<VecDeque<_>>();
71
72
Ok((buffers, field_nodes))
73
}
74
75
pub(crate) unsafe fn mmap_record<T: AsRef<[u8]>>(
76
fields: &ArrowSchema,
77
ipc_fields: &[IpcField],
78
data: Arc<T>,
79
batch: RecordBatchRef,
80
offset: usize,
81
dictionaries: &Dictionaries,
82
) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
83
let (mut buffers, mut field_nodes) = get_buffers_nodes(batch)?;
84
let mut variadic_buffer_counts = batch
85
.variadic_buffer_counts()
86
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?
87
.map(|v| v.iter().map(|v| v as usize).collect::<VecDeque<usize>>())
88
.unwrap_or_else(VecDeque::new);
89
90
let length = batch
91
.length()
92
.map_err(|_| polars_err!(oos = OutOfSpecKind::MissingData))
93
.unwrap()
94
.try_into()
95
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
96
97
fields
98
.iter_values()
99
.map(|f| &f.dtype)
100
.cloned()
101
.zip(ipc_fields)
102
.map(|(dtype, ipc_field)| {
103
array::mmap(
104
data.clone(),
105
offset,
106
dtype,
107
ipc_field,
108
dictionaries,
109
&mut field_nodes,
110
&mut variadic_buffer_counts,
111
&mut buffers,
112
)
113
})
114
.collect::<PolarsResult<_>>()
115
.and_then(|arr| {
116
RecordBatchT::try_new(
117
length,
118
Arc::new(fields.iter_values().cloned().collect()),
119
arr,
120
)
121
})
122
}
123
124
/// Memory maps an record batch from an IPC file into a [`RecordBatchT`].
125
/// # Errors
126
/// This function errors when:
127
/// * The IPC file is not valid
128
/// * the buffers on the file are un-aligned with their corresponding data. This can happen when:
129
/// * the file was written with 8-bit alignment
130
/// * the file contains type decimal 128 or 256
131
/// # Safety
132
/// The caller must ensure that `data` contains a valid buffers, for example:
133
/// * Offsets in variable-sized containers must be in-bounds and increasing
134
/// * Utf8 data is valid
135
pub unsafe fn mmap_unchecked<T: AsRef<[u8]>>(
136
metadata: &FileMetadata,
137
dictionaries: &Dictionaries,
138
data: Arc<T>,
139
chunk: usize,
140
) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
141
let block = metadata.blocks[chunk];
142
143
let (message, offset) = read_message(data.as_ref().as_ref(), block)?;
144
let batch = get_record_batch(message)?;
145
mmap_record(
146
&metadata.schema,
147
&metadata.ipc_schema.fields,
148
data.clone(),
149
batch,
150
offset,
151
dictionaries,
152
)
153
}
154
155
unsafe fn mmap_dictionary<T: AsRef<[u8]>>(
156
schema: &ArrowSchema,
157
ipc_fields: &[IpcField],
158
data: Arc<T>,
159
block: Block,
160
dictionaries: &mut Dictionaries,
161
) -> PolarsResult<()> {
162
let (message, offset) = read_message(data.as_ref().as_ref(), block)?;
163
let batch = get_dictionary_batch(&message)?;
164
mmap_dictionary_from_batch(schema, ipc_fields, &data, batch, dictionaries, offset)
165
}
166
167
pub(crate) unsafe fn mmap_dictionary_from_batch<T: AsRef<[u8]>>(
168
schema: &ArrowSchema,
169
ipc_fields: &[IpcField],
170
data: &Arc<T>,
171
batch: DictionaryBatchRef,
172
dictionaries: &mut Dictionaries,
173
offset: usize,
174
) -> PolarsResult<()> {
175
let id = batch
176
.id()
177
.map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferId(err)))?;
178
let (first_field, first_ipc_field) = first_dict_field(id, schema, ipc_fields)?;
179
180
let batch = batch
181
.data()
182
.map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferData(err)))?
183
.ok_or_else(|| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::MissingData))?;
184
185
let value_type = if let ArrowDataType::Dictionary(_, value_type, _) =
186
first_field.dtype.to_logical_type()
187
{
188
value_type.as_ref()
189
} else {
190
polars_bail!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidIdDataType {requested_id: id} )
191
};
192
193
// Make a fake schema for the dictionary batch.
194
let field = Field::new(PlSmallStr::EMPTY, value_type.clone(), false);
195
196
let chunk = mmap_record(
197
&std::iter::once((field.name.clone(), field)).collect(),
198
std::slice::from_ref(first_ipc_field),
199
data.clone(),
200
batch,
201
offset,
202
dictionaries,
203
)?;
204
205
dictionaries.insert(id, chunk.into_arrays().pop().unwrap());
206
207
Ok(())
208
}
209
210
/// Memory maps dictionaries from an IPC file into
211
/// # Safety
212
/// The caller must ensure that `data` contains a valid buffers, for example:
213
/// * Offsets in variable-sized containers must be in-bounds and increasing
214
/// * Utf8 data is valid
215
pub unsafe fn mmap_dictionaries_unchecked<T: AsRef<[u8]>>(
216
metadata: &FileMetadata,
217
data: Arc<T>,
218
) -> PolarsResult<Dictionaries> {
219
mmap_dictionaries_unchecked2(
220
metadata.schema.as_ref(),
221
&metadata.ipc_schema.fields,
222
metadata.dictionaries.as_ref(),
223
data,
224
)
225
}
226
227
pub(crate) unsafe fn mmap_dictionaries_unchecked2<T: AsRef<[u8]>>(
228
schema: &ArrowSchema,
229
ipc_fields: &[IpcField],
230
dictionaries: Option<&Vec<arrow_format::ipc::Block>>,
231
data: Arc<T>,
232
) -> PolarsResult<Dictionaries> {
233
let blocks = if let Some(blocks) = &dictionaries {
234
blocks
235
} else {
236
return Ok(Default::default());
237
};
238
239
let mut dictionaries = Default::default();
240
241
blocks.iter().cloned().try_for_each(|block| {
242
mmap_dictionary(schema, ipc_fields, data.clone(), block, &mut dictionaries)
243
})?;
244
Ok(dictionaries)
245
}
246
247