Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/file_cache/entry.rs
6939 views
1
use std::io::{Seek, SeekFrom};
2
use std::path::{Path, PathBuf};
3
use std::sync::atomic::AtomicU64;
4
use std::sync::{Arc, LazyLock, Mutex};
5
6
use fs4::fs_std::FileExt;
7
use polars_core::config;
8
use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
9
10
use super::cache_lock::{self, GLOBAL_FILE_CACHE_LOCK};
11
use super::file_fetcher::{FileFetcher, RemoteMetadata};
12
use super::file_lock::{FileLock, FileLockAnyGuard};
13
use super::metadata::{EntryMetadata, FileVersion};
14
use super::utils::update_last_accessed;
15
16
pub(super) const DATA_PREFIX: u8 = b'd';
17
pub(super) const METADATA_PREFIX: u8 = b'm';
18
19
struct CachedData {
20
last_modified: u64,
21
metadata: Arc<EntryMetadata>,
22
data_file_path: PathBuf,
23
}
24
25
struct Inner {
26
uri: Arc<str>,
27
uri_hash: String,
28
path_prefix: Arc<Path>,
29
metadata: FileLock<PathBuf>,
30
cached_data: Option<CachedData>,
31
ttl: Arc<AtomicU64>,
32
file_fetcher: Arc<dyn FileFetcher>,
33
}
34
35
struct EntryData {
36
uri: Arc<str>,
37
inner: Mutex<Inner>,
38
ttl: Arc<AtomicU64>,
39
}
40
41
/// Handle to a single cached remote file. Operations that open the file are serialized by an
/// internal mutex.
pub struct FileCacheEntry(EntryData);
42
43
impl EntryMetadata {
44
fn matches_remote_metadata(&self, remote_metadata: &RemoteMetadata) -> bool {
45
self.remote_version == remote_metadata.version && self.local_size == remote_metadata.size
46
}
47
}
48
49
impl Inner {
50
fn try_open_assume_latest(&mut self) -> PolarsResult<std::fs::File> {
51
let verbose = config::verbose();
52
53
{
54
let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_shared();
55
// We want to use an exclusive lock here to avoid an API call in the case where only the
56
// local TTL was updated.
57
let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
58
update_last_accessed(metadata_file);
59
60
if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
61
let data_file_path = self.get_cached_data_file_path();
62
63
if metadata.compare_local_state(data_file_path).is_ok() {
64
if verbose {
65
eprintln!(
66
"[file_cache::entry] try_open_assume_latest: opening already fetched file for uri = {}",
67
self.uri.clone()
68
);
69
}
70
return Ok(finish_open(data_file_path, metadata_file));
71
}
72
}
73
}
74
75
if verbose {
76
eprintln!(
77
"[file_cache::entry] try_open_assume_latest: did not find cached file for uri = {}",
78
self.uri.clone()
79
);
80
}
81
82
self.try_open_check_latest()
83
}
84
85
fn try_open_check_latest(&mut self) -> PolarsResult<std::fs::File> {
86
let verbose = config::verbose();
87
let remote_metadata = &self.file_fetcher.fetch_metadata()?;
88
let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_shared();
89
90
{
91
let metadata_file = &mut self.metadata.acquire_shared().unwrap();
92
update_last_accessed(metadata_file);
93
94
if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
95
if metadata.matches_remote_metadata(remote_metadata) {
96
let data_file_path = self.get_cached_data_file_path();
97
98
if metadata.compare_local_state(data_file_path).is_ok() {
99
if verbose {
100
eprintln!(
101
"[file_cache::entry] try_open_check_latest: opening already fetched file for uri = {}",
102
self.uri.clone()
103
);
104
}
105
return Ok(finish_open(data_file_path, metadata_file));
106
}
107
}
108
}
109
}
110
111
let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
112
let metadata = self
113
.try_get_metadata(metadata_file, &cache_guard)
114
// Safety: `metadata_file` is an exclusive guard.
115
.unwrap_or_else(|_| {
116
Arc::new(EntryMetadata::new(
117
self.uri.clone(),
118
self.ttl.load(std::sync::atomic::Ordering::Relaxed),
119
))
120
});
121
122
if metadata.matches_remote_metadata(remote_metadata) {
123
let data_file_path = self.get_cached_data_file_path();
124
125
if metadata.compare_local_state(data_file_path).is_ok() {
126
if verbose {
127
eprintln!(
128
"[file_cache::entry] try_open_check_latest: opening already fetched file (lost race) for uri = {}",
129
self.uri.clone()
130
);
131
}
132
return Ok(finish_open(data_file_path, metadata_file));
133
}
134
}
135
136
if verbose {
137
eprintln!(
138
"[file_cache::entry] try_open_check_latest: fetching new data file for uri = {}, remote_version = {:?}, remote_size = {}",
139
self.uri.clone(),
140
remote_metadata.version,
141
remote_metadata.size
142
);
143
}
144
145
let data_file_path = &get_data_file_path(
146
self.path_prefix.to_str().unwrap().as_bytes(),
147
self.uri_hash.as_bytes(),
148
&remote_metadata.version,
149
);
150
// Remove the file if it exists, since it doesn't match the metadata.
151
// This could be left from an aborted process.
152
let _ = std::fs::remove_file(data_file_path);
153
if !self.file_fetcher.fetches_as_symlink() {
154
let file = std::fs::OpenOptions::new()
155
.write(true)
156
.create(true)
157
.truncate(true)
158
.open(data_file_path)
159
.map_err(PolarsError::from)?;
160
161
// * Some(true) => always raise
162
// * Some(false) => never raise
163
// * None => do not raise if fallocate() is not permitted, otherwise raise.
164
static RAISE_ALLOC_ERROR: LazyLock<Option<bool>> = LazyLock::new(|| {
165
let v = match std::env::var("POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR").as_deref() {
166
Ok("1") => Some(false),
167
Ok("0") => Some(true),
168
Err(_) => None,
169
Ok(v) => {
170
panic!("invalid value {v} for POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR")
171
},
172
};
173
if config::verbose() {
174
eprintln!("[file_cache]: RAISE_ALLOC_ERROR: {v:?}");
175
}
176
v
177
});
178
179
// Initialize it to get the verbose print
180
let raise_alloc_err = *RAISE_ALLOC_ERROR;
181
182
file.lock_exclusive().unwrap();
183
if let Err(e) = file.allocate(remote_metadata.size) {
184
let msg = format!(
185
"failed to reserve {} bytes on disk to download uri = {}: {:?}",
186
remote_metadata.size,
187
self.uri.as_ref(),
188
e
189
);
190
191
if raise_alloc_err == Some(true)
192
|| (raise_alloc_err.is_none() && file.allocate(1).is_ok())
193
{
194
polars_bail!(ComputeError: msg)
195
} else if config::verbose() {
196
eprintln!("[file_cache]: warning: {msg}")
197
}
198
}
199
}
200
self.file_fetcher.fetch(data_file_path)?;
201
202
// Don't do this on windows as it will break setting last accessed times.
203
#[cfg(target_family = "unix")]
204
if !self.file_fetcher.fetches_as_symlink() {
205
let mut perms = std::fs::metadata(data_file_path.clone())
206
.unwrap()
207
.permissions();
208
perms.set_readonly(true);
209
std::fs::set_permissions(data_file_path, perms).unwrap();
210
}
211
212
let data_file_metadata = std::fs::metadata(data_file_path).unwrap();
213
let local_last_modified = super::utils::last_modified_u64(&data_file_metadata);
214
let local_size = data_file_metadata.len();
215
216
if local_size != remote_metadata.size {
217
polars_bail!(ComputeError: "downloaded file size ({}) does not match expected size ({})", local_size, remote_metadata.size);
218
}
219
220
let mut metadata = metadata;
221
let metadata = Arc::make_mut(&mut metadata);
222
metadata.local_last_modified = local_last_modified;
223
metadata.local_size = local_size;
224
metadata.remote_version = remote_metadata.version.clone();
225
226
if let Err(e) = metadata.compare_local_state(data_file_path) {
227
panic!("metadata mismatch after file fetch: {e}");
228
}
229
230
let data_file = finish_open(data_file_path, metadata_file);
231
232
metadata_file.set_len(0).unwrap();
233
metadata_file.seek(SeekFrom::Start(0)).unwrap();
234
metadata
235
.try_write(&mut **metadata_file)
236
.map_err(to_compute_err)?;
237
238
Ok(data_file)
239
}
240
241
/// Try to read the metadata from disk. If `F` is an exclusive guard, this
242
/// will update the TTL stored in the metadata file if it does not match.
243
fn try_get_metadata<F: FileLockAnyGuard>(
244
&mut self,
245
metadata_file: &mut F,
246
_cache_guard: &cache_lock::GlobalFileCacheGuardAny,
247
) -> PolarsResult<Arc<EntryMetadata>> {
248
let last_modified = super::utils::last_modified_u64(&metadata_file.metadata().unwrap());
249
let ttl = self.ttl.load(std::sync::atomic::Ordering::Relaxed);
250
251
for _ in 0..2 {
252
if let Some(ref cached) = self.cached_data {
253
if cached.last_modified == last_modified {
254
if cached.metadata.ttl != ttl {
255
polars_bail!(ComputeError: "TTL mismatch");
256
}
257
258
if cached.metadata.uri != self.uri {
259
unimplemented!(
260
"hash collision: uri1 = {}, uri2 = {}, hash = {}",
261
cached.metadata.uri,
262
self.uri,
263
self.uri_hash,
264
);
265
}
266
267
return Ok(cached.metadata.clone());
268
}
269
}
270
271
// Ensure cache is unset if read fails
272
self.cached_data = None;
273
274
let mut metadata =
275
EntryMetadata::try_from_reader(&mut **metadata_file).map_err(to_compute_err)?;
276
277
// Note this means if multiple processes on the same system set a
278
// different TTL for the same path, the metadata file will constantly
279
// get overwritten.
280
if metadata.ttl != ttl {
281
if F::IS_EXCLUSIVE {
282
metadata.ttl = ttl;
283
metadata_file.set_len(0).unwrap();
284
metadata_file.seek(SeekFrom::Start(0)).unwrap();
285
metadata
286
.try_write(&mut **metadata_file)
287
.map_err(to_compute_err)?;
288
} else {
289
polars_bail!(ComputeError: "TTL mismatch");
290
}
291
}
292
293
let metadata = Arc::new(metadata);
294
let data_file_path = get_data_file_path(
295
self.path_prefix.to_str().unwrap().as_bytes(),
296
self.uri_hash.as_bytes(),
297
&metadata.remote_version,
298
);
299
self.cached_data = Some(CachedData {
300
last_modified,
301
metadata,
302
data_file_path,
303
});
304
}
305
306
unreachable!();
307
}
308
309
/// # Panics
310
/// Panics if `self.cached_data` is `None`.
311
fn get_cached_data_file_path(&self) -> &Path {
312
&self.cached_data.as_ref().unwrap().data_file_path
313
}
314
}
315
316
impl FileCacheEntry {
317
pub(crate) fn new(
318
uri: Arc<str>,
319
uri_hash: String,
320
path_prefix: Arc<Path>,
321
file_fetcher: Arc<dyn FileFetcher>,
322
file_cache_ttl: u64,
323
) -> Self {
324
let metadata = FileLock::from(get_metadata_file_path(
325
path_prefix.to_str().unwrap().as_bytes(),
326
uri_hash.as_bytes(),
327
));
328
329
debug_assert!(
330
Arc::ptr_eq(&uri, file_fetcher.get_uri()),
331
"impl error: entry uri != file_fetcher uri"
332
);
333
334
let ttl = Arc::new(AtomicU64::from(file_cache_ttl));
335
336
Self(EntryData {
337
uri: uri.clone(),
338
inner: Mutex::new(Inner {
339
uri,
340
uri_hash,
341
path_prefix,
342
metadata,
343
cached_data: None,
344
ttl: ttl.clone(),
345
file_fetcher,
346
}),
347
ttl,
348
})
349
}
350
351
pub fn uri(&self) -> &Arc<str> {
352
&self.0.uri
353
}
354
355
/// Directly returns the cached file if it finds one without checking if
356
/// there is a newer version on the remote. This does not make any API calls
357
/// if it finds a cached file, otherwise it simply downloads the file.
358
pub fn try_open_assume_latest(&self) -> PolarsResult<std::fs::File> {
359
self.0.inner.lock().unwrap().try_open_assume_latest()
360
}
361
362
/// Returns the cached file after ensuring it is up to date against the remote
363
/// This will always perform at least 1 API call for fetching metadata.
364
pub fn try_open_check_latest(&self) -> PolarsResult<std::fs::File> {
365
self.0.inner.lock().unwrap().try_open_check_latest()
366
}
367
368
pub fn update_ttl(&self, ttl: u64) {
369
self.0.ttl.store(ttl, std::sync::atomic::Ordering::Relaxed);
370
}
371
}
372
373
/// Opens `data_file_path` for reading and refreshes its last-accessed time. It then takes a
/// shared lock on the file before returning it.
///
/// `_metadata_guard` is not used in the body; requiring it means callers must hold a guard
/// on the entry's metadata file while the data file is opened.
///
/// # Panics
/// Panics if the file cannot be opened or the shared lock cannot be acquired.
fn finish_open<F: FileLockAnyGuard>(data_file_path: &Path, _metadata_guard: &F) -> std::fs::File {
    // Windows requires write access to update the last accessed time, so write access is
    // requested only there.
    let needs_write_access = cfg!(target_family = "windows");
    let file = std::fs::OpenOptions::new()
        .read(true)
        .write(needs_write_access)
        .open(data_file_path)
        .unwrap();

    update_last_accessed(&file);

    let locked = FileExt::try_lock_shared(&file).is_ok();
    assert!(
        locked,
        "finish_open: could not acquire shared lock on data file at {}",
        data_file_path.to_str().unwrap()
    );
    file
}
401
402
/// `[prefix]/d/[uri hash][last modified]`
403
fn get_data_file_path(
404
path_prefix: &[u8],
405
uri_hash: &[u8],
406
remote_version: &FileVersion,
407
) -> PathBuf {
408
let owned;
409
let path = [
410
path_prefix,
411
&[b'/', DATA_PREFIX, b'/'],
412
uri_hash,
413
match remote_version {
414
FileVersion::Timestamp(v) => {
415
owned = Some(format!("{v:013x}"));
416
owned.as_deref().unwrap()
417
},
418
FileVersion::ETag(v) => v.as_str(),
419
FileVersion::Uninitialized => panic!("impl error: version not initialized"),
420
}
421
.as_bytes(),
422
]
423
.concat();
424
PathBuf::from(String::from_utf8(path).unwrap())
425
}
426
427
/// `[prefix]/m/[uri hash]`
428
fn get_metadata_file_path(path_prefix: &[u8], uri_hash: &[u8]) -> PathBuf {
429
let bytes = [path_prefix, &[b'/', METADATA_PREFIX, b'/'], uri_hash].concat();
430
PathBuf::from(String::from_utf8(bytes).unwrap())
431
}
432
433