Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sources/ipc/metadata.rs
8439 views
1
use arrow::io::ipc::read::OutOfSpecKind;
2
use polars_buffer::Buffer;
3
use polars_error::{PolarsResult, polars_bail, polars_ensure, polars_err};
4
use polars_io::utils::byte_source::{ByteSource, DynByteSource};
5
6
/// Read the metadata bytes of a parquet file, does not decode the bytes. If during metadata fetch
7
/// the bytes of the entire file are loaded, it is returned in the second return value.
8
pub async fn read_ipc_metadata_bytes(
9
byte_source: &DynByteSource,
10
verbose: bool,
11
) -> PolarsResult<(Buffer<u8>, Option<Buffer<u8>>)> {
12
const FOOTER_HEADER_SIZE: usize = 10;
13
const ARROW_MAGIC_V1: [u8; 4] = [b'F', b'E', b'A', b'1'];
14
const ARROW_MAGIC_V2: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
15
16
let file_size = byte_source.get_size().await?;
17
18
polars_ensure!(
19
file_size >= FOOTER_HEADER_SIZE,
20
ComputeError: "ipc file size is smaller than the minimum"
21
);
22
23
let estimated_metadata_size = if let DynByteSource::Buffer(_) = byte_source {
24
// Mmapped or in-memory, reads are free.
25
file_size
26
} else {
27
(file_size / 2048).clamp(16_384, 131_072).min(file_size)
28
};
29
30
let bytes = byte_source
31
.get_range((file_size - estimated_metadata_size)..file_size)
32
.await?;
33
34
let footer_header_bytes = bytes.clone().sliced((bytes.len() - FOOTER_HEADER_SIZE)..);
35
36
if footer_header_bytes[4..] != ARROW_MAGIC_V2 {
37
if footer_header_bytes[..4] == ARROW_MAGIC_V1 {
38
polars_bail!(ComputeError: "feather v1 not supported");
39
}
40
return Err(polars_err!(oos = OutOfSpecKind::InvalidFooter));
41
}
42
43
let footer_size = u32::from_le_bytes(footer_header_bytes[..4].try_into().unwrap());
44
let footer_size = footer_size as usize + FOOTER_HEADER_SIZE;
45
46
polars_ensure!(
47
file_size >= footer_size,
48
ComputeError:
49
"file size ({file_size}) is less than the indicated footer size ({footer_size})",
50
);
51
52
if bytes.len() < footer_size {
53
debug_assert!(!matches!(byte_source, DynByteSource::Buffer(_)));
54
if verbose {
55
eprintln!(
56
"[IpcFileReader]: Extra {} bytes need to be fetched for metadata \
57
(initial estimate = {}, actual size = {})",
58
footer_size - estimated_metadata_size,
59
bytes.len(),
60
footer_size,
61
);
62
}
63
64
let mut out = Vec::with_capacity(footer_size);
65
let offset = file_size - footer_size;
66
let len = footer_size - bytes.len();
67
let delta_bytes = byte_source.get_range(offset..(offset + len)).await?;
68
69
debug_assert!(out.capacity() >= delta_bytes.len() + bytes.len());
70
71
out.extend_from_slice(&delta_bytes);
72
out.extend_from_slice(&bytes);
73
74
Ok((Buffer::from_vec(out), None))
75
} else {
76
if verbose && !matches!(byte_source, DynByteSource::Buffer(_)) {
77
eprintln!(
78
"[IpcFileReader]: Fetched all bytes for metadata on first try \
79
(initial estimate = {}, actual size = {}, excess = {}, total file size = {})",
80
bytes.len(),
81
footer_size,
82
estimated_metadata_size - footer_size,
83
file_size,
84
);
85
}
86
87
let metadata_bytes = bytes.clone().sliced((bytes.len() - footer_size)..);
88
89
if bytes.len() == file_size {
90
Ok((metadata_bytes, Some(bytes)))
91
} else {
92
debug_assert!(!matches!(byte_source, DynByteSource::Buffer(_)));
93
let metadata_bytes = if bytes.len() - footer_size >= bytes.len() {
94
// Re-allocate to drop the excess bytes
95
Buffer::from_vec(metadata_bytes.to_vec())
96
} else {
97
metadata_bytes
98
};
99
100
Ok((metadata_bytes, None))
101
}
102
}
103
}
104
105