Path: blob/main/crates/polars-io/src/cloud/cloud_writer/bufferer.rs
8420 views
use bytes::Bytes;1use object_store::PutPayload;23use crate::configs::{cloud_writer_coalesce_run_length, cloud_writer_copy_buffer_size};45/// Utility for byte buffering logic. Accepts both owned [`Bytes`] and borrowed `&[u8]` incoming6/// bytes. Buffered bytes can be flushed to a [`PutPayload`].7pub(super) struct BytesBufferer {8/// Buffer until this many bytes. If set to `0`, buffering is disabled.9target_output_size: usize,10buffered_bytes: Vec<Bytes>,11/// Copy bytes from small or borrowed (`&[u8]`) incoming buffers.12copy_buffer: Vec<u8>,13copy_buffer_reserve_size: usize,14/// Total bytes buffered, includes both `buffered_bytes` and `copy_buffer.len()`.15num_bytes_buffered: usize,16tail_coalesce_num_items: usize,17tail_coalesce_byte_offset: usize,18}1920impl BytesBufferer {21pub(super) fn new(target_output_size: usize) -> Self {22let copy_buffer_reserve_size =23usize::min(target_output_size, cloud_writer_copy_buffer_size().get());2425BytesBufferer {26target_output_size,2728buffered_bytes: Vec::with_capacity(if target_output_size == 0 {29130} else {31usize::max(32target_output_size.div_ceil(copy_buffer_reserve_size),33match cloud_writer_coalesce_run_length() {34n if n <= copy_buffer_reserve_size => n,35_ => 0,36},37)38}),39copy_buffer: vec![],40copy_buffer_reserve_size,41num_bytes_buffered: 0,42tail_coalesce_num_items: 0,43tail_coalesce_byte_offset: 0,44}45}4647/// Push owned [`Bytes`] into this bufferer. This will consume from a mutable reference48/// via [`Bytes::split_to`] until either the bytes is fully consumed, or `self` is full.49pub(super) fn push_owned(&mut self, bytes: &mut Bytes) {50if bytes.is_empty() {51return;52}5354let available_capacity = self.available_capacity_current_chunk(bytes.len());5556if available_capacity == 0 {57return;58}5960loop {61let copy_buffer_available_capacity = usize::min(62available_capacity,63self.copy_buffer.capacity() - self.copy_buffer.len(),64);6566if bytes.len() <= copy_buffer_available_capacity {67self.copy_buffer.extend_from_slice(bytes);68self.num_bytes_buffered += bytes.len();69*bytes = Bytes::new();7071return;72}7374self.commit_active_copy_buffer();7576if self.tail_coalesce_num_items >= cloud_writer_coalesce_run_length() {77self.coalesce_tail();78continue;79}8081break;82}8384let bytes = bytes.split_to(usize::min(bytes.len(), available_capacity));8586let bytes_len = bytes.len();87self.buffered_bytes.push(bytes);88self.num_bytes_buffered += bytes_len;8990if self.num_bytes_buffered - self.tail_coalesce_byte_offset <= self.copy_buffer_reserve_size91{92self.tail_coalesce_num_items += 1;93} else {94self.reset_tail_coalesce_counters();95}96}9798/// Push borrowed `&[u8]` into this bufferer. This will consume from a mutable reference99/// via `split_off` until either the slice is fully consumed, or `self` is full.100pub(super) fn push_slice(&mut self, bytes: &mut &[u8]) {101while !bytes.is_empty() {102let available_capacity = self.available_capacity_current_chunk(bytes.len());103104if available_capacity == 0 {105break;106}107108let mut copy_buffer_available_capacity = usize::min(109available_capacity,110self.copy_buffer.capacity() - self.copy_buffer.len(),111);112113if copy_buffer_available_capacity == 0 {114self.commit_active_copy_buffer();115copy_buffer_available_capacity =116self.reserve_active_copy_buffer(available_capacity);117}118119let n = usize::min(bytes.len(), copy_buffer_available_capacity);120121self.copy_buffer122.extend_from_slice(bytes.split_off(..n).unwrap());123self.num_bytes_buffered += n;124}125}126127fn coalesce_tail(&mut self) {128if self.tail_coalesce_num_items < 2 {129return;130}131132assert_eq!(self.copy_buffer.capacity(), 0);133assert!(self.tail_coalesce_byte_offset < self.target_output_size);134135let copy_buffer_reserve = usize::min(136self.copy_buffer_reserve_size,137self.target_output_size - self.tail_coalesce_byte_offset,138);139140assert!(copy_buffer_reserve >= (self.num_bytes_buffered - self.tail_coalesce_byte_offset));141142let drain_start: usize = self.buffered_bytes.len() - self.tail_coalesce_num_items;143let drain_range = drain_start..;144self.reset_tail_coalesce_counters();145146self.copy_buffer.reserve_exact(copy_buffer_reserve);147self.buffered_bytes148.drain(drain_range)149.for_each(|bytes| self.copy_buffer.extend_from_slice(&bytes));150}151152fn reset_tail_coalesce_counters(&mut self) {153self.tail_coalesce_byte_offset = self.num_bytes_buffered;154self.tail_coalesce_num_items = 0;155}156157pub(super) fn is_empty(&self) -> bool {158if self.num_bytes_buffered == 0 {159assert!(self.buffered_bytes.is_empty());160assert_eq!(self.copy_buffer.capacity(), 0);161true162} else {163false164}165}166167pub(super) fn is_full(&self) -> bool {168self.num_bytes_buffered >= usize::max(1, self.target_output_size)169}170171pub(super) fn flush_full_chunk(&mut self) -> Option<PutPayload> {172self.is_full().then(|| self.flush().unwrap())173}174175pub(super) fn flush(&mut self) -> Option<PutPayload> {176if self.is_empty() {177return None;178}179180self.commit_active_copy_buffer();181182self.num_bytes_buffered = 0;183self.reset_tail_coalesce_counters();184185let payload = PutPayload::from_iter(self.buffered_bytes.drain(..));186187Some(payload)188}189190fn available_capacity_current_chunk(&self, incoming_len: usize) -> usize {191if self.target_output_size > 0 {192self.target_output_size - self.num_bytes_buffered193} else if self.is_empty() {194incoming_len195} else {1960197}198}199200#[inline]201fn commit_active_copy_buffer(&mut self) {202if !self.copy_buffer.is_empty() {203self.num_bytes_buffered -= self.copy_buffer.len();204let mut bytes: Bytes = std::mem::take(&mut self.copy_buffer).into();205self.push_owned(&mut bytes);206assert!(bytes.is_empty());207}208}209210fn reserve_active_copy_buffer(&mut self, available_capacity_current_chunk: usize) -> usize {211let n = if self.copy_buffer_reserve_size > 0 {212usize::min(213self.copy_buffer_reserve_size,214available_capacity_current_chunk,215)216} else {217available_capacity_current_chunk218};219220self.copy_buffer.reserve_exact(n);221222usize::min(223self.copy_buffer.capacity() - self.copy_buffer.len(),224available_capacity_current_chunk,225)226}227}228229230