Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/file_cache/file_fetcher.rs
6939 views
1
use std::sync::Arc;
2
3
use polars_error::{PolarsError, PolarsResult};
4
5
use super::metadata::FileVersion;
6
use super::utils::last_modified_u64;
7
use crate::cloud::PolarsObjectStore;
8
use crate::pl_async;
9
10
pub trait FileFetcher: Send + Sync {
11
fn get_uri(&self) -> &Arc<str>;
12
fn fetch_metadata(&self) -> PolarsResult<RemoteMetadata>;
13
/// Fetches the object to a `local_path`.
14
fn fetch(&self, local_path: &std::path::Path) -> PolarsResult<()>;
15
fn fetches_as_symlink(&self) -> bool;
16
}
17
18
pub struct RemoteMetadata {
19
pub size: u64,
20
pub(super) version: FileVersion,
21
}
22
23
/// A struct that fetches data from local disk and stores it into the `cache`.
24
/// Mostly used for debugging, it only ever gets called if `POLARS_FORCE_ASYNC` is set.
25
pub(super) struct LocalFileFetcher {
26
uri: Arc<str>,
27
path: Box<std::path::Path>,
28
}
29
30
impl LocalFileFetcher {
31
pub(super) fn from_uri(uri: Arc<str>) -> Self {
32
let path = std::path::PathBuf::from(uri.as_ref()).into_boxed_path();
33
debug_assert_eq!(
34
path,
35
std::fs::canonicalize(&path).unwrap().into_boxed_path()
36
);
37
38
Self { uri, path }
39
}
40
}
41
42
impl FileFetcher for LocalFileFetcher {
43
fn get_uri(&self) -> &Arc<str> {
44
&self.uri
45
}
46
47
fn fetches_as_symlink(&self) -> bool {
48
#[cfg(target_family = "unix")]
49
{
50
true
51
}
52
#[cfg(not(target_family = "unix"))]
53
{
54
false
55
}
56
}
57
58
fn fetch_metadata(&self) -> PolarsResult<RemoteMetadata> {
59
let metadata = std::fs::metadata(&self.path).map_err(PolarsError::from)?;
60
61
Ok(RemoteMetadata {
62
size: metadata.len(),
63
version: FileVersion::Timestamp(last_modified_u64(&metadata)),
64
})
65
}
66
67
fn fetch(&self, local_path: &std::path::Path) -> PolarsResult<()> {
68
#[cfg(target_family = "unix")]
69
{
70
std::os::unix::fs::symlink(&self.path, local_path).map_err(PolarsError::from)
71
}
72
#[cfg(not(target_family = "unix"))]
73
{
74
std::fs::copy(&self.path, local_path).map_err(PolarsError::from)?;
75
Ok(())
76
}
77
}
78
}
79
80
pub(super) struct CloudFileFetcher {
81
pub(super) uri: Arc<str>,
82
pub(super) cloud_path: object_store::path::Path,
83
pub(super) object_store: PolarsObjectStore,
84
}
85
86
impl FileFetcher for CloudFileFetcher {
87
fn get_uri(&self) -> &Arc<str> {
88
&self.uri
89
}
90
91
fn fetches_as_symlink(&self) -> bool {
92
false
93
}
94
95
fn fetch_metadata(&self) -> PolarsResult<RemoteMetadata> {
96
let metadata =
97
pl_async::get_runtime().block_in_place_on(self.object_store.head(&self.cloud_path))?;
98
99
Ok(RemoteMetadata {
100
size: metadata.size as u64,
101
version: metadata
102
.e_tag
103
.map(|x| FileVersion::ETag(blake3::hash(x.as_bytes()).to_hex()[..32].to_string()))
104
.unwrap_or_else(|| {
105
FileVersion::Timestamp(metadata.last_modified.timestamp_millis() as u64)
106
}),
107
})
108
}
109
110
fn fetch(&self, local_path: &std::path::Path) -> PolarsResult<()> {
111
pl_async::get_runtime().block_in_place_on(async {
112
let file = &mut tokio::fs::OpenOptions::new()
113
.write(true)
114
.truncate(true)
115
.open(local_path)
116
.await
117
.map_err(PolarsError::from)?;
118
119
self.object_store.download(&self.cloud_path, file).await
120
})
121
}
122
}
123
124