Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/file_cache/utils.rs
8421 views
1
use std::sync::{Arc, LazyLock};
2
use std::time::UNIX_EPOCH;
3
4
use polars_error::{PolarsError, PolarsResult};
5
use polars_utils::pl_path::{CloudScheme, PlRefPath};
6
7
use super::cache::{FILE_CACHE, get_env_file_cache_ttl};
8
use super::entry::FileCacheEntry;
9
use super::file_fetcher::{CloudFileFetcher, LocalFileFetcher};
10
use crate::cloud::{CloudLocation, CloudOptions, build_object_store, object_path_from_str};
11
use crate::path_utils::{POLARS_TEMP_DIR_BASE_PATH, ensure_directory_init};
12
13
pub static FILE_CACHE_PREFIX: LazyLock<PlRefPath> = LazyLock::new(|| {
14
let path = PlRefPath::try_from_path(&POLARS_TEMP_DIR_BASE_PATH.join("file-cache/")).unwrap();
15
16
if let Err(err) = ensure_directory_init(path.as_ref()) {
17
panic!(
18
"failed to create file cache directory: path = {}, err = {}",
19
path, err
20
);
21
}
22
23
path
24
});
25
26
pub(super) fn last_modified_u64(metadata: &std::fs::Metadata) -> u64 {
27
u64::try_from(
28
metadata
29
.modified()
30
.unwrap()
31
.duration_since(UNIX_EPOCH)
32
.unwrap()
33
.as_millis(),
34
)
35
.unwrap()
36
}
37
38
pub(super) fn update_last_accessed(file: &std::fs::File) {
39
let file_metadata = file.metadata().unwrap();
40
41
if let Err(e) = file.set_times(
42
std::fs::FileTimes::new()
43
.set_modified(file_metadata.modified().unwrap())
44
.set_accessed(std::time::SystemTime::now()),
45
) {
46
panic!("failed to update file last accessed time: {e}");
47
}
48
}
49
50
pub async fn init_entries_from_uri_list(
51
uri_list: impl ExactSizeIterator<Item = PlRefPath> + Send + 'static,
52
cloud_options: Option<&CloudOptions>,
53
) -> PolarsResult<Vec<Arc<FileCacheEntry>>> {
54
init_entries_from_uri_list_impl(Box::new(uri_list), cloud_options).await
55
}
56
57
async fn init_entries_from_uri_list_impl(
58
uri_list: Box<dyn ExactSizeIterator<Item = PlRefPath> + Send + 'static>,
59
cloud_options: Option<&CloudOptions>,
60
) -> PolarsResult<Vec<Arc<FileCacheEntry>>> {
61
#[allow(clippy::len_zero)]
62
if uri_list.len() == 0 {
63
return Ok(Default::default());
64
}
65
66
let mut uri_list = uri_list.peekable();
67
68
let first_uri = uri_list.peek().unwrap().clone();
69
70
let file_cache_ttl = cloud_options
71
.map(|x| x.file_cache_ttl)
72
.unwrap_or_else(get_env_file_cache_ttl);
73
74
if first_uri.has_scheme() {
75
let shared_object_store = if !matches!(
76
first_uri.scheme(),
77
Some(CloudScheme::Http | CloudScheme::Https) // Object stores for http are tied to the path.
78
) {
79
let (_, object_store) = build_object_store(first_uri, cloud_options, false).await?;
80
Some(object_store)
81
} else {
82
None
83
};
84
85
futures::future::try_join_all(uri_list.map(|uri| {
86
let shared_object_store = shared_object_store.clone();
87
88
async move {
89
let object_store = if let Some(shared_object_store) = shared_object_store.clone() {
90
shared_object_store
91
} else {
92
let (_, object_store) =
93
build_object_store(uri.clone(), cloud_options, false).await?;
94
object_store
95
};
96
97
FILE_CACHE.init_entry(
98
uri.clone(),
99
&|| {
100
let CloudLocation { prefix, .. } =
101
CloudLocation::new(uri.clone(), false).unwrap();
102
let cloud_path = object_path_from_str(&prefix)?;
103
let object_store = object_store.clone();
104
105
Ok(Arc::new(CloudFileFetcher {
106
uri: uri.clone(),
107
object_store,
108
cloud_path,
109
}))
110
},
111
file_cache_ttl,
112
)
113
}
114
}))
115
.await
116
} else {
117
let mut out = Vec::with_capacity(uri_list.len());
118
for uri in uri_list {
119
let uri = tokio::fs::canonicalize(uri.as_str()).await.map_err(|err| {
120
let msg = Some(format!("{}: {}", err, uri).into());
121
PolarsError::IO {
122
error: err.into(),
123
msg,
124
}
125
})?;
126
let uri = PlRefPath::try_from_pathbuf(uri)?;
127
128
out.push(FILE_CACHE.init_entry(
129
uri.clone(),
130
&|| Ok(Arc::new(LocalFileFetcher::from_uri(uri.clone()))),
131
file_cache_ttl,
132
)?)
133
}
134
Ok(out)
135
}
136
}
137
138