Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/ipc/write/common_sync.rs
6940 views
1
use std::io::Write;
2
3
use polars_error::PolarsResult;
4
5
use super::super::CONTINUATION_MARKER;
6
use super::common::{EncodedData, pad_to_64};
7
8
/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
9
pub fn write_message<W: Write>(
10
writer: &mut W,
11
encoded: &EncodedData,
12
) -> PolarsResult<(usize, usize)> {
13
let arrow_data_len = encoded.arrow_data.len();
14
15
let a = 8 - 1;
16
let buffer = &encoded.ipc_message;
17
let flatbuf_size = buffer.len();
18
let prefix_size = 8;
19
let aligned_size = (flatbuf_size + prefix_size + a) & !a;
20
let padding_bytes = aligned_size - flatbuf_size - prefix_size;
21
22
write_continuation(writer, (aligned_size - prefix_size) as i32)?;
23
24
// write the flatbuf
25
if flatbuf_size > 0 {
26
writer.write_all(buffer)?;
27
}
28
// write padding
29
// aligned to a 8 byte boundary, so maximum is [u8;8]
30
const PADDING_MAX: [u8; 8] = [0u8; 8];
31
writer.write_all(&PADDING_MAX[..padding_bytes])?;
32
33
// write arrow data
34
let body_len = if arrow_data_len > 0 {
35
write_body_buffers(writer, &encoded.arrow_data)?
36
} else {
37
0
38
};
39
40
Ok((aligned_size, body_len))
41
}
42
43
fn write_body_buffers<W: Write>(mut writer: W, data: &[u8]) -> PolarsResult<usize> {
44
let len = data.len();
45
let pad_len = pad_to_64(data.len());
46
let total_len = len + pad_len;
47
48
// write body buffer
49
writer.write_all(data)?;
50
if pad_len > 0 {
51
writer.write_all(&vec![0u8; pad_len][..])?;
52
}
53
54
Ok(total_len)
55
}
56
57
/// Write a record batch to the writer, writing the message size before the message
58
/// if the record batch is being written to a stream
59
pub fn write_continuation<W: Write>(writer: &mut W, total_len: i32) -> PolarsResult<usize> {
60
writer.write_all(&CONTINUATION_MARKER)?;
61
writer.write_all(&total_len.to_le_bytes()[..])?;
62
Ok(8)
63
}
64
65