Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-utils/src/mmap.rs
8422 views
1
use std::ffi::c_void;
2
use std::fs::File;
3
use std::mem::ManuallyDrop;
4
use std::sync::LazyLock;
5
6
pub use memmap::Mmap;
7
use memmap::MmapOptions;
8
use polars_error::PolarsResult;
9
#[cfg(target_family = "unix")]
10
use polars_error::polars_bail;
11
use rayon::{ThreadPool, ThreadPoolBuilder};
12
13
use crate::mem::PAGE_SIZE;
14
15
pub static UNMAP_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
16
let thread_name = std::env::var("POLARS_THREAD_NAME").unwrap_or_else(|_| "polars".to_string());
17
ThreadPoolBuilder::new()
18
.num_threads(1)
19
.thread_name(move |i| format!("{thread_name}-unmap-{i}"))
20
.build()
21
.expect("could not spawn threads")
22
});
23
24
// Registry of currently memory-mapped files, keyed by `(device, inode)` with a
// map-count per file, so we can refuse to write to a file while it is mapped
// for reading. A `BTreeMap` is used because it needs less memory than a hash
// map and this registry never shrinks. On Windows the write handle is already
// exclusive, so this bookkeeping is only required on Unix.
#[cfg(target_family = "unix")]
static MEMORY_MAPPED_FILES: std::sync::LazyLock<
    std::sync::Mutex<std::collections::BTreeMap<(u64, u64), u32>>,
> = std::sync::LazyLock::new(|| std::sync::Mutex::new(std::collections::BTreeMap::new()));
31
32
/// A memory map plus the bookkeeping needed to register the mapped file in
/// `MEMORY_MAPPED_FILES` (Unix only) and to unmap lazily on drop.
#[derive(Debug)]
pub struct MMapSemaphore {
    // `(device, inode)` pair identifying the mapped file; used as the key into
    // the `MEMORY_MAPPED_FILES` registry.
    #[cfg(target_family = "unix")]
    key: (u64, u64),
    // Wrapped in `ManuallyDrop` so the `Drop` impl can take the map out and,
    // for large maps, hand the actual unmap work to `UNMAP_POOL`.
    mmap: ManuallyDrop<Mmap>,
}
38
39
impl Drop for MMapSemaphore {
40
fn drop(&mut self) {
41
#[cfg(target_family = "unix")]
42
{
43
let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
44
if let std::collections::btree_map::Entry::Occupied(mut e) = guard.entry(self.key) {
45
let v = e.get_mut();
46
*v -= 1;
47
48
if *v == 0 {
49
e.remove_entry();
50
}
51
}
52
}
53
54
unsafe {
55
let mmap = ManuallyDrop::take(&mut self.mmap);
56
// If the unmap is 1 MiB or bigger, we do it in a background thread.
57
let len = self.mmap.len();
58
if len >= 1024 * 1024 {
59
UNMAP_POOL.spawn(move || {
60
#[cfg(target_family = "unix")]
61
{
62
// If the unmap is bigger than our chunk size (32 MiB), we do it in chunks.
63
// This is because munmap holds a lock on the unmap file, which we don't
64
// want to hold for extended periods of time.
65
let chunk_size = (32_usize * 1024 * 1024).next_multiple_of(*PAGE_SIZE);
66
if len > chunk_size {
67
let mmap = ManuallyDrop::new(mmap);
68
let ptr: *const u8 = mmap.as_ptr();
69
let mut offset = 0;
70
while offset < len {
71
let remaining = len - offset;
72
libc::munmap(
73
ptr.add(offset) as *mut c_void,
74
remaining.min(chunk_size),
75
);
76
offset += chunk_size;
77
}
78
return;
79
}
80
}
81
drop(mmap)
82
});
83
} else {
84
drop(mmap);
85
}
86
}
87
}
88
}
89
90
impl MMapSemaphore {
91
pub fn new_from_file_with_options(
92
file: &File,
93
options: MmapOptions,
94
) -> PolarsResult<MMapSemaphore> {
95
let mmap = match unsafe { options.map(file) } {
96
Ok(m) => m,
97
98
// Mmap can fail with ENODEV on filesystems which don't support
99
// MAP_SHARED, try MAP_PRIVATE instead, see #24343.
100
#[cfg(target_family = "unix")]
101
Err(e) if e.raw_os_error() == Some(libc::ENODEV) => unsafe {
102
options.map_copy_read_only(file)?
103
},
104
105
Err(e) => return Err(e.into()),
106
};
107
108
#[cfg(target_family = "unix")]
109
{
110
// TODO: We aren't handling the case where the file is already open in write-mode here.
111
112
use std::os::unix::fs::MetadataExt;
113
let metadata = file.metadata()?;
114
115
let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
116
let key = (metadata.dev(), metadata.ino());
117
match guard.entry(key) {
118
std::collections::btree_map::Entry::Occupied(mut e) => *e.get_mut() += 1,
119
std::collections::btree_map::Entry::Vacant(e) => _ = e.insert(1),
120
}
121
Ok(Self {
122
key,
123
mmap: ManuallyDrop::new(mmap),
124
})
125
}
126
127
#[cfg(not(target_family = "unix"))]
128
Ok(Self {
129
mmap: ManuallyDrop::new(mmap),
130
})
131
}
132
133
pub fn new_from_file(file: &File) -> PolarsResult<MMapSemaphore> {
134
Self::new_from_file_with_options(file, MmapOptions::default())
135
}
136
137
pub fn as_ptr(&self) -> *const u8 {
138
self.mmap.as_ptr()
139
}
140
}
141
142
impl AsRef<[u8]> for MMapSemaphore {
143
#[inline]
144
fn as_ref(&self) -> &[u8] {
145
self.mmap.as_ref()
146
}
147
}
148
149
pub fn ensure_not_mapped(
150
#[cfg_attr(not(target_family = "unix"), allow(unused))] file_md: &std::fs::Metadata,
151
) -> PolarsResult<()> {
152
// TODO: We need to actually register that this file has been write-opened and prevent
153
// read-opening this file based on that.
154
#[cfg(target_family = "unix")]
155
{
156
use std::os::unix::fs::MetadataExt;
157
let guard = MEMORY_MAPPED_FILES.lock().unwrap();
158
if guard.contains_key(&(file_md.dev(), file_md.ino())) {
159
polars_bail!(ComputeError: "cannot write to file: already memory mapped");
160
}
161
}
162
Ok(())
163
}
164
165