Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-utils/src/chunked_bytes_cursor.rs
7884 views
1
use std::num::NonZeroUsize;
2
3
/// Cursor over fixed size chunks of bytes.
4
pub struct FixedSizeChunkedBytesCursor<'a, T> {
5
position: usize,
6
total_size: usize,
7
chunk_size: NonZeroUsize,
8
/// Note, the last chunk is allowed to have a length shorter than the `chunk_size`.
9
chunked_bytes: &'a [T],
10
}
11
12
#[derive(Debug)]
13
pub enum FixedSizeChunkedBytesCursorInitErr {
14
ChunkLengthMismatch { index: usize },
15
EmptyFirstChunk,
16
NoChunks,
17
}
18
19
impl<'a, T> FixedSizeChunkedBytesCursor<'a, T>
20
where
21
T: AsRef<[u8]>,
22
{
23
/// Expects `chunked_bytes` to have a non-empty length `n`, where `chunked_bytes[..n - 1]` all have the same length.
24
pub fn try_new(chunked_bytes: &'a [T]) -> Result<Self, FixedSizeChunkedBytesCursorInitErr> {
25
use FixedSizeChunkedBytesCursorInitErr as E;
26
27
if chunked_bytes.is_empty() {
28
return Err(E::NoChunks);
29
}
30
31
let Ok(chunk_size) = NonZeroUsize::try_from(chunked_bytes[0].as_ref().len()) else {
32
return Err(E::EmptyFirstChunk);
33
};
34
35
let mut total_size: usize = 0;
36
37
for (i, bytes) in chunked_bytes.iter().enumerate() {
38
let bytes = bytes.as_ref();
39
40
if bytes.len() != chunk_size.get() && chunked_bytes.len() - i > 1 {
41
return Err(E::ChunkLengthMismatch { index: i });
42
}
43
44
total_size = total_size.checked_add(bytes.len()).unwrap();
45
}
46
47
Ok(Self {
48
position: 0,
49
total_size,
50
chunk_size,
51
chunked_bytes,
52
})
53
}
54
}
55
56
impl<'a, T> std::io::Read for FixedSizeChunkedBytesCursor<'a, T>
57
where
58
T: AsRef<[u8]>,
59
{
60
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
61
let available_bytes = self.total_size.saturating_sub(self.position);
62
let new_position = self.position + buf.len().min(available_bytes);
63
64
let requested_byte_range = self.position..new_position;
65
66
if requested_byte_range.is_empty() {
67
return Ok(0);
68
}
69
70
// First chunk needs special handling as the offset within the chunk can be non-zero.
71
let mut bytes_read = {
72
let (first_chunk_idx, offset_in_chunk) = (
73
requested_byte_range.start / self.chunk_size,
74
requested_byte_range.start % self.chunk_size,
75
);
76
let chunk_bytes = self.chunked_bytes[first_chunk_idx].as_ref();
77
let len = requested_byte_range
78
.len()
79
.min(chunk_bytes.len() - offset_in_chunk);
80
81
buf[..len].copy_from_slice(&chunk_bytes[offset_in_chunk..offset_in_chunk + len]);
82
83
len
84
};
85
86
assert!(
87
(requested_byte_range.start + bytes_read).is_multiple_of(self.chunk_size.get())
88
|| bytes_read == requested_byte_range.len()
89
);
90
91
for chunk_idx in (requested_byte_range.start + bytes_read) / self.chunk_size
92
..requested_byte_range.end.div_ceil(self.chunk_size.get())
93
{
94
let chunk_bytes = self.chunked_bytes[chunk_idx].as_ref();
95
let len = (requested_byte_range.len() - bytes_read).min(chunk_bytes.len());
96
97
buf[bytes_read..bytes_read + len].copy_from_slice(&chunk_bytes[..len]);
98
99
bytes_read += len;
100
}
101
102
assert_eq!(bytes_read, requested_byte_range.len());
103
104
self.position = new_position;
105
106
Ok(requested_byte_range.len())
107
}
108
}
109
110
impl<'a, T> std::io::Seek for FixedSizeChunkedBytesCursor<'a, T> {
111
fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
112
// Mostly copied from io::Cursor::seek().
113
use std::io::SeekFrom;
114
115
let (base_pos, offset) = match pos {
116
SeekFrom::Start(n) => {
117
self.position = usize::try_from(n).unwrap().min(self.total_size);
118
return Ok(self.position as u64);
119
},
120
SeekFrom::End(n) => (self.total_size as u64, n),
121
SeekFrom::Current(n) => (self.position as u64, n),
122
};
123
match base_pos.checked_add_signed(offset) {
124
Some(n) => {
125
self.position = usize::try_from(n).unwrap();
126
Ok(self.position as u64)
127
},
128
None => Err(std::io::Error::new(
129
std::io::ErrorKind::InvalidInput,
130
"invalid seek to a negative or overflowing position",
131
)),
132
}
133
}
134
}
135
136