Path: blob/main/crates/polars-utils/src/chunked_bytes_cursor.rs
7884 views
use std::num::NonZeroUsize;

/// Cursor over fixed size chunks of bytes, exposing them as one contiguous byte stream through
/// [`std::io::Read`] and [`std::io::Seek`].
pub struct FixedSizeChunkedBytesCursor<'a, T> {
    /// Current byte offset into the logical stream. A relative seek may move it past
    /// `total_size`, in which case reads return 0 bytes.
    position: usize,
    /// Sum of the lengths of all chunks.
    total_size: usize,
    /// Length of every chunk except the last.
    chunk_size: NonZeroUsize,
    /// Note, the last chunk is allowed to have a length different from `chunk_size`
    /// (shorter, or even longer); every other chunk is exactly `chunk_size` long.
    chunked_bytes: &'a [T],
}

/// Reasons why [`FixedSizeChunkedBytesCursor::try_new`] rejects its input.
#[derive(Debug)]
pub enum FixedSizeChunkedBytesCursorInitErr {
    /// The chunk at `index` is not the last chunk and its length differs from the first chunk's.
    ChunkLengthMismatch { index: usize },
    /// The first chunk is empty, so no chunk size can be derived from it.
    EmptyFirstChunk,
    /// No chunks were provided.
    NoChunks,
}

impl<'a, T> FixedSizeChunkedBytesCursor<'a, T>
where
    T: AsRef<[u8]>,
{
    /// Creates a cursor positioned at byte 0.
    ///
    /// Expects `chunked_bytes` to have a non-empty length `n`, where `chunked_bytes[..n - 1]` all
    /// have the same (non-zero) length. The last chunk may have any length.
    ///
    /// # Errors
    ///
    /// - [`NoChunks`](FixedSizeChunkedBytesCursorInitErr::NoChunks) if `chunked_bytes` is empty.
    /// - [`EmptyFirstChunk`](FixedSizeChunkedBytesCursorInitErr::EmptyFirstChunk) if the first
    ///   chunk has length 0.
    /// - [`ChunkLengthMismatch`](FixedSizeChunkedBytesCursorInitErr::ChunkLengthMismatch) with the
    ///   offending index if a chunk other than the last differs in length from the first chunk.
    ///
    /// # Panics
    ///
    /// Panics if the combined length of all chunks overflows `usize`.
    pub fn try_new(chunked_bytes: &'a [T]) -> Result<Self, FixedSizeChunkedBytesCursorInitErr> {
        use FixedSizeChunkedBytesCursorInitErr as E;

        let Some((first, _)) = chunked_bytes.split_first() else {
            return Err(E::NoChunks);
        };
        let Some(chunk_size) = NonZeroUsize::new(first.as_ref().len()) else {
            return Err(E::EmptyFirstChunk);
        };

        let last_index = chunked_bytes.len() - 1;
        let mut total_size: usize = 0;

        for (index, bytes) in chunked_bytes.iter().enumerate() {
            let len = bytes.as_ref().len();

            if index != last_index && len != chunk_size.get() {
                return Err(E::ChunkLengthMismatch { index });
            }

            total_size = total_size
                .checked_add(len)
                .expect("total length of chunks overflows usize");
        }

        Ok(Self {
            position: 0,
            total_size,
            chunk_size,
            chunked_bytes,
        })
    }
}

impl<'a, T> std::io::Read for FixedSizeChunkedBytesCursor<'a, T>
where
    T: AsRef<[u8]>,
{
    /// Copies up to `buf.len()` bytes from the current position into `buf` and advances the
    /// position by the number of bytes copied. Returns `Ok(0)` when `buf` is empty or the
    /// position is at or past the end of the data; never returns an error.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // `position` may exceed `total_size` after a relative seek, hence the saturating sub.
        let to_read = buf.len().min(self.total_size.saturating_sub(self.position));
        // `try_new` guarantees at least one chunk, so this cannot underflow.
        let last_chunk_idx = self.chunked_bytes.len() - 1;

        let mut copied = 0;
        while copied < to_read {
            let pos = self.position + copied;
            // Every chunk before the last is exactly `chunk_size` long, so the chunk holding
            // `pos` follows by division. Clamping to the last chunk keeps the tail of a last
            // chunk longer than `chunk_size` addressable instead of indexing out of bounds.
            let chunk_idx = (pos / self.chunk_size).min(last_chunk_idx);
            let offset = pos - chunk_idx * self.chunk_size.get();
            let chunk = self.chunked_bytes[chunk_idx].as_ref();

            // `pos < total_size` implies `offset < chunk.len()`, so every iteration copies at
            // least one byte and the loop terminates.
            let len = (to_read - copied).min(chunk.len() - offset);
            buf[copied..copied + len].copy_from_slice(&chunk[offset..offset + len]);
            copied += len;
        }

        self.position += to_read;
        Ok(to_read)
    }
}

impl<'a, T> std::io::Seek for FixedSizeChunkedBytesCursor<'a, T> {
    /// Moves the cursor; mostly mirrors `io::Cursor::seek()`.
    ///
    /// Unlike `io::Cursor`, an absolute `SeekFrom::Start` is clamped to the total size, while
    /// relative seeks (`SeekFrom::End` / `SeekFrom::Current`) may land past the end, where
    /// subsequent reads return 0 bytes.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` if a relative seek would move before byte 0, overflow `u64`, or
    /// produce a position that does not fit in `usize`. The position is unchanged on error.
    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
        use std::io::SeekFrom;

        let (base_pos, offset) = match pos {
            SeekFrom::Start(n) => {
                let total_size = self.total_size;
                // An offset that does not fit in `usize` (possible on 32-bit targets) is
                // necessarily past the end, so clamp it like any other oversized offset.
                self.position = usize::try_from(n).map_or(total_size, |n| n.min(total_size));
                return Ok(self.position as u64);
            },
            SeekFrom::End(n) => (self.total_size as u64, n),
            SeekFrom::Current(n) => (self.position as u64, n),
        };

        let new_position = base_pos
            .checked_add_signed(offset)
            .and_then(|n| usize::try_from(n).ok());

        match new_position {
            Some(n) => {
                self.position = n;
                Ok(n as u64)
            },
            None => Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "invalid seek to a negative or overflowing position",
            )),
        }
    }
}