GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/cloud/cloud_writer/bufferer.rs
use bytes::Bytes;
use object_store::PutPayload;

use crate::configs::{cloud_writer_coalesce_run_length, cloud_writer_copy_buffer_size};

/// Utility for byte buffering logic. Accepts both owned [`Bytes`] and borrowed `&[u8]` incoming
/// bytes. Buffered bytes can be flushed to a [`PutPayload`].
pub(super) struct BytesBufferer {
    /// Buffer until this many bytes. If set to `0`, buffering is disabled.
    target_output_size: usize,
    buffered_bytes: Vec<Bytes>,
    /// Copy buffer for bytes from small or borrowed (`&[u8]`) incoming buffers.
    copy_buffer: Vec<u8>,
    copy_buffer_reserve_size: usize,
    /// Total bytes buffered, including both `buffered_bytes` and `copy_buffer`.
    num_bytes_buffered: usize,
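    /// Number of chunks at the tail of `buffered_bytes` whose combined size stays within
    /// `copy_buffer_reserve_size`; once this reaches the configured coalesce run length,
    /// `coalesce_tail` merges them into `copy_buffer`.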
    tail_coalesce_num_items: usize,
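    /// Value of `num_bytes_buffered` at the point where the current tail run began.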
    tail_coalesce_byte_offset: usize,
}

impl BytesBufferer {
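    /// Creates a bufferer that accumulates approximately `target_output_size` bytes before
    /// [`flush_full_chunk`](Self::flush_full_chunk) produces a payload. A size of `0`
    /// disables buffering.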
    pub(super) fn new(target_output_size: usize) -> Self {
        let copy_buffer_reserve_size =
            usize::min(target_output_size, cloud_writer_copy_buffer_size().get());

        BytesBufferer {
            target_output_size,

            buffered_bytes: Vec::with_capacity(if target_output_size == 0 {
                1
            } else {
                usize::max(
                    target_output_size.div_ceil(copy_buffer_reserve_size),
                    match cloud_writer_coalesce_run_length() {
                        n if n <= copy_buffer_reserve_size => n,
                        _ => 0,
                    },
                )
            }),
            copy_buffer: vec![],
            copy_buffer_reserve_size,
            num_bytes_buffered: 0,
            tail_coalesce_num_items: 0,
            tail_coalesce_byte_offset: 0,
        }
    }

    /// Push owned [`Bytes`] into this bufferer. This consumes from the mutable reference
    /// via [`Bytes::split_to`] until either the input is fully consumed or `self` is full.
    pub(super) fn push_owned(&mut self, bytes: &mut Bytes) {
        if bytes.is_empty() {
            return;
        }

        let available_capacity = self.available_capacity_current_chunk(bytes.len());

        if available_capacity == 0 {
            return;
        }

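        // Small inputs are absorbed into `copy_buffer`; inputs that don't fit fall through
        // to the zero-copy path below, after committing the active copy buffer and, when a
        // long enough run of small tail chunks has built up, coalescing that run first.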
        loop {
            let copy_buffer_available_capacity = usize::min(
                available_capacity,
                self.copy_buffer.capacity() - self.copy_buffer.len(),
            );

            if bytes.len() <= copy_buffer_available_capacity {
                self.copy_buffer.extend_from_slice(bytes);
                self.num_bytes_buffered += bytes.len();
                *bytes = Bytes::new();

                return;
            }

            self.commit_active_copy_buffer();

            if self.tail_coalesce_num_items >= cloud_writer_coalesce_run_length() {
                self.coalesce_tail();
                continue;
            }

            break;
        }

        let bytes = bytes.split_to(usize::min(bytes.len(), available_capacity));

        let bytes_len = bytes.len();
        self.buffered_bytes.push(bytes);
        self.num_bytes_buffered += bytes_len;

        if self.num_bytes_buffered - self.tail_coalesce_byte_offset <= self.copy_buffer_reserve_size
        {
            self.tail_coalesce_num_items += 1;
        } else {
            self.reset_tail_coalesce_counters();
        }
    }

    /// Push borrowed `&[u8]` into this bufferer. This consumes from the mutable reference
    /// via `split_off` until either the slice is fully consumed or `self` is full.
    pub(super) fn push_slice(&mut self, bytes: &mut &[u8]) {
        while !bytes.is_empty() {
            let available_capacity = self.available_capacity_current_chunk(bytes.len());

            if available_capacity == 0 {
                break;
            }

            let mut copy_buffer_available_capacity = usize::min(
                available_capacity,
                self.copy_buffer.capacity() - self.copy_buffer.len(),
            );

            if copy_buffer_available_capacity == 0 {
                self.commit_active_copy_buffer();
                copy_buffer_available_capacity =
                    self.reserve_active_copy_buffer(available_capacity);
            }

            let n = usize::min(bytes.len(), copy_buffer_available_capacity);

            self.copy_buffer
                .extend_from_slice(bytes.split_off(..n).unwrap());
            self.num_bytes_buffered += n;
        }
    }

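    /// Merges the current tail run of small `buffered_bytes` chunks into `copy_buffer`, so
    /// that long runs of tiny chunks don't accumulate in the output payload.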
    fn coalesce_tail(&mut self) {
        if self.tail_coalesce_num_items < 2 {
            return;
        }

        assert_eq!(self.copy_buffer.capacity(), 0);
        assert!(self.tail_coalesce_byte_offset < self.target_output_size);

        let copy_buffer_reserve = usize::min(
            self.copy_buffer_reserve_size,
            self.target_output_size - self.tail_coalesce_byte_offset,
        );

        assert!(copy_buffer_reserve >= (self.num_bytes_buffered - self.tail_coalesce_byte_offset));

        let drain_start: usize = self.buffered_bytes.len() - self.tail_coalesce_num_items;
        let drain_range = drain_start..;
        self.reset_tail_coalesce_counters();

        self.copy_buffer.reserve_exact(copy_buffer_reserve);
        self.buffered_bytes
            .drain(drain_range)
            .for_each(|bytes| self.copy_buffer.extend_from_slice(&bytes));
    }

    fn reset_tail_coalesce_counters(&mut self) {
        self.tail_coalesce_byte_offset = self.num_bytes_buffered;
        self.tail_coalesce_num_items = 0;
    }

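    /// Returns `true` if no bytes are buffered, asserting the internal invariants on the way.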
    pub(super) fn is_empty(&self) -> bool {
        if self.num_bytes_buffered == 0 {
            assert!(self.buffered_bytes.is_empty());
            assert_eq!(self.copy_buffer.capacity(), 0);
            true
        } else {
            false
        }
    }

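    /// Returns `true` once a full chunk has accumulated. With buffering disabled
    /// (`target_output_size == 0`), any non-empty state counts as full.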
    pub(super) fn is_full(&self) -> bool {
        self.num_bytes_buffered >= usize::max(1, self.target_output_size)
    }

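    /// Flushes only if a full chunk has accumulated.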
    pub(super) fn flush_full_chunk(&mut self) -> Option<PutPayload> {
        self.is_full().then(|| self.flush().unwrap())
    }

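    /// Flushes all buffered bytes into a [`PutPayload`]. Returns `None` if empty.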
    pub(super) fn flush(&mut self) -> Option<PutPayload> {
        if self.is_empty() {
            return None;
        }

        self.commit_active_copy_buffer();

        self.num_bytes_buffered = 0;
        self.reset_tail_coalesce_counters();

        let payload = PutPayload::from_iter(self.buffered_bytes.drain(..));

        Some(payload)
    }

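    /// Remaining capacity of the chunk currently being assembled. With buffering disabled,
    /// an empty bufferer accepts the entire incoming buffer and a non-empty one accepts
    /// nothing.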
    fn available_capacity_current_chunk(&self, incoming_len: usize) -> usize {
        if self.target_output_size > 0 {
            self.target_output_size - self.num_bytes_buffered
        } else if self.is_empty() {
            incoming_len
        } else {
            0
        }
    }

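    /// Converts the active copy buffer into an owned [`Bytes`] chunk and re-pushes it.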
    #[inline]
    fn commit_active_copy_buffer(&mut self) {
        if !self.copy_buffer.is_empty() {
            self.num_bytes_buffered -= self.copy_buffer.len();
            let mut bytes: Bytes = std::mem::take(&mut self.copy_buffer).into();
            self.push_owned(&mut bytes);
            assert!(bytes.is_empty());
        }
    }

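    /// Reserves capacity in the (committed, now empty) copy buffer and returns the number
    /// of bytes that may be copied into it for the current chunk.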
    fn reserve_active_copy_buffer(&mut self, available_capacity_current_chunk: usize) -> usize {
        let n = if self.copy_buffer_reserve_size > 0 {
            usize::min(
                self.copy_buffer_reserve_size,
                available_capacity_current_chunk,
            )
        } else {
            available_capacity_current_chunk
        };

        self.copy_buffer.reserve_exact(n);

        usize::min(
            self.copy_buffer.capacity() - self.copy_buffer.len(),
            available_capacity_current_chunk,
        )
    }
}
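
// A minimal usage sketch: drive the bufferer the way the surrounding cloud writer would,
// pushing incoming buffers, flushing a `PutPayload` whenever a full chunk accumulates, and
// flushing the remainder at the end. The 8-byte target size is arbitrary, and the test
// assumes the crate's default values for `cloud_writer_copy_buffer_size` and
// `cloud_writer_coalesce_run_length`.
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use super::BytesBufferer;

    #[test]
    fn buffers_and_flushes_all_bytes() {
        let mut bufferer = BytesBufferer::new(8);
        let mut total_flushed = 0;

        // Owned path: `push_owned` splits off as much as fits in the current chunk.
        let mut owned = Bytes::from_static(b"0123456789");
        while !owned.is_empty() {
            bufferer.push_owned(&mut owned);
            if let Some(payload) = bufferer.flush_full_chunk() {
                total_flushed += payload.content_length();
            }
        }

        // Borrowed path: `push_slice` copies through the internal copy buffer.
        let mut slice: &[u8] = b"abcdef";
        while !slice.is_empty() {
            bufferer.push_slice(&mut slice);
            if let Some(payload) = bufferer.flush_full_chunk() {
                total_flushed += payload.content_length();
            }
        }

        // Flush whatever is left as a final (possibly short) chunk.
        if let Some(payload) = bufferer.flush() {
            total_flushed += payload.content_length();
        }

        assert!(bufferer.is_empty());
        assert_eq!(total_flushed, 16);
    }
}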