Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/ipc/mmap.rs
6939 views
1
use arrow::io::ipc::read;
2
use arrow::io::ipc::read::{Dictionaries, FileMetadata};
3
use arrow::mmap::{mmap_dictionaries_unchecked, mmap_unchecked};
4
use arrow::record_batch::RecordBatch;
5
use polars_core::prelude::*;
6
use polars_utils::mmap::MMapSemaphore;
7
8
use super::ipc_file::IpcReader;
9
use crate::mmap::MmapBytesReader;
10
use crate::predicates::PhysicalIoExpr;
11
use crate::shared::{ArrowReader, finish_reader};
12
use crate::utils::{apply_projection, columns_to_projection};
13
14
impl<R: MmapBytesReader> IpcReader<R> {
15
pub(super) fn finish_memmapped(
16
&mut self,
17
predicate: Option<Arc<dyn PhysicalIoExpr>>,
18
) -> PolarsResult<DataFrame> {
19
match self.reader.to_file() {
20
Some(file) => {
21
let semaphore = MMapSemaphore::new_from_file(file)?;
22
let metadata =
23
read::read_file_metadata(&mut std::io::Cursor::new(semaphore.as_ref()))?;
24
25
if let Some(columns) = &self.columns {
26
let schema = &metadata.schema;
27
let prj = columns_to_projection(columns, schema)?;
28
self.projection = Some(prj);
29
}
30
31
let schema = if let Some(projection) = &self.projection {
32
Arc::new(apply_projection(&metadata.schema, projection))
33
} else {
34
metadata.schema.clone()
35
};
36
37
let reader = MMapChunkIter::new(Arc::new(semaphore), metadata, &self.projection)?;
38
39
finish_reader(
40
reader,
41
// don't rechunk, that would trigger a read.
42
false,
43
self.n_rows,
44
predicate,
45
&schema,
46
self.row_index.clone(),
47
)
48
},
49
None => polars_bail!(ComputeError: "cannot memory-map, you must provide a file"),
50
}
51
}
52
}
53
54
struct MMapChunkIter<'a> {
55
dictionaries: Dictionaries,
56
metadata: FileMetadata,
57
mmap: Arc<MMapSemaphore>,
58
idx: usize,
59
end: usize,
60
projection: &'a Option<Vec<usize>>,
61
}
62
63
impl<'a> MMapChunkIter<'a> {
64
fn new(
65
mmap: Arc<MMapSemaphore>,
66
metadata: FileMetadata,
67
projection: &'a Option<Vec<usize>>,
68
) -> PolarsResult<Self> {
69
let end = metadata.blocks.len();
70
// mmap the dictionaries
71
let dictionaries = unsafe { mmap_dictionaries_unchecked(&metadata, mmap.clone())? };
72
73
Ok(Self {
74
dictionaries,
75
metadata,
76
mmap,
77
idx: 0,
78
end,
79
projection,
80
})
81
}
82
}
83
84
impl ArrowReader for MMapChunkIter<'_> {
85
fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
86
if self.idx < self.end {
87
let chunk = unsafe {
88
mmap_unchecked(
89
&self.metadata,
90
&self.dictionaries,
91
self.mmap.clone(),
92
self.idx,
93
)
94
}?;
95
self.idx += 1;
96
let chunk = match &self.projection {
97
None => chunk,
98
Some(proj) => {
99
let length = chunk.len();
100
let (schema, cols) = chunk.into_schema_and_arrays();
101
let schema = schema.try_project_indices(proj).unwrap();
102
let arrays = proj.iter().map(|i| cols[*i].clone()).collect();
103
RecordBatch::new(length, Arc::new(schema), arrays)
104
},
105
};
106
Ok(Some(chunk))
107
} else {
108
Ok(None)
109
}
110
}
111
}
112
113