Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/file_cache/cache.rs
6939 views
1
use std::path::Path;
2
use std::sync::atomic::AtomicU64;
3
use std::sync::{Arc, LazyLock, RwLock};
4
5
use polars_core::config;
6
use polars_error::PolarsResult;
7
use polars_utils::aliases::PlHashMap;
8
use polars_utils::plpath::PlPathRef;
9
10
use super::entry::{DATA_PREFIX, FileCacheEntry, METADATA_PREFIX};
11
use super::eviction::EvictionManager;
12
use super::file_fetcher::FileFetcher;
13
use super::utils::FILE_CACHE_PREFIX;
14
use crate::path_utils::ensure_directory_init;
15
16
pub static FILE_CACHE: LazyLock<FileCache> = LazyLock::new(|| {
17
let prefix = FILE_CACHE_PREFIX.as_ref();
18
let prefix = Arc::<Path>::from(prefix);
19
20
if config::verbose() {
21
eprintln!("file cache prefix: {}", prefix.to_str().unwrap());
22
}
23
24
let min_ttl = Arc::new(AtomicU64::from(get_env_file_cache_ttl()));
25
let notify_ttl_updated = Arc::new(tokio::sync::Notify::new());
26
27
let metadata_dir = prefix
28
.as_ref()
29
.join(std::str::from_utf8(&[METADATA_PREFIX]).unwrap())
30
.into_boxed_path();
31
if let Err(err) = ensure_directory_init(&metadata_dir) {
32
panic!(
33
"failed to create file cache metadata directory: path = {}, err = {}",
34
metadata_dir.to_str().unwrap(),
35
err
36
)
37
}
38
39
let data_dir = prefix
40
.as_ref()
41
.join(std::str::from_utf8(&[DATA_PREFIX]).unwrap())
42
.into_boxed_path();
43
44
if let Err(err) = ensure_directory_init(&data_dir) {
45
panic!(
46
"failed to create file cache data directory: path = {}, err = {}",
47
data_dir.to_str().unwrap(),
48
err
49
)
50
}
51
52
EvictionManager {
53
data_dir,
54
metadata_dir,
55
files_to_remove: None,
56
min_ttl: min_ttl.clone(),
57
notify_ttl_updated: notify_ttl_updated.clone(),
58
}
59
.run_in_background();
60
61
// Safety: We have created the data and metadata directories.
62
unsafe { FileCache::new_unchecked(prefix, min_ttl, notify_ttl_updated) }
63
});
64
65
pub struct FileCache {
66
prefix: Arc<Path>,
67
entries: Arc<RwLock<PlHashMap<Arc<str>, Arc<FileCacheEntry>>>>,
68
min_ttl: Arc<AtomicU64>,
69
notify_ttl_updated: Arc<tokio::sync::Notify>,
70
}
71
72
impl FileCache {
73
/// # Safety
74
/// The following directories exist:
75
/// * `{prefix}/{METADATA_PREFIX}/`
76
/// * `{prefix}/{DATA_PREFIX}/`
77
unsafe fn new_unchecked(
78
prefix: Arc<Path>,
79
min_ttl: Arc<AtomicU64>,
80
notify_ttl_updated: Arc<tokio::sync::Notify>,
81
) -> Self {
82
Self {
83
prefix,
84
entries: Default::default(),
85
min_ttl,
86
notify_ttl_updated,
87
}
88
}
89
90
/// If `uri` is a local path, it must be an absolute path. This is not exposed
91
/// for now - initialize entries using `init_entries_from_uri_list` instead.
92
pub(super) fn init_entry(
93
&self,
94
uri: Arc<str>,
95
get_file_fetcher: &dyn Fn() -> PolarsResult<Arc<dyn FileFetcher>>,
96
ttl: u64,
97
) -> PolarsResult<Arc<FileCacheEntry>> {
98
let verbose = config::verbose();
99
100
#[cfg(debug_assertions)]
101
{
102
// Local paths must be absolute or else the cache would be wrong.
103
if !PlPathRef::new(uri.as_ref()).is_cloud_url() {
104
let path = Path::new(uri.as_ref());
105
assert_eq!(path, std::fs::canonicalize(path).unwrap().as_path());
106
}
107
}
108
109
if self
110
.min_ttl
111
.fetch_min(ttl, std::sync::atomic::Ordering::Relaxed)
112
< ttl
113
{
114
self.notify_ttl_updated.notify_one();
115
}
116
117
{
118
let entries = self.entries.read().unwrap();
119
120
if let Some(entry) = entries.get(uri.as_ref()) {
121
if verbose {
122
eprintln!(
123
"[file_cache] init_entry: return existing entry for uri = {}",
124
uri.clone()
125
);
126
}
127
entry.update_ttl(ttl);
128
return Ok(entry.clone());
129
}
130
}
131
132
let uri_hash = blake3::hash(uri.as_bytes()).to_hex()[..32].to_string();
133
134
{
135
let mut entries = self.entries.write().unwrap();
136
137
// May have been raced
138
if let Some(entry) = entries.get(uri.as_ref()) {
139
if verbose {
140
eprintln!(
141
"[file_cache] init_entry: return existing entry for uri = {} (lost init race)",
142
uri.clone()
143
);
144
}
145
entry.update_ttl(ttl);
146
return Ok(entry.clone());
147
}
148
149
if verbose {
150
eprintln!(
151
"[file_cache] init_entry: creating new entry for uri = {uri}, hash = {uri_hash}"
152
);
153
}
154
155
let entry = Arc::new(FileCacheEntry::new(
156
uri.clone(),
157
uri_hash,
158
self.prefix.clone(),
159
get_file_fetcher()?,
160
ttl,
161
));
162
entries.insert(uri, entry.clone());
163
Ok(entry)
164
}
165
}
166
167
/// This function can accept relative local paths.
168
pub fn get_entry(&self, addr: PlPathRef<'_>) -> Option<Arc<FileCacheEntry>> {
169
match addr {
170
PlPathRef::Local(p) => {
171
let p = std::fs::canonicalize(p).unwrap();
172
self.entries
173
.read()
174
.unwrap()
175
.get(p.to_str().unwrap())
176
.map(Arc::clone)
177
},
178
PlPathRef::Cloud(p) => self.entries.read().unwrap().get(p.uri()).map(Arc::clone),
179
}
180
}
181
}
182
183
/// Reads the file cache TTL from the `POLARS_FILE_CACHE_TTL` environment variable.
///
/// Returns `60 * 60` when the variable is unset (or not valid Unicode).
/// NOTE(review): the unit is presumably seconds (one hour by default) - confirm against
/// the consumers of this value.
///
/// # Panics
/// Panics if the variable is set but is not a valid `u64`; the message names the
/// variable and the offending value so the misconfiguration is easy to locate.
pub fn get_env_file_cache_ttl() -> u64 {
    const ENV_VAR: &str = "POLARS_FILE_CACHE_TTL";
    const DEFAULT_TTL: u64 = 60 * 60;

    match std::env::var(ENV_VAR) {
        Ok(raw) => raw.parse::<u64>().unwrap_or_else(|err| {
            panic!(
                "invalid value for {}: expected a non-negative integer, got {:?}: {}",
                ENV_VAR, raw, err
            )
        }),
        Err(_) => DEFAULT_TTL,
    }
}
188
189