Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/file_cache/utils.rs
6939 views
1
use std::path::Path;
2
use std::sync::{Arc, LazyLock};
3
use std::time::UNIX_EPOCH;
4
5
use polars_error::{PolarsError, PolarsResult};
6
use polars_utils::plpath::{CloudScheme, PlPathRef};
7
8
use super::cache::{FILE_CACHE, get_env_file_cache_ttl};
9
use super::entry::FileCacheEntry;
10
use super::file_fetcher::{CloudFileFetcher, LocalFileFetcher};
11
use crate::cloud::{CloudLocation, CloudOptions, build_object_store, object_path_from_str};
12
use crate::path_utils::{POLARS_TEMP_DIR_BASE_PATH, ensure_directory_init};
13
use crate::pl_async;
14
15
pub static FILE_CACHE_PREFIX: LazyLock<Box<Path>> = LazyLock::new(|| {
16
let path = POLARS_TEMP_DIR_BASE_PATH
17
.join("file-cache/")
18
.into_boxed_path();
19
20
if let Err(err) = ensure_directory_init(path.as_ref()) {
21
panic!(
22
"failed to create file cache directory: path = {}, err = {}",
23
path.to_str().unwrap(),
24
err
25
);
26
}
27
28
path
29
});
30
31
pub(super) fn last_modified_u64(metadata: &std::fs::Metadata) -> u64 {
32
u64::try_from(
33
metadata
34
.modified()
35
.unwrap()
36
.duration_since(UNIX_EPOCH)
37
.unwrap()
38
.as_millis(),
39
)
40
.unwrap()
41
}
42
43
pub(super) fn update_last_accessed(file: &std::fs::File) {
44
let file_metadata = file.metadata().unwrap();
45
46
if let Err(e) = file.set_times(
47
std::fs::FileTimes::new()
48
.set_modified(file_metadata.modified().unwrap())
49
.set_accessed(std::time::SystemTime::now()),
50
) {
51
panic!("failed to update file last accessed time: {e}");
52
}
53
}
54
55
pub fn init_entries_from_uri_list(
56
mut uri_list: impl ExactSizeIterator<Item = Arc<str>>,
57
cloud_options: Option<&CloudOptions>,
58
) -> PolarsResult<Vec<Arc<FileCacheEntry>>> {
59
init_entries_from_uri_list_impl(&mut uri_list, cloud_options)
60
}
61
62
fn init_entries_from_uri_list_impl(
63
uri_list: &mut dyn ExactSizeIterator<Item = Arc<str>>,
64
cloud_options: Option<&CloudOptions>,
65
) -> PolarsResult<Vec<Arc<FileCacheEntry>>> {
66
#[expect(clippy::len_zero)]
67
if uri_list.len() == 0 {
68
return Ok(Default::default());
69
}
70
71
let mut uri_list = uri_list.peekable();
72
73
let first_uri = PlPathRef::new(uri_list.peek().unwrap().as_ref());
74
75
let file_cache_ttl = cloud_options
76
.map(|x| x.file_cache_ttl)
77
.unwrap_or_else(get_env_file_cache_ttl);
78
79
if first_uri.is_cloud_url() {
80
let shared_object_store = (!matches!(
81
first_uri.scheme(),
82
Some(CloudScheme::Http | CloudScheme::Https) // Object stores for http are tied to the path.
83
))
84
.then(|| {
85
pl_async::get_runtime().block_in_place_on(async {
86
let (_, object_store) =
87
build_object_store(first_uri.to_str(), cloud_options, false).await?;
88
89
PolarsResult::Ok(object_store)
90
})
91
})
92
.transpose()?;
93
94
pl_async::get_runtime().block_in_place_on(async {
95
futures::future::try_join_all(uri_list.map(|uri| {
96
let shared_object_store = shared_object_store.clone();
97
98
async move {
99
let object_store =
100
if let Some(shared_object_store) = shared_object_store.clone() {
101
shared_object_store
102
} else {
103
let (_, object_store) =
104
build_object_store(&uri, cloud_options, false).await?;
105
object_store
106
};
107
108
FILE_CACHE.init_entry(
109
uri.clone(),
110
&|| {
111
let CloudLocation { prefix, .. } =
112
CloudLocation::new(uri.as_ref(), false).unwrap();
113
let cloud_path = object_path_from_str(&prefix)?;
114
115
let uri = uri.clone();
116
let object_store = object_store.clone();
117
118
Ok(Arc::new(CloudFileFetcher {
119
uri,
120
object_store,
121
cloud_path,
122
}))
123
},
124
file_cache_ttl,
125
)
126
}
127
}))
128
.await
129
})
130
} else {
131
uri_list
132
.map(|uri| {
133
let uri = std::fs::canonicalize(uri.as_ref()).map_err(|err| {
134
let msg = Some(format!("{}: {}", err, uri.as_ref()).into());
135
PolarsError::IO {
136
error: err.into(),
137
msg,
138
}
139
})?;
140
let uri = Arc::<str>::from(uri.to_str().unwrap());
141
142
FILE_CACHE.init_entry(
143
uri.clone(),
144
&|| Ok(Arc::new(LocalFileFetcher::from_uri(uri.clone()))),
145
file_cache_ttl,
146
)
147
})
148
.collect::<PolarsResult<Vec<_>>>()
149
}
150
}
151
152