Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/ipc/read/read_basic.rs
6940 views
1
use std::collections::VecDeque;
2
use std::io::{Read, Seek, SeekFrom};
3
4
use polars_error::{PolarsResult, polars_bail, polars_err};
5
6
use super::super::compression;
7
use super::super::endianness::is_native_little_endian;
8
use super::{Compression, IpcBuffer, Node, OutOfSpecKind};
9
use crate::bitmap::Bitmap;
10
use crate::buffer::Buffer;
11
use crate::types::NativeType;
12
13
fn read_swapped<T: NativeType, R: Read + Seek>(
14
reader: &mut R,
15
length: usize,
16
buffer: &mut Vec<T>,
17
is_little_endian: bool,
18
) -> PolarsResult<()> {
19
// Slow case where we must reverse bits.
20
#[expect(clippy::slow_vector_initialization)] // Avoid alloc_zeroed, leads to syscall.
21
let mut slice = Vec::new();
22
slice.resize(length * size_of::<T>(), 0);
23
reader.read_exact(&mut slice)?;
24
25
let chunks = slice.chunks_exact(size_of::<T>());
26
if !is_little_endian {
27
// machine is little endian, file is big endian
28
buffer
29
.as_mut_slice()
30
.iter_mut()
31
.zip(chunks)
32
.try_for_each(|(slot, chunk)| {
33
let a: T::Bytes = match chunk.try_into() {
34
Ok(a) => a,
35
Err(_) => unreachable!(),
36
};
37
*slot = T::from_be_bytes(a);
38
PolarsResult::Ok(())
39
})?;
40
} else {
41
// machine is big endian, file is little endian
42
polars_bail!(ComputeError:
43
"Reading little endian files from big endian machines",
44
)
45
}
46
Ok(())
47
}
48
49
fn read_uncompressed_bytes<R: Read + Seek>(
50
reader: &mut R,
51
buffer_length: usize,
52
is_little_endian: bool,
53
) -> PolarsResult<Vec<u8>> {
54
if is_native_little_endian() == is_little_endian {
55
let mut buffer = Vec::with_capacity(buffer_length);
56
let _ = reader
57
.take(buffer_length as u64)
58
.read_to_end(&mut buffer)
59
.unwrap();
60
Ok(buffer)
61
} else {
62
unreachable!()
63
}
64
}
65
66
/// Reads `length` values of `T` stored uncompressed at the reader's current position.
///
/// `buffer_length` is the buffer's on-disk size in bytes. If it cannot hold `length`
/// values, an out-of-spec `InvalidBuffer` error is returned. When the file's byte order
/// differs from the machine's, the values are decoded through `read_swapped`.
fn read_uncompressed_buffer<T: NativeType, R: Read + Seek>(
    reader: &mut R,
    buffer_length: usize,
    length: usize,
    is_little_endian: bool,
) -> PolarsResult<Vec<T>> {
    let type_name = std::any::type_name::<T>();
    let required_number_of_bytes = length.saturating_mul(size_of::<T>());
    if buffer_length < required_number_of_bytes {
        polars_bail!(
            oos = OutOfSpecKind::InvalidBuffer {
                length,
                type_name,
                required_number_of_bytes,
                buffer_length,
            }
        );
    }

    // Zero-initialise before reading: passing uninitialised memory to `read_exact` is
    // undefined behaviour (https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read,
    // https://github.com/MaikKlein/ash/issues/354#issue-781730580).
    let mut values = vec![T::default(); length];

    if is_native_little_endian() != is_little_endian {
        read_swapped(reader, length, &mut values, is_little_endian)?;
    } else {
        // Byte orders agree: copy the bytes straight into the value storage.
        reader.read_exact(bytemuck::cast_slice_mut(values.as_mut_slice()))?;
    }
    Ok(values)
}
97
98
fn read_compressed_buffer<T: NativeType, R: Read + Seek>(
99
reader: &mut R,
100
buffer_length: usize,
101
output_length: Option<usize>,
102
is_little_endian: bool,
103
compression: Compression,
104
scratch: &mut Vec<u8>,
105
) -> PolarsResult<Vec<T>> {
106
if output_length == Some(0) {
107
return Ok(vec![]);
108
}
109
110
if is_little_endian != is_native_little_endian() {
111
polars_bail!(ComputeError:
112
"Reading compressed and big endian IPC".to_string(),
113
)
114
}
115
116
// decompress first
117
scratch.clear();
118
scratch.try_reserve(buffer_length)?;
119
reader
120
.by_ref()
121
.take(buffer_length as u64)
122
.read_to_end(scratch)?;
123
124
let length = output_length
125
.unwrap_or_else(|| i64::from_le_bytes(scratch[..8].try_into().unwrap()) as usize);
126
127
// It is undefined behavior to call read_exact on un-initialized, https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read
128
// see also https://github.com/MaikKlein/ash/issues/354#issue-781730580
129
let mut buffer = vec![T::default(); length];
130
131
let out_slice = bytemuck::cast_slice_mut(&mut buffer);
132
133
let compression = compression
134
.codec()
135
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferCompression(err)))?;
136
137
match compression {
138
arrow_format::ipc::CompressionType::Lz4Frame => {
139
compression::decompress_lz4(&scratch[8..], out_slice)?;
140
},
141
arrow_format::ipc::CompressionType::Zstd => {
142
compression::decompress_zstd(&scratch[8..], out_slice)?;
143
},
144
}
145
Ok(buffer)
146
}
147
148
fn read_compressed_bytes<R: Read + Seek>(
149
reader: &mut R,
150
buffer_length: usize,
151
is_little_endian: bool,
152
compression: Compression,
153
scratch: &mut Vec<u8>,
154
) -> PolarsResult<Vec<u8>> {
155
read_compressed_buffer::<u8, _>(
156
reader,
157
buffer_length,
158
None,
159
is_little_endian,
160
compression,
161
scratch,
162
)
163
}
164
165
pub fn read_bytes<R: Read + Seek>(
166
buf: &mut VecDeque<IpcBuffer>,
167
reader: &mut R,
168
block_offset: u64,
169
is_little_endian: bool,
170
compression: Option<Compression>,
171
scratch: &mut Vec<u8>,
172
) -> PolarsResult<Buffer<u8>> {
173
let buf = buf
174
.pop_front()
175
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;
176
177
let offset: u64 = buf
178
.offset()
179
.try_into()
180
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
181
182
let buffer_length: usize = buf
183
.length()
184
.try_into()
185
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
186
187
reader.seek(SeekFrom::Start(block_offset + offset))?;
188
189
if let Some(compression) = compression {
190
Ok(read_compressed_bytes(
191
reader,
192
buffer_length,
193
is_little_endian,
194
compression,
195
scratch,
196
)?
197
.into())
198
} else {
199
Ok(read_uncompressed_bytes(reader, buffer_length, is_little_endian)?.into())
200
}
201
}
202
203
pub fn read_buffer<T: NativeType, R: Read + Seek>(
204
buf: &mut VecDeque<IpcBuffer>,
205
length: usize, // in slots
206
reader: &mut R,
207
block_offset: u64,
208
is_little_endian: bool,
209
compression: Option<Compression>,
210
scratch: &mut Vec<u8>,
211
) -> PolarsResult<Buffer<T>> {
212
let buf = buf
213
.pop_front()
214
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;
215
216
let offset: u64 = buf
217
.offset()
218
.try_into()
219
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
220
221
let buffer_length: usize = buf
222
.length()
223
.try_into()
224
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
225
226
reader.seek(SeekFrom::Start(block_offset + offset))?;
227
228
if let Some(compression) = compression {
229
Ok(read_compressed_buffer(
230
reader,
231
buffer_length,
232
Some(length),
233
is_little_endian,
234
compression,
235
scratch,
236
)?
237
.into())
238
} else {
239
Ok(read_uncompressed_buffer(reader, buffer_length, length, is_little_endian)?.into())
240
}
241
}
242
243
fn read_uncompressed_bitmap<R: Read + Seek>(
244
length: usize,
245
bytes: usize,
246
reader: &mut R,
247
) -> PolarsResult<Vec<u8>> {
248
if length > bytes * 8 {
249
polars_bail!(
250
oos = OutOfSpecKind::InvalidBitmap {
251
length,
252
number_of_bits: bytes * 8,
253
}
254
)
255
}
256
257
let mut buffer = vec![];
258
buffer.try_reserve(bytes)?;
259
reader
260
.by_ref()
261
.take(bytes as u64)
262
.read_to_end(&mut buffer)?;
263
264
Ok(buffer)
265
}
266
267
fn read_compressed_bitmap<R: Read + Seek>(
268
length: usize,
269
bytes: usize,
270
compression: Compression,
271
reader: &mut R,
272
scratch: &mut Vec<u8>,
273
) -> PolarsResult<Vec<u8>> {
274
#[expect(clippy::slow_vector_initialization)] // Avoid alloc_zeroed, leads to syscall.
275
let mut buffer = Vec::new();
276
buffer.resize(length.div_ceil(8), 0);
277
278
scratch.clear();
279
scratch.try_reserve(bytes)?;
280
reader.by_ref().take(bytes as u64).read_to_end(scratch)?;
281
282
let compression = compression
283
.codec()
284
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferCompression(err)))?;
285
286
match compression {
287
arrow_format::ipc::CompressionType::Lz4Frame => {
288
compression::decompress_lz4(&scratch[8..], &mut buffer)?;
289
},
290
arrow_format::ipc::CompressionType::Zstd => {
291
compression::decompress_zstd(&scratch[8..], &mut buffer)?;
292
},
293
}
294
Ok(buffer)
295
}
296
297
pub fn read_bitmap<R: Read + Seek>(
298
buf: &mut VecDeque<IpcBuffer>,
299
length: usize,
300
reader: &mut R,
301
block_offset: u64,
302
_: bool,
303
compression: Option<Compression>,
304
scratch: &mut Vec<u8>,
305
) -> PolarsResult<Bitmap> {
306
let buf = buf
307
.pop_front()
308
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;
309
310
let offset: u64 = buf
311
.offset()
312
.try_into()
313
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
314
315
let bytes: usize = buf
316
.length()
317
.try_into()
318
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
319
320
reader.seek(SeekFrom::Start(block_offset + offset))?;
321
322
let buffer = if let Some(compression) = compression {
323
read_compressed_bitmap(length, bytes, compression, reader, scratch)
324
} else {
325
read_uncompressed_bitmap(length, bytes, reader)
326
}?;
327
328
Bitmap::try_new(buffer, length)
329
}
330
331
#[allow(clippy::too_many_arguments)]
332
pub fn read_validity<R: Read + Seek>(
333
buffers: &mut VecDeque<IpcBuffer>,
334
field_node: Node,
335
reader: &mut R,
336
block_offset: u64,
337
is_little_endian: bool,
338
compression: Option<Compression>,
339
limit: Option<usize>,
340
scratch: &mut Vec<u8>,
341
) -> PolarsResult<Option<Bitmap>> {
342
let length: usize = field_node
343
.length()
344
.try_into()
345
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
346
let length = limit.map(|limit| limit.min(length)).unwrap_or(length);
347
348
Ok(if field_node.null_count() > 0 {
349
Some(read_bitmap(
350
buffers,
351
length,
352
reader,
353
block_offset,
354
is_little_endian,
355
compression,
356
scratch,
357
)?)
358
} else {
359
let _ = buffers
360
.pop_front()
361
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;
362
None
363
})
364
}
365
366