Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/ipc/write/common_sync.rs
8424 views
1
use std::io::Write;
2
use std::sync::Arc;
3
4
use arrow_format::ipc::planus::Builder;
5
use bytes::Bytes;
6
use polars_error::PolarsResult;
7
8
use super::super::{ARROW_MAGIC_V2, ARROW_MAGIC_V2_PADDED, CONTINUATION_MARKER};
9
use super::common::{EncodedData, pad_to_64};
10
use super::schema;
11
use crate::datatypes::*;
12
use crate::io::ipc::IpcField;
13
use crate::io::ipc::write::EncodedDataBytes;
14
15
/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
16
pub fn write_message<W: Write>(
17
writer: &mut W,
18
encoded: &EncodedData,
19
) -> PolarsResult<(usize, usize)> {
20
let arrow_data_len = encoded.arrow_data.len();
21
22
let a = 8 - 1;
23
let buffer = &encoded.ipc_message;
24
let flatbuf_size = buffer.len();
25
let prefix_size = 8;
26
let aligned_size = (flatbuf_size + prefix_size + a) & !a;
27
let padding_bytes = aligned_size - flatbuf_size - prefix_size;
28
29
write_continuation(writer, (aligned_size - prefix_size) as i32)?;
30
31
// write the flatbuf
32
if flatbuf_size > 0 {
33
writer.write_all(buffer)?;
34
}
35
// write padding
36
// aligned to a 8 byte boundary, so maximum is [u8;8]
37
const PADDING_MAX: [u8; 8] = [0u8; 8];
38
writer.write_all(&PADDING_MAX[..padding_bytes])?;
39
40
// write arrow data
41
let body_len = if arrow_data_len > 0 {
42
write_body_buffers(writer, &encoded.arrow_data)?
43
} else {
44
0
45
};
46
47
Ok((aligned_size, body_len))
48
}
49
50
/// Encapsulate an encoded IPC message into the provided Bytes queue. Ownership
51
/// of the data will move. Returns the metadata and body length in bytes.
52
pub fn push_message(queue: &mut Vec<Bytes>, encoded: EncodedDataBytes) -> (usize, usize) {
53
let arrow_data_len = encoded.arrow_data.len();
54
55
let a = 8 - 1;
56
let buffer = encoded.ipc_message;
57
let flatbuf_size = buffer.len();
58
let prefix_size = 8;
59
let aligned_size = (flatbuf_size + prefix_size + a) & !a;
60
let padding_bytes = aligned_size - flatbuf_size - prefix_size;
61
62
// Continuation.
63
let total_len = (aligned_size - prefix_size) as i32;
64
queue.push(Bytes::from_static(&CONTINUATION_MARKER));
65
queue.push(Bytes::copy_from_slice(&total_len.to_le_bytes()[..]));
66
67
// Write the flatbuf.
68
if flatbuf_size > 0 {
69
queue.push(buffer);
70
}
71
72
// Write padding.
73
// This is aligned to a 8 byte boundary, so maximum is [u8;8].
74
const PADDING_MAX: [u8; 8] = [0u8; 8];
75
queue.push(Bytes::from_static(&PADDING_MAX[..padding_bytes]));
76
77
// write arrow data
78
let body_len = if arrow_data_len > 0 {
79
let data = encoded.arrow_data;
80
let len = data.len();
81
let pad_len = pad_to_64(data.len());
82
let total_len = len + pad_len;
83
84
queue.push(data);
85
if pad_len > 0 {
86
queue.push(Bytes::from(vec![0u8; pad_len]));
87
}
88
total_len
89
} else {
90
0
91
};
92
93
(aligned_size, body_len)
94
}
95
96
fn write_body_buffers<W: Write>(mut writer: W, data: &[u8]) -> PolarsResult<usize> {
97
let len = data.len();
98
let pad_len = pad_to_64(data.len());
99
let total_len = len + pad_len;
100
101
// write body buffer
102
writer.write_all(data)?;
103
if pad_len > 0 {
104
writer.write_all(&vec![0u8; pad_len][..])?;
105
}
106
107
Ok(total_len)
108
}
109
110
/// Write a record batch to the writer, writing the message size before the message
111
/// if the record batch is being written to a stream
112
pub fn write_continuation<W: Write>(writer: &mut W, total_len: i32) -> PolarsResult<usize> {
113
writer.write_all(&CONTINUATION_MARKER)?;
114
writer.write_all(&total_len.to_le_bytes()[..])?;
115
Ok(8)
116
}
117
118
/// Push the IPC magic bytes.
119
pub fn push_magic(queue: &mut Vec<Bytes>, padded: bool) -> usize {
120
if padded {
121
queue.push(Bytes::from_static(&ARROW_MAGIC_V2_PADDED));
122
8
123
} else {
124
queue.push(Bytes::from_static(&ARROW_MAGIC_V2));
125
6
126
}
127
}
128
129
/// Append a continuation marker and the `total_len` value to the Bytes queue.
130
pub fn push_continuation(queue: &mut Vec<Bytes>, total_len: i32) -> usize {
131
let mut buf = [0u8; 8];
132
buf[..4].copy_from_slice(&CONTINUATION_MARKER);
133
buf[4..].copy_from_slice(&total_len.to_le_bytes());
134
queue.push(Bytes::copy_from_slice(&buf));
135
8
136
}
137
138
/// Build the IPC File Footer and accumulate the owned Bytes into the queue.
139
/// Returns the total number of bytes added.
140
pub fn push_footer(
141
queue: &mut Vec<Bytes>,
142
schema: &ArrowSchema,
143
ipc_fields: &[IpcField],
144
dictionary_blocks: Vec<arrow_format::ipc::Block>,
145
record_blocks: Vec<arrow_format::ipc::Block>,
146
// Placeholder, inherited from FileWriter for future use.
147
custom_schema_metadata: Option<Arc<Metadata>>,
148
) -> usize {
149
let mut total_len = 0;
150
151
total_len += push_continuation(queue, 0);
152
153
let schema = schema::serialize_schema(schema, ipc_fields, custom_schema_metadata.as_deref());
154
155
let root = arrow_format::ipc::Footer {
156
version: arrow_format::ipc::MetadataVersion::V5,
157
schema: Some(Box::new(schema)),
158
dictionaries: Some(dictionary_blocks),
159
record_batches: Some(record_blocks),
160
custom_metadata: None,
161
};
162
let mut builder = Builder::new();
163
let footer_data = builder.finish(&root, None);
164
let footer_data = Bytes::copy_from_slice(footer_data);
165
let footer_data_len = footer_data.len();
166
167
queue.push(footer_data);
168
total_len += footer_data_len;
169
170
queue.push(Bytes::copy_from_slice(
171
&(footer_data_len as i32).to_le_bytes(),
172
));
173
total_len += 4;
174
175
total_len
176
}
177
178