use std::ffi::c_void;
use std::fs::File;
use std::mem::ManuallyDrop;
use std::sync::LazyLock;
pub use memmap::Mmap;
use memmap::MmapOptions;
use polars_error::PolarsResult;
#[cfg(target_family = "unix")]
use polars_error::polars_bail;
use rayon::{ThreadPool, ThreadPoolBuilder};
use crate::mem::PAGE_SIZE;
/// Single-threaded pool on which large memory maps are released, so that dropping
/// an [`MMapSemaphore`] does not have to wait for the unmap to finish.
///
/// The worker thread is named `{prefix}-unmap-{index}`. `prefix` is taken from the
/// `POLARS_THREAD_NAME` environment variable, falling back to `polars` when that
/// variable is unset or not valid Unicode.
///
/// # Panics
/// Panics on first use if the worker thread cannot be spawned.
pub static UNMAP_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
    let prefix = match std::env::var("POLARS_THREAD_NAME") {
        Ok(name) => name,
        Err(_) => String::from("polars"),
    };
    let builder = ThreadPoolBuilder::new()
        .num_threads(1)
        .thread_name(move |idx| format!("{prefix}-unmap-{idx}"));
    builder.build().expect("could not spawn threads")
});
/// Process-wide registry of files currently mapped through [`MMapSemaphore`].
///
/// Keys are `(device id, inode)` pairs. Values count the live mappings of that file.
/// An entry is removed once its count drops back to zero.
#[cfg(target_family = "unix")]
static MEMORY_MAPPED_FILES: LazyLock<
    std::sync::Mutex<std::collections::BTreeMap<(u64, u64), u32>>,
> = LazyLock::new(|| std::sync::Mutex::new(std::collections::BTreeMap::new()));
/// A memory map of a file. On Unix, it is counted in a process-wide per-file registry
/// for as long as it is alive.
///
/// On Unix, while any `MMapSemaphore` for a file exists, [`ensure_not_mapped`] returns
/// an error for that file's metadata.
#[derive(Debug)]
pub struct MMapSemaphore {
    /// `(device id, inode)` of the mapped file: this mapping's key in the registry.
    #[cfg(target_family = "unix")]
    key: (u64, u64),
    /// The mapping itself. It is wrapped in `ManuallyDrop` so that `Drop` can move it
    /// out and release it, possibly on a background thread.
    mmap: ManuallyDrop<Mmap>,
}
impl Drop for MMapSemaphore {
fn drop(&mut self) {
#[cfg(target_family = "unix")]
{
let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
if let std::collections::btree_map::Entry::Occupied(mut e) = guard.entry(self.key) {
let v = e.get_mut();
*v -= 1;
if *v == 0 {
e.remove_entry();
}
}
}
unsafe {
let mmap = ManuallyDrop::take(&mut self.mmap);
let len = self.mmap.len();
if len >= 1024 * 1024 {
UNMAP_POOL.spawn(move || {
#[cfg(target_family = "unix")]
{
let chunk_size = (32_usize * 1024 * 1024).next_multiple_of(*PAGE_SIZE);
if len > chunk_size {
let mmap = ManuallyDrop::new(mmap);
let ptr: *const u8 = mmap.as_ptr();
let mut offset = 0;
while offset < len {
let remaining = len - offset;
libc::munmap(
ptr.add(offset) as *mut c_void,
remaining.min(chunk_size),
);
offset += chunk_size;
}
return;
}
}
drop(mmap)
});
} else {
drop(mmap);
}
}
}
}
impl MMapSemaphore {
    /// Memory maps `file` with `options`. On Unix, it also records the mapping in the
    /// per-file registry consulted by [`ensure_not_mapped`].
    ///
    /// On Unix, if mapping fails with `ENODEV`, a second attempt is made with
    /// `MmapOptions::map_copy_read_only`.
    ///
    /// # Errors
    /// Returns an error if the file cannot be mapped. On Unix, it also returns an error
    /// if the file's metadata cannot be read.
    pub fn new_from_file_with_options(
        file: &File,
        options: MmapOptions,
    ) -> PolarsResult<MMapSemaphore> {
        // Mapping is `unsafe` because the file could change underneath the mapping.
        // Polars' own writers are expected to check `ensure_not_mapped` first.
        let mapped = unsafe { options.map(file) };
        let mmap = match mapped {
            Ok(mmap) => mmap,
            #[cfg(target_family = "unix")]
            Err(err) if err.raw_os_error() == Some(libc::ENODEV) => unsafe {
                options.map_copy_read_only(file)?
            },
            Err(err) => return Err(err.into()),
        };

        // Register the mapping under the file's `(device, inode)` identity.
        #[cfg(target_family = "unix")]
        let key = {
            use std::os::unix::fs::MetadataExt;
            let metadata = file.metadata()?;
            let key = (metadata.dev(), metadata.ino());
            *MEMORY_MAPPED_FILES.lock().unwrap().entry(key).or_insert(0) += 1;
            key
        };

        Ok(Self {
            #[cfg(target_family = "unix")]
            key,
            mmap: ManuallyDrop::new(mmap),
        })
    }

    /// Memory maps `file` with default [`MmapOptions`]. See
    /// [`Self::new_from_file_with_options`] for details and errors.
    pub fn new_from_file(file: &File) -> PolarsResult<MMapSemaphore> {
        let options = MmapOptions::default();
        Self::new_from_file_with_options(file, options)
    }

    /// Returns a pointer to the first mapped byte.
    pub fn as_ptr(&self) -> *const u8 {
        let bytes: &[u8] = &self.mmap;
        bytes.as_ptr()
    }
}
impl AsRef<[u8]> for MMapSemaphore {
    /// Borrows the mapped bytes as a slice.
    #[inline]
    fn as_ref(&self) -> &[u8] {
        // Deref coercion: `&ManuallyDrop<Mmap>` -> `&Mmap` -> `&[u8]`.
        &self.mmap
    }
}
/// Fails if the file described by `file_md` is currently memory mapped through an
/// [`MMapSemaphore`].
///
/// On Unix, files are identified by their `(device id, inode)` pair. On other targets
/// nothing is tracked, and this always returns `Ok(())`.
///
/// # Errors
/// Returns a `ComputeError` if the file is currently mapped (Unix only).
pub fn ensure_not_mapped(
    #[cfg_attr(not(target_family = "unix"), allow(unused))] file_md: &std::fs::Metadata,
) -> PolarsResult<()> {
    #[cfg(target_family = "unix")]
    {
        use std::os::unix::fs::MetadataExt;
        let key = (file_md.dev(), file_md.ino());
        let is_mapped = MEMORY_MAPPED_FILES.lock().unwrap().contains_key(&key);
        if is_mapped {
            polars_bail!(ComputeError: "cannot write to file: already memory mapped");
        }
    }
    Ok(())
}