GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-utils/src/mmap.rs
use std::ffi::c_void;
use std::fs::File;
use std::io;
use std::mem::ManuallyDrop;
use std::sync::LazyLock;

pub use memmap::Mmap;

mod private {
    use std::fs::File;
    use std::ops::Deref;
    use std::sync::Arc;

    use polars_error::PolarsResult;

    use super::MMapSemaphore;
    use crate::mem::prefetch::prefetch_l2;

    /// A read-only reference to a slice of memory that can potentially be memory-mapped.
    ///
    /// A reference count is kept to the underlying buffer to ensure the memory is kept alive.
    /// [`MemSlice::slice`] can be used to slice the memory in a zero-copy manner.
    ///
    /// This still owns all of the original memory and therefore should probably not be a
    /// long-lasting structure.
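    ///
    /// # Example
    ///
    /// A minimal zero-copy sketch (illustrative only, not compiled as a doctest): the
    /// sub-slice returned by [`MemSlice::slice`] shares the backing allocation of the
    /// original buffer.
    ///
    /// ```ignore
    /// let mem = MemSlice::from_vec(vec![1u8, 2, 3, 4, 5]);
    /// let sub = mem.slice(1..3);
    /// assert_eq!(&*sub, &[2u8, 3]);
    /// assert_eq!(sub.as_ptr(), unsafe { mem.as_ptr().add(1) });
    /// ```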
    #[derive(Clone, Debug)]
    pub struct MemSlice {
        // Store the `&[u8]` to make the `Deref` free.
        // `slice` is not 'static - it is backed by `inner`. This is safe as long as `slice` is not
        // directly accessed, and we are in a private module to guarantee that. Access should only
        // be done through `Deref<Target = [u8]>`, which automatically gives the correct lifetime.
        slice: &'static [u8],
        #[allow(unused)]
        inner: MemSliceInner,
    }

    /// Keeps the underlying buffer alive. This should be cheaply cloneable.
    #[derive(Clone, Debug)]
    #[allow(unused)]
    enum MemSliceInner {
        Bytes(bytes::Bytes), // Separate because it does atomic refcounting internally
        Arc(Arc<dyn std::fmt::Debug + Send + Sync>),
    }

    impl Deref for MemSlice {
        type Target = [u8];

        #[inline(always)]
        fn deref(&self) -> &Self::Target {
            self.slice
        }
    }

    impl AsRef<[u8]> for MemSlice {
        #[inline(always)]
        fn as_ref(&self) -> &[u8] {
            self.slice
        }
    }

    impl Default for MemSlice {
        fn default() -> Self {
            Self::from_bytes(bytes::Bytes::new())
        }
    }

    impl From<Vec<u8>> for MemSlice {
        fn from(value: Vec<u8>) -> Self {
            Self::from_vec(value)
        }
    }

    impl MemSlice {
        pub const EMPTY: Self = Self::from_static(&[]);

        /// Copy the contents into a new owned `Vec`
        #[inline(always)]
        pub fn to_vec(self) -> Vec<u8> {
            <[u8]>::to_vec(self.deref())
        }

        /// Construct a `MemSlice` from an existing `Vec<u8>`. This is zero-copy.
        #[inline]
        pub fn from_vec(v: Vec<u8>) -> Self {
            Self::from_bytes(bytes::Bytes::from(v))
        }

        /// Construct a `MemSlice` from [`bytes::Bytes`]. This is zero-copy.
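        ///
        /// Sketch (illustrative only, not compiled as a doctest): the resulting `MemSlice`
        /// points at the same memory as the `Bytes` value.
        ///
        /// ```ignore
        /// let bytes = bytes::Bytes::from_static(b"polars");
        /// let mem = MemSlice::from_bytes(bytes.clone());
        /// assert_eq!(mem.as_ptr(), bytes.as_ptr());
        /// ```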
        #[inline]
        pub fn from_bytes(bytes: bytes::Bytes) -> Self {
            Self {
                slice: unsafe { std::mem::transmute::<&[u8], &'static [u8]>(bytes.as_ref()) },
                inner: MemSliceInner::Bytes(bytes),
            }
        }

        #[inline]
        pub fn from_mmap(mmap: Arc<MMapSemaphore>) -> Self {
            Self {
                slice: unsafe {
                    std::mem::transmute::<&[u8], &'static [u8]>(mmap.as_ref().as_ref())
                },
                inner: MemSliceInner::Arc(mmap),
            }
        }

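        /// Construct a `MemSlice` from a `&[u8]` whose backing memory is kept alive by `arc`.
        ///
        /// Sketch of the intended use (illustrative only, not compiled as a doctest); `buffer`
        /// is a hypothetical owner and the slice must point into memory it owns.
        ///
        /// ```ignore
        /// let buffer: Arc<Vec<u8>> = Arc::new(vec![1u8, 2, 3, 4, 5]);
        /// let slice: &[u8] = &buffer[1..4];
        /// let mem = MemSlice::from_arc(slice, buffer.clone());
        /// assert_eq!(&*mem, &[2u8, 3, 4]);
        /// ```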
        #[inline]
        pub fn from_arc<T>(slice: &[u8], arc: Arc<T>) -> Self
        where
            T: std::fmt::Debug + Send + Sync + 'static,
        {
            Self {
                slice: unsafe { std::mem::transmute::<&[u8], &'static [u8]>(slice) },
                inner: MemSliceInner::Arc(arc),
            }
        }

        #[inline]
        pub fn from_file(file: &File) -> PolarsResult<Self> {
            let mmap = MMapSemaphore::new_from_file(file)?;
            Ok(Self::from_mmap(Arc::new(mmap)))
        }

        /// Construct a `MemSlice` that simply wraps around a `&[u8]`.
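        ///
        /// Sketch (illustrative only, not compiled as a doctest), using a `'static` byte string:
        ///
        /// ```ignore
        /// static DATA: &[u8] = b"abc";
        /// let mem = MemSlice::from_static(DATA);
        /// assert_eq!(mem.as_ptr(), DATA.as_ptr());
        /// ```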
        #[inline]
        pub const fn from_static(slice: &'static [u8]) -> Self {
            let inner = MemSliceInner::Bytes(bytes::Bytes::from_static(slice));
            Self { slice, inner }
        }

        /// Attempt to prefetch the memory belonging to this [`MemSlice`]
        #[inline]
        pub fn prefetch(&self) {
            prefetch_l2(self.as_ref());
        }

        /// # Panics
        /// Panics if range is not in bounds.
        #[inline]
        #[track_caller]
        pub fn slice(&self, range: std::ops::Range<usize>) -> Self {
            let mut out = self.clone();
            out.slice = &out.slice[range];
            out
        }
    }

    impl From<bytes::Bytes> for MemSlice {
        fn from(value: bytes::Bytes) -> Self {
            Self::from_bytes(value)
        }
    }
}

use memmap::MmapOptions;
use polars_error::PolarsResult;
#[cfg(target_family = "unix")]
use polars_error::polars_bail;
pub use private::MemSlice;
use rayon::{ThreadPool, ThreadPoolBuilder};

use crate::mem::PAGE_SIZE;

/// A cursor over a [`MemSlice`].
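///
/// # Example
///
/// A minimal sketch of the cursor behaviour (illustrative only, not compiled as a doctest);
/// it only relies on the `std::io::Read` and `std::io::Seek` impls below.
///
/// ```ignore
/// use std::io::{Read, Seek, SeekFrom};
///
/// let mut reader = MemReader::from_vec(vec![1u8, 2, 3, 4, 5]);
/// let mut buf = [0u8; 2];
/// reader.read_exact(&mut buf).unwrap();
/// assert_eq!(buf, [1u8, 2]);
/// assert_eq!(reader.position(), 2);
/// reader.seek(SeekFrom::End(-1)).unwrap();
/// assert_eq!(reader.remaining_len(), 1);
/// ```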
#[derive(Debug, Clone)]
pub struct MemReader {
    data: MemSlice,
    position: usize,
}

impl MemReader {
    pub fn new(data: MemSlice) -> Self {
        Self { data, position: 0 }
    }

    #[inline(always)]
    pub fn remaining_len(&self) -> usize {
        self.data.len() - self.position
    }

    #[inline(always)]
    pub fn total_len(&self) -> usize {
        self.data.len()
    }

    #[inline(always)]
    pub fn position(&self) -> usize {
        self.position
    }

    /// Construct a `MemReader` from an existing `Vec<u8>`. This is zero-copy.
    #[inline(always)]
    pub fn from_vec(v: Vec<u8>) -> Self {
        Self::new(MemSlice::from_vec(v))
    }

    /// Construct a `MemReader` from [`bytes::Bytes`]. This is zero-copy.
    #[inline(always)]
    pub fn from_bytes(bytes: bytes::Bytes) -> Self {
        Self::new(MemSlice::from_bytes(bytes))
    }

    /// Construct a `MemReader` that simply wraps around a `&[u8]`. The caller must ensure the
    /// slice outlives the returned `MemReader`.
    #[inline]
    pub fn from_slice(slice: &'static [u8]) -> Self {
        Self::new(MemSlice::from_static(slice))
    }

    #[inline(always)]
    pub fn from_reader<R: io::Read>(mut reader: R) -> io::Result<Self> {
        let mut vec = Vec::new();
        reader.read_to_end(&mut vec)?;
        Ok(Self::from_vec(vec))
    }

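    /// Read the next `n` bytes as a zero-copy [`MemSlice`], clamped to the remaining length.
    ///
    /// Sketch (illustrative only, not compiled as a doctest):
    ///
    /// ```ignore
    /// let mut reader = MemReader::from_vec(vec![1u8, 2, 3, 4, 5]);
    /// let head = reader.read_slice(2);
    /// assert_eq!(&*head, &[1u8, 2]);
    /// assert_eq!(reader.remaining_len(), 3);
    /// ```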
    #[inline(always)]
    pub fn read_slice(&mut self, n: usize) -> MemSlice {
        let start = self.position;
        let end = usize::min(self.position + n, self.data.len());
        self.position = end;
        self.data.slice(start..end)
    }
}

impl From<MemSlice> for MemReader {
    fn from(data: MemSlice) -> Self {
        Self { data, position: 0 }
    }
}

impl io::Read for MemReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = usize::min(buf.len(), self.remaining_len());
        buf[..n].copy_from_slice(&self.data[self.position..self.position + n]);
        self.position += n;
        Ok(n)
    }
}

impl io::Seek for MemReader {
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        let position = match pos {
            io::SeekFrom::Start(position) => usize::min(position as usize, self.total_len()),
            io::SeekFrom::End(offset) => {
                let Some(position) = self.total_len().checked_add_signed(offset as isize) else {
                    return Err(io::Error::other("seek to before the start of the buffer"));
                };

                position
            },
            io::SeekFrom::Current(offset) => {
                let Some(position) = self.position.checked_add_signed(offset as isize) else {
                    return Err(io::Error::other("seek to before the start of the buffer"));
                };

                position
            },
        };

        self.position = position;

        Ok(position as u64)
    }
}

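/// Single-threaded pool used to run expensive unmaps off the calling thread.
///
/// The worker is named after the `POLARS_THREAD_NAME` environment variable (default
/// `"polars"`); for example, `POLARS_THREAD_NAME=myapp` yields a thread called
/// `myapp-unmap-0`.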
pub static UNMAP_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
    let thread_name = std::env::var("POLARS_THREAD_NAME").unwrap_or_else(|_| "polars".to_string());
    ThreadPoolBuilder::new()
        .num_threads(1)
        .thread_name(move |i| format!("{thread_name}-unmap-{i}"))
        .build()
        .expect("could not spawn threads")
});

// Keep track of memory mapped files so we don't write to them while reading.
// Use a btree as it uses less memory than a hashmap and this thing never shrinks.
// Write handles on Windows are exclusive, so this is only necessary on Unix.
#[cfg(target_family = "unix")]
static MEMORY_MAPPED_FILES: std::sync::LazyLock<
    std::sync::Mutex<std::collections::BTreeMap<(u64, u64), u32>>,
> = std::sync::LazyLock::new(|| std::sync::Mutex::new(Default::default()));

#[derive(Debug)]
pub struct MMapSemaphore {
    #[cfg(target_family = "unix")]
    key: (u64, u64),
    mmap: ManuallyDrop<Mmap>,
}

impl Drop for MMapSemaphore {
    fn drop(&mut self) {
        #[cfg(target_family = "unix")]
        {
            let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
            if let std::collections::btree_map::Entry::Occupied(mut e) = guard.entry(self.key) {
                let v = e.get_mut();
                *v -= 1;

                if *v == 0 {
                    e.remove_entry();
                }
            }
        }

        unsafe {
            let mmap = ManuallyDrop::take(&mut self.mmap);
            // If the unmap is 1 MiB or bigger, we do it in a background thread.
            let len = mmap.len();
            if len >= 1024 * 1024 {
                UNMAP_POOL.spawn(move || {
                    #[cfg(target_family = "unix")]
                    {
                        // If the unmap is bigger than our chunk size (32 MiB), we do it in chunks.
                        // This is because munmap takes a process-wide lock on the address space,
                        // which we don't want to hold for extended periods of time.
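                        // Worked example (assuming a 4 KiB page size): chunk_size stays at
                        // 32 MiB, so unmapping a 100 MiB region issues four munmap calls of
                        // 32 MiB, 32 MiB, 32 MiB and finally 4 MiB.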
                        let chunk_size = (32_usize * 1024 * 1024).next_multiple_of(*PAGE_SIZE);
                        if len > chunk_size {
                            let mmap = ManuallyDrop::new(mmap);
                            let ptr: *const u8 = mmap.as_ptr();
                            let mut offset = 0;
                            while offset < len {
                                let remaining = len - offset;
                                libc::munmap(
                                    ptr.add(offset) as *mut c_void,
                                    remaining.min(chunk_size),
                                );
                                offset += chunk_size;
                            }
                            return;
                        }
                    }
                    drop(mmap)
                });
            } else {
                drop(mmap);
            }
        }
    }
}

impl MMapSemaphore {
    pub fn new_from_file_with_options(
        file: &File,
        options: MmapOptions,
    ) -> PolarsResult<MMapSemaphore> {
        let mmap = match unsafe { options.map(file) } {
            Ok(m) => m,

            // Mmap can fail with ENODEV on filesystems which don't support
            // MAP_SHARED, try MAP_PRIVATE instead, see #24343.
            #[cfg(target_family = "unix")]
            Err(e) if e.raw_os_error() == Some(libc::ENODEV) => unsafe {
                options.map_copy_read_only(file)?
            },

            Err(e) => return Err(e.into()),
        };

        #[cfg(target_family = "unix")]
        {
            // FIXME: We aren't handling the case where the file is already open in write-mode here.

            use std::os::unix::fs::MetadataExt;
            let metadata = file.metadata()?;

            let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
            let key = (metadata.dev(), metadata.ino());
            match guard.entry(key) {
                std::collections::btree_map::Entry::Occupied(mut e) => *e.get_mut() += 1,
                std::collections::btree_map::Entry::Vacant(e) => _ = e.insert(1),
            }
            Ok(Self {
                key,
                mmap: ManuallyDrop::new(mmap),
            })
        }

        #[cfg(not(target_family = "unix"))]
        Ok(Self {
            mmap: ManuallyDrop::new(mmap),
        })
    }

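    /// Memory-map `file` read-only with default [`MmapOptions`].
    ///
    /// Sketch (illustrative only, not compiled as a doctest); the path is hypothetical:
    ///
    /// ```ignore
    /// let file = std::fs::File::open("some_data.bin").unwrap();
    /// let mmap = MMapSemaphore::new_from_file(&file).unwrap();
    /// // `bytes` stays valid for as long as `mmap` is alive.
    /// let bytes: &[u8] = mmap.as_ref();
    /// ```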
    pub fn new_from_file(file: &File) -> PolarsResult<MMapSemaphore> {
        Self::new_from_file_with_options(file, MmapOptions::default())
    }

    pub fn as_ptr(&self) -> *const u8 {
        self.mmap.as_ptr()
    }
}

impl AsRef<[u8]> for MMapSemaphore {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        self.mmap.as_ref()
    }
}

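/// Return an error if the file described by `file_md` is currently memory mapped by this
/// process (tracked via `MEMORY_MAPPED_FILES` on Unix; a no-op elsewhere).
///
/// Sketch (illustrative only, not compiled as a doctest); the path is hypothetical:
///
/// ```ignore
/// let file = std::fs::File::open("some_data.bin").unwrap();
/// let _mmap = MMapSemaphore::new_from_file(&file).unwrap();
/// // On Unix, writing to a file that is still mapped is refused.
/// assert!(ensure_not_mapped(&file.metadata().unwrap()).is_err());
/// ```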
pub fn ensure_not_mapped(
    #[cfg_attr(not(target_family = "unix"), allow(unused))] file_md: &std::fs::Metadata,
) -> PolarsResult<()> {
    // TODO: We need to actually register that this file has been write-opened and prevent
    // read-opening this file based on that.
    #[cfg(target_family = "unix")]
    {
        use std::os::unix::fs::MetadataExt;
        let guard = MEMORY_MAPPED_FILES.lock().unwrap();
        if guard.contains_key(&(file_md.dev(), file_md.ino())) {
            polars_bail!(ComputeError: "cannot write to file: already memory mapped");
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    #[test]
    fn test_mem_slice_zero_copy() {
        use std::sync::Arc;

        use super::MemSlice;

        {
            let vec = vec![1u8, 2, 3, 4, 5];
            let ptr = vec.as_ptr();

            let mem_slice = MemSlice::from_vec(vec);
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        {
            let mut vec = vec![1u8, 2, 3, 4, 5];
            vec.truncate(2);
            let ptr = vec.as_ptr();

            let mem_slice = MemSlice::from_vec(vec);
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        {
            let bytes = bytes::Bytes::from(vec![1u8, 2, 3, 4, 5]);
            let ptr = bytes.as_ptr();

            let mem_slice = MemSlice::from_bytes(bytes);
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        {
            use crate::mmap::MMapSemaphore;

            let path = "../../examples/datasets/foods1.csv";
            let file = std::fs::File::open(path).unwrap();
            let mmap = MMapSemaphore::new_from_file(&file).unwrap();
            let ptr = mmap.as_ptr();

            let mem_slice = MemSlice::from_mmap(Arc::new(mmap));
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }

        {
            let vec = vec![1u8, 2, 3, 4, 5];
            let slice = vec.as_slice();
            let ptr = slice.as_ptr();

            let mem_slice = MemSlice::from_static(unsafe {
                std::mem::transmute::<&[u8], &'static [u8]>(slice)
            });
            let ptr_out = mem_slice.as_ptr();

            assert_eq!(ptr_out, ptr);
        }
    }

    #[test]
    fn test_mem_slice_slicing() {
        use super::MemSlice;

        {
            let vec = vec![1u8, 2, 3, 4, 5];
            let slice = vec.as_slice();

            let mem_slice = MemSlice::from_static(unsafe {
                std::mem::transmute::<&[u8], &'static [u8]>(slice)
            });

            let out = &*mem_slice.slice(3..5);
            assert_eq!(out, &slice[3..5]);
            assert_eq!(out.as_ptr(), slice[3..5].as_ptr());
        }
    }
}