Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/ipc/read/read_basic.rs
8424 views
1
use std::collections::VecDeque;
2
use std::io::{Read, Seek, SeekFrom};
3
4
use polars_buffer::Buffer;
5
use polars_error::{PolarsResult, polars_bail, polars_ensure, polars_err};
6
7
use super::super::compression;
8
use super::super::endianness::is_native_little_endian;
9
use super::{Compression, IpcBuffer, Node, OutOfSpecKind};
10
use crate::bitmap::Bitmap;
11
use crate::types::NativeType;
12
13
fn read_swapped<T: NativeType, R: Read + Seek>(
14
reader: &mut R,
15
length: usize,
16
buffer: &mut Vec<T>,
17
is_little_endian: bool,
18
) -> PolarsResult<()> {
19
// Slow case where we must reverse bits.
20
#[expect(clippy::slow_vector_initialization)] // Avoid alloc_zeroed, leads to syscall.
21
let mut slice = Vec::new();
22
slice.resize(length * size_of::<T>(), 0);
23
reader.read_exact(&mut slice)?;
24
25
let chunks = slice.chunks_exact(size_of::<T>());
26
if !is_little_endian {
27
// machine is little endian, file is big endian
28
buffer
29
.as_mut_slice()
30
.iter_mut()
31
.zip(chunks)
32
.try_for_each(|(slot, chunk)| {
33
let a: T::Bytes = match chunk.try_into() {
34
Ok(a) => a,
35
Err(_) => unreachable!(),
36
};
37
*slot = T::from_be_bytes(a);
38
PolarsResult::Ok(())
39
})?;
40
} else {
41
// machine is big endian, file is little endian
42
polars_bail!(ComputeError:
43
"Reading little endian files from big endian machines",
44
)
45
}
46
Ok(())
47
}
48
49
fn read_uncompressed_bytes<R: Read + Seek>(
50
reader: &mut R,
51
buffer_length: usize,
52
is_little_endian: bool,
53
) -> PolarsResult<Vec<u8>> {
54
if is_native_little_endian() == is_little_endian {
55
let mut buffer = Vec::with_capacity(buffer_length);
56
let _ = reader
57
.take(buffer_length as u64)
58
.read_to_end(&mut buffer)
59
.unwrap();
60
61
polars_ensure!(buffer.len() == buffer_length, ComputeError: "Malformed IPC file: expected compressed buffer of len {buffer_length}, got {}", buffer.len());
62
63
Ok(buffer)
64
} else {
65
unreachable!()
66
}
67
}
68
69
/// Reads `length` values of type `T` from an uncompressed IPC buffer of `buffer_length` bytes.
///
/// Returns an out-of-spec `InvalidBuffer` error if the buffer is too small to hold `length`
/// values. If the file's byte order matches the machine's, the bytes are copied directly into
/// the output. Otherwise they are converted via [`read_swapped`].
fn read_uncompressed_buffer<T: NativeType, R: Read + Seek>(
    reader: &mut R,
    buffer_length: usize,
    length: usize,
    is_little_endian: bool,
) -> PolarsResult<Vec<T>> {
    // Saturate so an absurd `length` from a malformed file is rejected rather than wrapping.
    let required_number_of_bytes = length.saturating_mul(size_of::<T>());
    let fits = required_number_of_bytes <= buffer_length;
    if !fits {
        polars_bail!(
            oos = OutOfSpecKind::InvalidBuffer {
                length,
                type_name: std::any::type_name::<T>(),
                required_number_of_bytes,
                buffer_length,
            }
        );
    }

    // The output is default-initialized up front, because it is undefined behavior to call
    // read_exact on un-initialized, https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read
    // see also https://github.com/MaikKlein/ash/issues/354#issue-781730580
    let mut values = vec![T::default(); length];

    let same_byte_order = is_native_little_endian() == is_little_endian;
    if same_byte_order {
        // Fast path: view the output as raw bytes and fill it in one read.
        let bytes: &mut [u8] = bytemuck::cast_slice_mut(&mut values);
        reader.read_exact(bytes)?;
    } else {
        read_swapped(reader, length, &mut values, is_little_endian)?;
    }
    Ok(values)
}
100
101
fn read_compressed_buffer<T: NativeType, R: Read + Seek>(
102
reader: &mut R,
103
buffer_length: usize,
104
// Upper bound for the number of rows to be returned.
105
row_limit: Option<usize>,
106
is_little_endian: bool,
107
compression: Compression,
108
scratch: &mut Vec<u8>,
109
) -> PolarsResult<Vec<T>> {
110
if row_limit == Some(0) {
111
return Ok(vec![]);
112
}
113
114
if is_little_endian != is_native_little_endian() {
115
polars_bail!(ComputeError:
116
"Reading compressed and big endian IPC",
117
)
118
}
119
120
// Decompress first.
121
scratch.clear();
122
scratch.try_reserve(buffer_length)?;
123
reader
124
.by_ref()
125
.take(buffer_length as u64)
126
.read_to_end(scratch)?;
127
128
polars_ensure!(scratch.len() == buffer_length, ComputeError: "Malformed IPC file: expected compressed buffer of len {buffer_length}, got {}", scratch.len());
129
130
let decompressed_len_field = i64::from_le_bytes(scratch[..8].try_into().unwrap());
131
let decompressed_bytes: usize = if decompressed_len_field == -1 {
132
buffer_length - 8
133
} else {
134
decompressed_len_field.try_into().map_err(|_| {
135
polars_err!(ComputeError: "Malformed IPC file: got invalid decompressed length {decompressed_len_field}")
136
})?
137
};
138
139
polars_ensure!(decompressed_bytes.is_multiple_of(size_of::<T>()),
140
ComputeError: "Malformed IPC file: got decompressed buffer length which is not a multiple of the data type");
141
let n_rows_in_array = decompressed_bytes / size_of::<T>();
142
143
if decompressed_len_field == -1 {
144
return Ok(bytemuck::cast_slice(&scratch[8..]).to_vec());
145
}
146
147
// It is undefined behavior to call read_exact on un-initialized, https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read
148
// see also https://github.com/MaikKlein/ash/issues/354#issue-781730580
149
150
let n_rows_exact = row_limit
151
.map(|limit| std::cmp::min(limit, n_rows_in_array))
152
.unwrap_or(n_rows_in_array);
153
154
let mut buffer = vec![T::default(); n_rows_exact];
155
let out_slice = bytemuck::cast_slice_mut(&mut buffer);
156
157
let compression = compression
158
.codec()
159
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferCompression(err)))?;
160
161
match compression {
162
arrow_format::ipc::CompressionType::Lz4Frame => {
163
compression::decompress_lz4(&scratch[8..], out_slice)?;
164
},
165
arrow_format::ipc::CompressionType::Zstd => {
166
compression::decompress_zstd(&scratch[8..], out_slice)?;
167
},
168
}
169
Ok(buffer)
170
}
171
172
fn read_compressed_bytes<R: Read + Seek>(
173
reader: &mut R,
174
buffer_length: usize,
175
is_little_endian: bool,
176
compression: Compression,
177
scratch: &mut Vec<u8>,
178
) -> PolarsResult<Vec<u8>> {
179
read_compressed_buffer::<u8, _>(
180
reader,
181
buffer_length,
182
None,
183
is_little_endian,
184
compression,
185
scratch,
186
)
187
}
188
189
pub fn read_bytes<R: Read + Seek>(
190
buf: &mut VecDeque<IpcBuffer>,
191
reader: &mut R,
192
block_offset: u64,
193
is_little_endian: bool,
194
compression: Option<Compression>,
195
scratch: &mut Vec<u8>,
196
) -> PolarsResult<Buffer<u8>> {
197
let buf = buf
198
.pop_front()
199
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;
200
201
let offset: u64 = buf
202
.offset()
203
.try_into()
204
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
205
206
let buffer_length: usize = buf
207
.length()
208
.try_into()
209
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
210
211
reader.seek(SeekFrom::Start(block_offset + offset))?;
212
213
if let Some(compression) = compression {
214
Ok(read_compressed_bytes(
215
reader,
216
buffer_length,
217
is_little_endian,
218
compression,
219
scratch,
220
)?
221
.into())
222
} else {
223
Ok(read_uncompressed_bytes(reader, buffer_length, is_little_endian)?.into())
224
}
225
}
226
227
pub fn read_buffer<T: NativeType, R: Read + Seek>(
228
buf: &mut VecDeque<IpcBuffer>,
229
length: usize, // in slots
230
reader: &mut R,
231
block_offset: u64,
232
is_little_endian: bool,
233
compression: Option<Compression>,
234
scratch: &mut Vec<u8>,
235
) -> PolarsResult<Buffer<T>> {
236
let buf = buf
237
.pop_front()
238
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;
239
240
let offset: u64 = buf
241
.offset()
242
.try_into()
243
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
244
245
let buffer_length: usize = buf
246
.length()
247
.try_into()
248
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
249
250
reader.seek(SeekFrom::Start(block_offset + offset))?;
251
252
if let Some(compression) = compression {
253
Ok(read_compressed_buffer(
254
reader,
255
buffer_length,
256
Some(length),
257
is_little_endian,
258
compression,
259
scratch,
260
)?
261
.into())
262
} else {
263
Ok(read_uncompressed_buffer(reader, buffer_length, length, is_little_endian)?.into())
264
}
265
}
266
267
fn read_uncompressed_bitmap<R: Read + Seek>(
268
row_limit: usize,
269
bytes: usize,
270
reader: &mut R,
271
) -> PolarsResult<Vec<u8>> {
272
if row_limit > bytes * 8 {
273
polars_bail!(
274
oos = OutOfSpecKind::InvalidBitmap {
275
length: row_limit,
276
number_of_bits: bytes * 8,
277
}
278
)
279
}
280
281
let mut buffer = vec![];
282
buffer.try_reserve(bytes)?;
283
reader
284
.by_ref()
285
.take(bytes as u64)
286
.read_to_end(&mut buffer)?;
287
288
polars_ensure!(buffer.len() == bytes, ComputeError: "Malformed IPC file: expected compressed buffer of len {bytes}, got {}", buffer.len());
289
290
Ok(buffer)
291
}
292
293
fn read_compressed_bitmap<R: Read + Seek>(
294
row_limit: usize,
295
bytes: usize,
296
compression: Compression,
297
reader: &mut R,
298
scratch: &mut Vec<u8>,
299
) -> PolarsResult<Vec<u8>> {
300
scratch.clear();
301
scratch.try_reserve(bytes)?;
302
reader.by_ref().take(bytes as u64).read_to_end(scratch)?;
303
if scratch.len() != bytes {
304
polars_bail!(ComputeError: "Malformed IPC file: expected compressed buffer of len {bytes}, got {}", scratch.len());
305
}
306
307
let decompressed_len_field = i64::from_le_bytes(scratch[..8].try_into().unwrap());
308
let decompressed_bytes: usize = if decompressed_len_field == -1 {
309
scratch.len() - 8
310
} else {
311
decompressed_len_field.try_into().map_err(|_| {
312
polars_err!(ComputeError: "Malformed IPC file: got invalid decompressed length {decompressed_len_field}")
313
})?
314
};
315
316
// In addition to the slicing use case, we allow for excess bytes in untruncated buffers,
317
// see https://github.com/pola-rs/polars/issues/26126
318
// and https://github.com/apache/arrow/issues/48883
319
polars_ensure!(decompressed_bytes >= row_limit.div_ceil(8),
320
ComputeError: "Malformed IPC file: got unexpected decompressed output length {decompressed_bytes}, expected {}", row_limit.div_ceil(8));
321
322
if decompressed_len_field == -1 {
323
return Ok(bytemuck::cast_slice(&scratch[8..]).to_vec());
324
}
325
326
#[expect(clippy::slow_vector_initialization)] // Avoid alloc_zeroed, leads to syscall.
327
let mut buffer = Vec::new();
328
buffer.resize(decompressed_bytes, 0);
329
330
let compression = compression
331
.codec()
332
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferCompression(err)))?;
333
334
match compression {
335
arrow_format::ipc::CompressionType::Lz4Frame => {
336
compression::decompress_lz4(&scratch[8..], &mut buffer)?;
337
},
338
arrow_format::ipc::CompressionType::Zstd => {
339
compression::decompress_zstd(&scratch[8..], &mut buffer)?;
340
},
341
}
342
Ok(buffer)
343
}
344
345
pub fn read_bitmap<R: Read + Seek>(
346
buf: &mut VecDeque<IpcBuffer>,
347
row_limit: usize,
348
reader: &mut R,
349
block_offset: u64,
350
_: bool,
351
compression: Option<Compression>,
352
scratch: &mut Vec<u8>,
353
) -> PolarsResult<Bitmap> {
354
let buf = buf
355
.pop_front()
356
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;
357
358
let offset: u64 = buf
359
.offset()
360
.try_into()
361
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
362
363
let bytes: usize = buf
364
.length()
365
.try_into()
366
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
367
368
reader.seek(SeekFrom::Start(block_offset + offset))?;
369
370
let buffer = if let Some(compression) = compression {
371
read_compressed_bitmap(row_limit, bytes, compression, reader, scratch)
372
} else {
373
read_uncompressed_bitmap(row_limit, bytes, reader)
374
}?;
375
376
Bitmap::try_new(buffer, row_limit)
377
}
378
379
#[allow(clippy::too_many_arguments)]
380
pub fn read_validity<R: Read + Seek>(
381
buffers: &mut VecDeque<IpcBuffer>,
382
field_node: Node,
383
reader: &mut R,
384
block_offset: u64,
385
is_little_endian: bool,
386
compression: Option<Compression>,
387
limit: Option<usize>,
388
scratch: &mut Vec<u8>,
389
) -> PolarsResult<Option<Bitmap>> {
390
let length: usize = field_node
391
.length()
392
.try_into()
393
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
394
let row_limit = limit.map(|limit| limit.min(length)).unwrap_or(length);
395
396
Ok(if field_node.null_count() > 0 {
397
Some(read_bitmap(
398
buffers,
399
row_limit,
400
reader,
401
block_offset,
402
is_little_endian,
403
compression,
404
scratch,
405
)?)
406
} else {
407
let _ = buffers
408
.pop_front()
409
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::ExpectedBuffer))?;
410
None
411
})
412
}
413
414