Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/ipc/read/reader.rs
6940 views
1
use std::io::{Read, Seek};
2
3
use polars_error::PolarsResult;
4
5
use super::common::*;
6
use super::file::{get_message_from_block, get_record_batch};
7
use super::{Dictionaries, FileMetadata, read_batch, read_file_dictionaries};
8
use crate::array::Array;
9
use crate::datatypes::ArrowSchema;
10
use crate::record_batch::RecordBatchT;
11
12
/// An iterator of [`RecordBatchT`]s from an Arrow IPC file.
13
pub struct FileReader<R: Read + Seek> {
14
reader: R,
15
metadata: FileMetadata,
16
// the dictionaries are going to be read
17
dictionaries: Option<Dictionaries>,
18
current_block: usize,
19
projection: Option<ProjectionInfo>,
20
remaining: usize,
21
data_scratch: Vec<u8>,
22
message_scratch: Vec<u8>,
23
}
24
25
impl<R: Read + Seek> FileReader<R> {
26
/// Creates a new [`FileReader`]. Use `projection` to only take certain columns.
27
/// # Panic
28
/// Panics iff the projection is not in increasing order (e.g. `[1, 0]` nor `[0, 1, 1]` are valid)
29
pub fn new(
30
reader: R,
31
metadata: FileMetadata,
32
projection: Option<Vec<usize>>,
33
limit: Option<usize>,
34
) -> Self {
35
let projection =
36
projection.map(|projection| prepare_projection(&metadata.schema, projection));
37
Self {
38
reader,
39
metadata,
40
dictionaries: Default::default(),
41
projection,
42
remaining: limit.unwrap_or(usize::MAX),
43
current_block: 0,
44
data_scratch: Default::default(),
45
message_scratch: Default::default(),
46
}
47
}
48
49
/// Creates a new [`FileReader`]. Use `projection` to only take certain columns.
50
/// # Panic
51
/// Panics iff the projection is not in increasing order (e.g. `[1, 0]` nor `[0, 1, 1]` are valid)
52
pub fn new_with_projection_info(
53
reader: R,
54
metadata: FileMetadata,
55
projection: Option<ProjectionInfo>,
56
limit: Option<usize>,
57
) -> Self {
58
Self {
59
reader,
60
metadata,
61
dictionaries: Default::default(),
62
projection,
63
remaining: limit.unwrap_or(usize::MAX),
64
current_block: 0,
65
data_scratch: Default::default(),
66
message_scratch: Default::default(),
67
}
68
}
69
70
/// Return the schema of the file
71
pub fn schema(&self) -> &ArrowSchema {
72
self.projection
73
.as_ref()
74
.map(|x| &x.schema)
75
.unwrap_or(&self.metadata.schema)
76
}
77
78
/// Returns the [`FileMetadata`]
79
pub fn metadata(&self) -> &FileMetadata {
80
&self.metadata
81
}
82
83
/// Consumes this FileReader, returning the underlying reader
84
pub fn into_inner(self) -> R {
85
self.reader
86
}
87
88
pub fn set_current_block(&mut self, idx: usize) {
89
self.current_block = idx;
90
}
91
92
pub fn get_current_block(&self) -> usize {
93
self.current_block
94
}
95
96
/// Get the inner memory scratches so they can be reused in a new writer.
97
/// This can be utilized to save memory allocations for performance reasons.
98
pub fn take_projection_info(&mut self) -> Option<ProjectionInfo> {
99
std::mem::take(&mut self.projection)
100
}
101
102
/// Get the inner memory scratches so they can be reused in a new writer.
103
/// This can be utilized to save memory allocations for performance reasons.
104
pub fn take_scratches(&mut self) -> (Vec<u8>, Vec<u8>) {
105
(
106
std::mem::take(&mut self.data_scratch),
107
std::mem::take(&mut self.message_scratch),
108
)
109
}
110
111
/// Set the inner memory scratches so they can be reused in a new writer.
112
/// This can be utilized to save memory allocations for performance reasons.
113
pub fn set_scratches(&mut self, scratches: (Vec<u8>, Vec<u8>)) {
114
(self.data_scratch, self.message_scratch) = scratches;
115
}
116
117
fn read_dictionaries(&mut self) -> PolarsResult<()> {
118
if self.dictionaries.is_none() {
119
self.dictionaries = Some(read_file_dictionaries(
120
&mut self.reader,
121
&self.metadata,
122
&mut self.data_scratch,
123
)?);
124
};
125
Ok(())
126
}
127
128
/// Skip over blocks until we have seen at most `offset` rows, returning how many rows we are
129
/// still too see.
130
///
131
/// This will never go over the `offset`. Meaning that if the `offset < current_block.len()`,
132
/// the block will not be skipped.
133
pub fn skip_blocks_till_limit(&mut self, offset: u64) -> PolarsResult<u64> {
134
let mut remaining_offset = offset;
135
136
for (i, block) in self.metadata.blocks.iter().enumerate() {
137
let message =
138
get_message_from_block(&mut self.reader, block, &mut self.message_scratch)?;
139
let record_batch = get_record_batch(message)?;
140
141
let length = record_batch.length()?;
142
let length = length as u64;
143
144
if length > remaining_offset {
145
self.current_block = i;
146
return Ok(remaining_offset);
147
}
148
149
remaining_offset -= length;
150
}
151
152
self.current_block = self.metadata.blocks.len();
153
Ok(remaining_offset)
154
}
155
156
pub fn next_record_batch(
157
&mut self,
158
) -> Option<PolarsResult<arrow_format::ipc::RecordBatchRef<'_>>> {
159
let block = self.metadata.blocks.get(self.current_block)?;
160
self.current_block += 1;
161
let message = get_message_from_block(&mut self.reader, block, &mut self.message_scratch);
162
Some(message.and_then(|m| get_record_batch(m)))
163
}
164
}
165
166
impl<R: Read + Seek> Iterator for FileReader<R> {
167
type Item = PolarsResult<RecordBatchT<Box<dyn Array>>>;
168
169
fn next(&mut self) -> Option<Self::Item> {
170
// get current block
171
if self.current_block == self.metadata.blocks.len() {
172
return None;
173
}
174
175
match self.read_dictionaries() {
176
Ok(_) => {},
177
Err(e) => return Some(Err(e)),
178
};
179
180
let block = self.current_block;
181
self.current_block += 1;
182
183
let chunk = read_batch(
184
&mut self.reader,
185
self.dictionaries.as_ref().unwrap(),
186
&self.metadata,
187
self.projection.as_ref().map(|x| x.columns.as_ref()),
188
Some(self.remaining),
189
block,
190
&mut self.message_scratch,
191
&mut self.data_scratch,
192
);
193
self.remaining -= chunk.as_ref().map(|x| x.len()).unwrap_or_default();
194
195
let chunk = if let Some(ProjectionInfo { map, .. }) = &self.projection {
196
// re-order according to projection
197
chunk.map(|chunk| apply_projection(chunk, map))
198
} else {
199
chunk
200
};
201
Some(chunk)
202
}
203
}
204
205