Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/ipc/read/reader.rs
8421 views
1
use std::io::{Read, Seek};
2
3
use arrow_format::ipc::KeyValueRef;
4
use polars_error::{PolarsResult, polars_err};
5
use polars_utils::bool::UnsafeBool;
6
7
use super::common::*;
8
use super::file::{get_message_from_block, get_message_from_block_offset, get_record_batch};
9
use super::{Dictionaries, FileMetadata, read_batch, read_file_dictionaries};
10
use crate::array::Array;
11
use crate::datatypes::ArrowSchema;
12
use crate::record_batch::RecordBatchT;
13
14
/// An iterator of [`RecordBatchT`]s from an Arrow IPC file.
15
pub struct FileReader<R: Read + Seek> {
16
reader: R,
17
metadata: FileMetadata,
18
// the dictionaries are going to be read
19
dictionaries: Option<Dictionaries>,
20
current_block: usize,
21
projection: Option<ProjectionInfo>,
22
remaining: usize,
23
data_scratch: Vec<u8>,
24
message_scratch: Vec<u8>,
25
checked: UnsafeBool,
26
}
27
28
impl<R: Read + Seek> FileReader<R> {
29
/// Creates a new [`FileReader`]. Use `projection` to only take certain columns.
30
/// # Panic
31
/// Panics iff the projection is not in increasing order (e.g. `[1, 0]` nor `[0, 1, 1]` are valid)
32
pub fn new(
33
reader: R,
34
metadata: FileMetadata,
35
projection: Option<Vec<usize>>,
36
limit: Option<usize>,
37
) -> Self {
38
let projection =
39
projection.map(|projection| prepare_projection(&metadata.schema, projection));
40
Self {
41
reader,
42
metadata,
43
dictionaries: Default::default(),
44
projection,
45
remaining: limit.unwrap_or(usize::MAX),
46
current_block: 0,
47
data_scratch: Default::default(),
48
message_scratch: Default::default(),
49
checked: Default::default(),
50
}
51
}
52
53
/// # Safety
54
/// Don't do expensive checks.
55
/// This means the data source has to be trusted to be correct.
56
pub unsafe fn unchecked(mut self) -> Self {
57
unsafe {
58
self.checked = UnsafeBool::new_false();
59
}
60
self
61
}
62
63
/// Creates a new [`FileReader`]. Use `projection` to only take certain columns.
64
/// # Panic
65
/// Panics iff the projection is not in increasing order (e.g. `[1, 0]` nor `[0, 1, 1]` are valid)
66
pub fn new_with_projection_info(
67
reader: R,
68
metadata: FileMetadata,
69
projection: Option<ProjectionInfo>,
70
limit: Option<usize>,
71
) -> Self {
72
Self {
73
reader,
74
metadata,
75
dictionaries: Default::default(),
76
projection,
77
remaining: limit.unwrap_or(usize::MAX),
78
current_block: 0,
79
data_scratch: Default::default(),
80
message_scratch: Default::default(),
81
checked: Default::default(),
82
}
83
}
84
85
/// Return the schema of the file
86
pub fn schema(&self) -> &ArrowSchema {
87
self.projection
88
.as_ref()
89
.map(|x| &x.schema)
90
.unwrap_or(&self.metadata.schema)
91
}
92
93
/// Returns the [`FileMetadata`]
94
pub fn metadata(&self) -> &FileMetadata {
95
&self.metadata
96
}
97
98
/// Consumes this FileReader, returning the underlying reader
99
pub fn into_inner(self) -> R {
100
self.reader
101
}
102
103
pub fn set_current_block(&mut self, idx: usize) {
104
self.current_block = idx;
105
}
106
107
pub fn get_current_block(&self) -> usize {
108
self.current_block
109
}
110
111
/// Get the inner memory scratches so they can be reused in a new writer.
112
/// This can be utilized to save memory allocations for performance reasons.
113
pub fn take_projection_info(&mut self) -> Option<ProjectionInfo> {
114
std::mem::take(&mut self.projection)
115
}
116
117
/// Get the inner memory scratches so they can be reused in a new writer.
118
/// This can be utilized to save memory allocations for performance reasons.
119
pub fn take_scratches(&mut self) -> (Vec<u8>, Vec<u8>) {
120
(
121
std::mem::take(&mut self.data_scratch),
122
std::mem::take(&mut self.message_scratch),
123
)
124
}
125
126
/// Set the inner memory scratches so they can be reused in a new writer.
127
/// This can be utilized to save memory allocations for performance reasons.
128
pub fn set_scratches(&mut self, scratches: (Vec<u8>, Vec<u8>)) {
129
(self.data_scratch, self.message_scratch) = scratches;
130
}
131
132
pub fn read_dictionaries(&mut self) -> PolarsResult<()> {
133
if self.dictionaries.is_none() {
134
self.dictionaries = Some(read_file_dictionaries(
135
&mut self.reader,
136
&self.metadata,
137
&mut self.data_scratch,
138
self.checked,
139
)?);
140
};
141
Ok(())
142
}
143
144
/// Skip over blocks until we have seen at most `offset` rows, returning how many rows we are
145
/// still too see.
146
///
147
/// This will never go over the `offset`. Meaning that if the `offset < current_block.len()`,
148
/// the block will not be skipped.
149
pub fn skip_blocks_till_limit(&mut self, offset: u64) -> PolarsResult<u64> {
150
let mut remaining_offset = offset;
151
152
for (i, block) in self.metadata.blocks.iter().enumerate() {
153
let message =
154
get_message_from_block(&mut self.reader, block, &mut self.message_scratch)?;
155
let record_batch = get_record_batch(message)?;
156
157
let length = record_batch.length()?;
158
let length = length as u64;
159
160
if length > remaining_offset {
161
self.current_block = i;
162
return Ok(remaining_offset);
163
}
164
165
remaining_offset -= length;
166
}
167
168
self.current_block = self.metadata.blocks.len();
169
Ok(remaining_offset)
170
}
171
172
pub fn next_record_batch(
173
&mut self,
174
) -> Option<PolarsResult<arrow_format::ipc::RecordBatchRef<'_>>> {
175
let block = self.metadata.blocks.get(self.current_block)?;
176
self.current_block += 1;
177
let message = get_message_from_block(&mut self.reader, block, &mut self.message_scratch);
178
Some(message.and_then(|m| get_record_batch(m)))
179
}
180
}
181
182
impl<R: Read + Seek> Iterator for FileReader<R> {
183
type Item = PolarsResult<RecordBatchT<Box<dyn Array>>>;
184
185
fn next(&mut self) -> Option<Self::Item> {
186
// get current block
187
if self.current_block == self.metadata.blocks.len() {
188
return None;
189
}
190
191
match self.read_dictionaries() {
192
Ok(_) => {},
193
Err(e) => return Some(Err(e)),
194
};
195
196
let block = self.current_block;
197
self.current_block += 1;
198
199
let chunk = read_batch(
200
&mut self.reader,
201
self.dictionaries.as_ref().unwrap(),
202
&self.metadata,
203
self.projection.as_ref().map(|x| x.columns.as_ref()),
204
Some(self.remaining),
205
block,
206
false,
207
&mut self.message_scratch,
208
&mut self.data_scratch,
209
self.checked,
210
);
211
self.remaining -= chunk.as_ref().map(|x| x.len()).unwrap_or_default();
212
213
let chunk = if let Some(ProjectionInfo { map, .. }) = &self.projection {
214
// re-order according to projection
215
chunk.map(|chunk| apply_projection(chunk, map))
216
} else {
217
chunk
218
};
219
Some(chunk)
220
}
221
}
222
223
/// A reader that has access to exactly one standalone IPC Block of an Arrow IPC file.
/// The block contains either a `RecordBatch` or a `DictionaryBatch`.
///
/// NOTE(review): earlier documentation required a `dictionaries` field to be initialized prior
/// to decoding a `RecordBatch`, but this struct only holds `reader` — confirm where the
/// dictionaries are expected to come from.
pub struct BlockReader<R>
where
    R: Read + Seek,
{
    /// Byte source positioned over the block.
    pub reader: R,
}
229
230
impl<R: Read + Seek> BlockReader<R> {
231
pub fn new(reader: R) -> Self {
232
Self { reader }
233
}
234
235
/// Reads the record batch header and returns its length (i.e., number of rows).
236
pub fn record_batch_num_rows(&mut self, message_scratch: &mut Vec<u8>) -> PolarsResult<usize> {
237
let offset: u64 = 0;
238
239
let message = get_message_from_block_offset(&mut self.reader, offset, message_scratch)?;
240
let batch = get_record_batch(message)?;
241
let out = batch.length().map(|l| usize::try_from(l).unwrap())?;
242
Ok(out)
243
}
244
245
/// Reads the record batch header and returns the custom_metadata.
246
pub fn record_batch_custom_metadata<'a>(
247
&mut self,
248
message_scratch: &'a mut Vec<u8>,
249
) -> PolarsResult<Option<Vec<KeyValueRef<'a>>>> {
250
let offset: u64 = 0;
251
let message = get_message_from_block_offset(&mut self.reader, offset, message_scratch)?;
252
let custom_metadata = message.custom_metadata()?;
253
254
custom_metadata
255
.map(|kv_results| {
256
kv_results
257
.into_iter()
258
.map(|res| {
259
res.map_err(|e| {
260
polars_err!(
261
ComputeError:
262
"failed to get KeyValue from IPC custom metadata: {}",
263
e
264
)
265
})
266
})
267
.collect::<Result<Vec<KeyValueRef>, _>>()
268
})
269
.transpose()
270
}
271
}
272
273