Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/file_cache/entry.rs
8327 views
1
use std::io::{Seek, SeekFrom};
2
use std::path::{Path, PathBuf};
3
use std::sync::atomic::AtomicU64;
4
use std::sync::{Arc, LazyLock, Mutex};
5
6
use fs4::fs_std::FileExt;
7
use polars_core::config;
8
use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
9
use polars_utils::pl_path::PlRefPath;
10
11
use super::cache_lock::{self, GLOBAL_FILE_CACHE_LOCK};
12
use super::file_fetcher::{FileFetcher, RemoteMetadata};
13
use super::file_lock::{FileLock, FileLockAnyGuard};
14
use super::metadata::{EntryMetadata, FileVersion};
15
use super::utils::update_last_accessed;
16
17
pub(super) const DATA_PREFIX: u8 = b'd';
18
pub(super) const METADATA_PREFIX: u8 = b'm';
19
20
struct CachedData {
21
last_modified: u64,
22
metadata: Arc<EntryMetadata>,
23
data_file_path: PathBuf,
24
}
25
26
struct Inner {
27
uri: PlRefPath,
28
uri_hash: String,
29
path_prefix: PlRefPath,
30
metadata: FileLock<PathBuf>,
31
cached_data: Option<CachedData>,
32
ttl: Arc<AtomicU64>,
33
file_fetcher: Arc<dyn FileFetcher>,
34
}
35
36
struct EntryData {
37
uri: PlRefPath,
38
inner: Mutex<Inner>,
39
ttl: Arc<AtomicU64>,
40
}
41
42
/// A single file-cache entry for one remote URI.
///
/// `uri()` and `update_ttl()` are served without locking. Opening the cached
/// file goes through the entry's internal mutex.
pub struct FileCacheEntry(EntryData);
43
44
impl EntryMetadata {
45
fn matches_remote_metadata(&self, remote_metadata: &RemoteMetadata) -> bool {
46
self.remote_version == remote_metadata.version && self.local_size == remote_metadata.size
47
}
48
}
49
50
impl Inner {
51
fn try_open_assume_latest(&mut self) -> PolarsResult<std::fs::File> {
52
let verbose = config::verbose();
53
54
{
55
let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_shared();
56
// We want to use an exclusive lock here to avoid an API call in the case where only the
57
// local TTL was updated.
58
let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
59
update_last_accessed(metadata_file);
60
61
if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
62
let data_file_path = self.get_cached_data_file_path();
63
64
if metadata.compare_local_state(data_file_path).is_ok() {
65
if verbose {
66
eprintln!(
67
"[file_cache::entry] try_open_assume_latest: opening already fetched file for uri = {}",
68
self.uri.clone()
69
);
70
}
71
return Ok(finish_open(data_file_path, metadata_file));
72
}
73
}
74
}
75
76
if verbose {
77
eprintln!(
78
"[file_cache::entry] try_open_assume_latest: did not find cached file for uri = {}",
79
self.uri.clone()
80
);
81
}
82
83
self.try_open_check_latest()
84
}
85
86
fn try_open_check_latest(&mut self) -> PolarsResult<std::fs::File> {
87
let verbose = config::verbose();
88
let remote_metadata = &self.file_fetcher.fetch_metadata()?;
89
let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_shared();
90
91
{
92
let metadata_file = &mut self.metadata.acquire_shared().unwrap();
93
update_last_accessed(metadata_file);
94
95
if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
96
if metadata.matches_remote_metadata(remote_metadata) {
97
let data_file_path = self.get_cached_data_file_path();
98
99
if metadata.compare_local_state(data_file_path).is_ok() {
100
if verbose {
101
eprintln!(
102
"[file_cache::entry] try_open_check_latest: opening already fetched file for uri = {}",
103
self.uri.clone()
104
);
105
}
106
return Ok(finish_open(data_file_path, metadata_file));
107
}
108
}
109
}
110
}
111
112
let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
113
let metadata = self
114
.try_get_metadata(metadata_file, &cache_guard)
115
// Safety: `metadata_file` is an exclusive guard.
116
.unwrap_or_else(|_| {
117
Arc::new(EntryMetadata::new(
118
self.uri.clone(),
119
self.ttl.load(std::sync::atomic::Ordering::Relaxed),
120
))
121
});
122
123
if metadata.matches_remote_metadata(remote_metadata) {
124
let data_file_path = self.get_cached_data_file_path();
125
126
if metadata.compare_local_state(data_file_path).is_ok() {
127
if verbose {
128
eprintln!(
129
"[file_cache::entry] try_open_check_latest: opening already fetched file (lost race) for uri = {}",
130
self.uri.clone()
131
);
132
}
133
return Ok(finish_open(data_file_path, metadata_file));
134
}
135
}
136
137
if verbose {
138
eprintln!(
139
"[file_cache::entry] try_open_check_latest: fetching new data file for uri = {}, remote_version = {:?}, remote_size = {}",
140
self.uri.clone(),
141
remote_metadata.version,
142
remote_metadata.size
143
);
144
}
145
146
let data_file_path = &get_data_file_path(
147
self.path_prefix.as_bytes(),
148
self.uri_hash.as_bytes(),
149
&remote_metadata.version,
150
);
151
// Remove the file if it exists, since it doesn't match the metadata.
152
// This could be left from an aborted process.
153
let _ = std::fs::remove_file(data_file_path);
154
if !self.file_fetcher.fetches_as_symlink() {
155
let file = std::fs::OpenOptions::new()
156
.write(true)
157
.create(true)
158
.truncate(true)
159
.open(data_file_path)
160
.map_err(PolarsError::from)?;
161
162
// * Some(true) => always raise
163
// * Some(false) => never raise
164
// * None => do not raise if fallocate() is not permitted, otherwise raise.
165
static RAISE_ALLOC_ERROR: LazyLock<Option<bool>> = LazyLock::new(|| {
166
let v = match std::env::var("POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR").as_deref() {
167
Ok("1") => Some(false),
168
Ok("0") => Some(true),
169
Err(_) => None,
170
Ok(v) => {
171
panic!("invalid value {v} for POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR")
172
},
173
};
174
if config::verbose() {
175
eprintln!("[file_cache]: RAISE_ALLOC_ERROR: {v:?}");
176
}
177
v
178
});
179
180
// Initialize it to get the verbose print
181
let raise_alloc_err = *RAISE_ALLOC_ERROR;
182
183
file.lock_exclusive().unwrap();
184
if let Err(e) = file.allocate(remote_metadata.size) {
185
let msg = format!(
186
"failed to reserve {} bytes on disk to download uri = {}: {:?}",
187
remote_metadata.size, &self.uri, e
188
);
189
190
if raise_alloc_err == Some(true)
191
|| (raise_alloc_err.is_none() && file.allocate(1).is_ok())
192
{
193
polars_bail!(ComputeError: msg)
194
} else if config::verbose() {
195
eprintln!("[file_cache]: warning: {msg}")
196
}
197
}
198
}
199
self.file_fetcher.fetch(data_file_path)?;
200
201
// Don't do this on windows as it will break setting last accessed times.
202
#[cfg(target_family = "unix")]
203
if !self.file_fetcher.fetches_as_symlink() {
204
let mut perms = std::fs::metadata(data_file_path.clone())
205
.unwrap()
206
.permissions();
207
perms.set_readonly(true);
208
std::fs::set_permissions(data_file_path, perms).unwrap();
209
}
210
211
let data_file_metadata = std::fs::metadata(data_file_path).unwrap();
212
let local_last_modified = super::utils::last_modified_u64(&data_file_metadata);
213
let local_size = data_file_metadata.len();
214
215
if local_size != remote_metadata.size {
216
polars_bail!(ComputeError: "downloaded file size ({}) does not match expected size ({})", local_size, remote_metadata.size);
217
}
218
219
let mut metadata = metadata;
220
let metadata = Arc::make_mut(&mut metadata);
221
metadata.local_last_modified = local_last_modified;
222
metadata.local_size = local_size;
223
metadata.remote_version = remote_metadata.version.clone();
224
225
if let Err(e) = metadata.compare_local_state(data_file_path) {
226
panic!("metadata mismatch after file fetch: {e}");
227
}
228
229
let data_file = finish_open(data_file_path, metadata_file);
230
231
metadata_file.set_len(0).unwrap();
232
metadata_file.seek(SeekFrom::Start(0)).unwrap();
233
metadata
234
.try_write(&mut **metadata_file)
235
.map_err(to_compute_err)?;
236
237
Ok(data_file)
238
}
239
240
/// Try to read the metadata from disk. If `F` is an exclusive guard, this
241
/// will update the TTL stored in the metadata file if it does not match.
242
fn try_get_metadata<F: FileLockAnyGuard>(
243
&mut self,
244
metadata_file: &mut F,
245
_cache_guard: &cache_lock::GlobalFileCacheGuardAny,
246
) -> PolarsResult<Arc<EntryMetadata>> {
247
let last_modified = super::utils::last_modified_u64(&metadata_file.metadata().unwrap());
248
let ttl = self.ttl.load(std::sync::atomic::Ordering::Relaxed);
249
250
for _ in 0..2 {
251
if let Some(ref cached) = self.cached_data {
252
if cached.last_modified == last_modified {
253
if cached.metadata.ttl != ttl {
254
polars_bail!(ComputeError: "TTL mismatch");
255
}
256
257
if cached.metadata.uri != self.uri {
258
unimplemented!(
259
"hash collision: uri1 = {}, uri2 = {}, hash = {}",
260
cached.metadata.uri,
261
self.uri,
262
self.uri_hash,
263
);
264
}
265
266
return Ok(cached.metadata.clone());
267
}
268
}
269
270
// Ensure cache is unset if read fails
271
self.cached_data = None;
272
273
let mut metadata =
274
EntryMetadata::try_from_reader(&mut **metadata_file).map_err(to_compute_err)?;
275
276
// Note this means if multiple processes on the same system set a
277
// different TTL for the same path, the metadata file will constantly
278
// get overwritten.
279
if metadata.ttl != ttl {
280
if F::IS_EXCLUSIVE {
281
metadata.ttl = ttl;
282
metadata_file.set_len(0).unwrap();
283
metadata_file.seek(SeekFrom::Start(0)).unwrap();
284
metadata
285
.try_write(&mut **metadata_file)
286
.map_err(to_compute_err)?;
287
} else {
288
polars_bail!(ComputeError: "TTL mismatch");
289
}
290
}
291
292
let metadata = Arc::new(metadata);
293
let data_file_path = get_data_file_path(
294
self.path_prefix.as_bytes(),
295
self.uri_hash.as_bytes(),
296
&metadata.remote_version,
297
);
298
self.cached_data = Some(CachedData {
299
last_modified,
300
metadata,
301
data_file_path,
302
});
303
}
304
305
unreachable!();
306
}
307
308
/// # Panics
309
/// Panics if `self.cached_data` is `None`.
310
fn get_cached_data_file_path(&self) -> &Path {
311
&self.cached_data.as_ref().unwrap().data_file_path
312
}
313
}
314
315
impl FileCacheEntry {
316
pub(crate) fn new(
317
uri: PlRefPath,
318
uri_hash: String,
319
path_prefix: PlRefPath,
320
file_fetcher: Arc<dyn FileFetcher>,
321
file_cache_ttl: u64,
322
) -> Self {
323
let metadata = FileLock::from(get_metadata_file_path(
324
path_prefix.as_bytes(),
325
uri_hash.as_bytes(),
326
));
327
328
debug_assert!(
329
PlRefPath::ptr_eq(&uri, file_fetcher.get_uri()),
330
"impl error: entry uri != file_fetcher uri"
331
);
332
333
let ttl = Arc::new(AtomicU64::from(file_cache_ttl));
334
335
Self(EntryData {
336
uri: uri.clone(),
337
inner: Mutex::new(Inner {
338
uri,
339
uri_hash,
340
path_prefix,
341
metadata,
342
cached_data: None,
343
ttl: ttl.clone(),
344
file_fetcher,
345
}),
346
ttl,
347
})
348
}
349
350
pub fn uri(&self) -> &PlRefPath {
351
&self.0.uri
352
}
353
354
/// Directly returns the cached file if it finds one without checking if
355
/// there is a newer version on the remote. This does not make any API calls
356
/// if it finds a cached file, otherwise it simply downloads the file.
357
pub fn try_open_assume_latest(&self) -> PolarsResult<std::fs::File> {
358
self.0.inner.lock().unwrap().try_open_assume_latest()
359
}
360
361
/// Returns the cached file after ensuring it is up to date against the remote
362
/// This will always perform at least 1 API call for fetching metadata.
363
pub fn try_open_check_latest(&self) -> PolarsResult<std::fs::File> {
364
self.0.inner.lock().unwrap().try_open_check_latest()
365
}
366
367
pub fn update_ttl(&self, ttl: u64) {
368
self.0.ttl.store(ttl, std::sync::atomic::Ordering::Relaxed);
369
}
370
}
371
372
/// Opens the data file at `data_file_path` for reading, refreshes its last
/// accessed time, and takes a shared lock on it without blocking.
///
/// `_metadata_guard` is unused. Presumably it exists so that a data file can
/// only be opened while the caller holds a lock on the entry's metadata file.
/// TODO confirm.
///
/// # Panics
/// Panics if the file cannot be opened, or if the shared lock cannot be
/// acquired immediately.
fn finish_open<F: FileLockAnyGuard>(data_file_path: &Path, _metadata_guard: &F) -> std::fs::File {
    let mut options = std::fs::OpenOptions::new();
    options.read(true);
    // windows requires write access to update the last accessed time
    #[cfg(target_family = "windows")]
    {
        options.write(true);
    }
    let file = options.open(data_file_path).unwrap();

    update_last_accessed(&file);

    let locked = FileExt::try_lock_shared(&file).is_ok();
    if !locked {
        panic!(
            "finish_open: could not acquire shared lock on data file at {:?}",
            data_file_path
        );
    }
    file
}
400
401
/// `[prefix]/d/[uri hash][last modified]`
402
fn get_data_file_path(
403
path_prefix: &[u8],
404
uri_hash: &[u8],
405
remote_version: &FileVersion,
406
) -> PathBuf {
407
let owned;
408
let path = [
409
path_prefix,
410
&[b'/', DATA_PREFIX, b'/'],
411
uri_hash,
412
match remote_version {
413
FileVersion::Timestamp(v) => {
414
owned = Some(format!("{v:013x}"));
415
owned.as_deref().unwrap()
416
},
417
FileVersion::ETag(v) => v.as_str(),
418
FileVersion::Uninitialized => panic!("impl error: version not initialized"),
419
}
420
.as_bytes(),
421
]
422
.concat();
423
PathBuf::from(String::from_utf8(path).unwrap())
424
}
425
426
/// `[prefix]/m/[uri hash]`
427
fn get_metadata_file_path(path_prefix: &[u8], uri_hash: &[u8]) -> PathBuf {
428
let bytes = [path_prefix, &[b'/', METADATA_PREFIX, b'/'], uri_hash].concat();
429
PathBuf::from(String::from_utf8(bytes).unwrap())
430
}
431
432