Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/file_cache/cache.rs
8415 views
1
use std::sync::atomic::AtomicU64;
2
use std::sync::{Arc, LazyLock, RwLock};
3
4
use polars_core::config;
5
use polars_error::PolarsResult;
6
use polars_utils::aliases::PlHashMap;
7
use polars_utils::pl_path::PlRefPath;
8
9
use super::entry::{DATA_PREFIX, FileCacheEntry, METADATA_PREFIX};
10
use super::eviction::EvictionManager;
11
use super::file_fetcher::FileFetcher;
12
use super::utils::FILE_CACHE_PREFIX;
13
use crate::path_utils::ensure_directory_init;
14
15
pub static FILE_CACHE: LazyLock<FileCache> = LazyLock::new(|| {
16
let prefix = FILE_CACHE_PREFIX.clone();
17
18
if config::verbose() {
19
eprintln!("file cache prefix: {}", prefix);
20
}
21
22
let min_ttl = Arc::new(AtomicU64::from(get_env_file_cache_ttl()));
23
let notify_ttl_updated = Arc::new(tokio::sync::Notify::new());
24
25
let metadata_dir = prefix.join(std::str::from_utf8(&[METADATA_PREFIX]).unwrap());
26
if let Err(err) = ensure_directory_init(metadata_dir.as_std_path()) {
27
panic!(
28
"failed to create file cache metadata directory: path = {}, err = {}",
29
metadata_dir, err
30
)
31
}
32
33
let data_dir = prefix.join(std::str::from_utf8(&[DATA_PREFIX]).unwrap());
34
35
if let Err(err) = ensure_directory_init(data_dir.as_std_path()) {
36
panic!(
37
"failed to create file cache data directory: path = {}, err = {}",
38
data_dir, err
39
)
40
}
41
42
EvictionManager {
43
data_dir,
44
metadata_dir,
45
files_to_remove: None,
46
min_ttl: min_ttl.clone(),
47
notify_ttl_updated: notify_ttl_updated.clone(),
48
}
49
.run_in_background();
50
51
// Safety: We have created the data and metadata directories.
52
unsafe { FileCache::new_unchecked(prefix, min_ttl, notify_ttl_updated) }
53
});
54
55
pub struct FileCache {
56
prefix: PlRefPath,
57
entries: Arc<RwLock<PlHashMap<PlRefPath, Arc<FileCacheEntry>>>>,
58
min_ttl: Arc<AtomicU64>,
59
notify_ttl_updated: Arc<tokio::sync::Notify>,
60
}
61
62
impl FileCache {
63
/// # Safety
64
/// The following directories exist:
65
/// * `{prefix}/{METADATA_PREFIX}/`
66
/// * `{prefix}/{DATA_PREFIX}/`
67
unsafe fn new_unchecked(
68
prefix: PlRefPath,
69
min_ttl: Arc<AtomicU64>,
70
notify_ttl_updated: Arc<tokio::sync::Notify>,
71
) -> Self {
72
Self {
73
prefix,
74
entries: Default::default(),
75
min_ttl,
76
notify_ttl_updated,
77
}
78
}
79
80
/// If `uri` is a local path, it must be an absolute path. This is not exposed
81
/// for now - initialize entries using `init_entries_from_uri_list` instead.
82
pub(super) fn init_entry(
83
&self,
84
uri: PlRefPath,
85
get_file_fetcher: &dyn Fn() -> PolarsResult<Arc<dyn FileFetcher>>,
86
ttl: u64,
87
) -> PolarsResult<Arc<FileCacheEntry>> {
88
let verbose = config::verbose();
89
90
// Local paths must be absolute or else the cache would be wrong.
91
if !uri.has_scheme() {
92
debug_assert_eq!(
93
std::fs::canonicalize(uri.as_str())
94
.ok()
95
.and_then(|x| PlRefPath::try_from_pathbuf(x).ok())
96
.as_ref(),
97
Some(&uri)
98
)
99
}
100
101
if self
102
.min_ttl
103
.fetch_min(ttl, std::sync::atomic::Ordering::Relaxed)
104
< ttl
105
{
106
self.notify_ttl_updated.notify_one();
107
}
108
109
{
110
let entries = self.entries.read().unwrap();
111
112
if let Some(entry) = entries.get(&uri) {
113
if verbose {
114
eprintln!(
115
"[file_cache] init_entry: return existing entry for uri = {}",
116
uri.clone()
117
);
118
}
119
entry.update_ttl(ttl);
120
return Ok(entry.clone());
121
}
122
}
123
124
let uri_hash = blake3::hash(uri.as_bytes()).to_hex()[..32].to_string();
125
126
{
127
let mut entries = self.entries.write().unwrap();
128
129
// May have been raced
130
if let Some(entry) = entries.get(&uri) {
131
if verbose {
132
eprintln!(
133
"[file_cache] init_entry: return existing entry for uri = {} (lost init race)",
134
uri.clone()
135
);
136
}
137
entry.update_ttl(ttl);
138
return Ok(entry.clone());
139
}
140
141
if verbose {
142
eprintln!(
143
"[file_cache] init_entry: creating new entry for uri = {uri}, hash = {uri_hash}"
144
);
145
}
146
147
let entry = Arc::new(FileCacheEntry::new(
148
uri.clone(),
149
uri_hash,
150
self.prefix.clone(),
151
get_file_fetcher()?,
152
ttl,
153
));
154
entries.insert(uri.clone(), entry.clone());
155
Ok(entry)
156
}
157
}
158
159
/// This function can accept relative local paths.
160
pub fn get_entry(&self, path: PlRefPath) -> Option<Arc<FileCacheEntry>> {
161
if path.has_scheme() {
162
self.entries.read().unwrap().get(&path).cloned()
163
} else {
164
let p =
165
PlRefPath::try_from_pathbuf(std::fs::canonicalize(path.as_str()).unwrap()).unwrap();
166
self.entries.read().unwrap().get(&p).cloned()
167
}
168
}
169
}
170
171
/// Reads the default file-cache TTL from the `POLARS_FILE_CACHE_TTL`
/// environment variable.
///
/// Falls back to `60 * 60` when the variable is unset (or not valid Unicode).
/// NOTE(review): the unit is not established here. `60 * 60` suggests seconds;
/// confirm against the eviction code.
///
/// # Panics
/// Panics if the variable is set but does not parse as a `u64`. The panic
/// message names the variable and the offending value.
pub fn get_env_file_cache_ttl() -> u64 {
    match std::env::var("POLARS_FILE_CACHE_TTL") {
        Ok(raw) => raw.parse::<u64>().unwrap_or_else(|err| {
            panic!(
                "invalid POLARS_FILE_CACHE_TTL value {raw:?}: expected a non-negative integer ({err})"
            )
        }),
        Err(_) => 60 * 60,
    }
}
176
177