Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/path_utils/mod.rs
8421 views
1
use std::borrow::Cow;
2
use std::collections::VecDeque;
3
use std::path::{Path, PathBuf};
4
use std::sync::LazyLock;
5
6
use polars_buffer::Buffer;
7
use polars_core::config;
8
use polars_core::error::{PolarsResult, polars_bail, to_compute_err};
9
use polars_utils::pl_path::{CloudScheme, PlRefPath};
10
use polars_utils::pl_str::PlSmallStr;
11
12
#[cfg(feature = "cloud")]
13
mod hugging_face;
14
15
use crate::cloud::CloudOptions;
16
17
#[allow(clippy::bind_instead_of_map)]
18
pub static POLARS_TEMP_DIR_BASE_PATH: LazyLock<Box<Path>> = LazyLock::new(|| {
19
(|| {
20
let verbose = config::verbose();
21
22
let path = if let Ok(v) = std::env::var("POLARS_TEMP_DIR").map(PathBuf::from) {
23
if verbose {
24
eprintln!("init_temp_dir: sourced from POLARS_TEMP_DIR")
25
}
26
v
27
} else if cfg!(target_family = "unix") {
28
let id = std::env::var("USER")
29
.inspect(|_| {
30
if verbose {
31
eprintln!("init_temp_dir: sourced $USER")
32
}
33
})
34
.or_else(|_e| {
35
// We shouldn't hit here, but we can fallback to hashing $HOME if blake3 is
36
// available (it is available when file_cache is activated).
37
#[cfg(feature = "file_cache")]
38
{
39
std::env::var("HOME")
40
.inspect(|_| {
41
if verbose {
42
eprintln!("init_temp_dir: sourced $HOME")
43
}
44
})
45
.map(|x| blake3::hash(x.as_bytes()).to_hex()[..32].to_string())
46
}
47
#[cfg(not(feature = "file_cache"))]
48
{
49
Err(_e)
50
}
51
});
52
53
if let Ok(v) = id {
54
std::env::temp_dir().join(format!("polars-{v}/"))
55
} else {
56
return Err(std::io::Error::other(
57
"could not load $USER or $HOME environment variables",
58
));
59
}
60
} else if cfg!(target_family = "windows") {
61
// Setting permissions on Windows is not as easy compared to Unix, but fortunately
62
// the default temporary directory location is underneath the user profile, so we
63
// shouldn't need to do anything.
64
std::env::temp_dir().join("polars/")
65
} else {
66
std::env::temp_dir().join("polars/")
67
}
68
.into_boxed_path();
69
70
if let Err(err) = std::fs::create_dir_all(path.as_ref()) {
71
if !path.is_dir() {
72
panic!(
73
"failed to create temporary directory: {} (path = {:?})",
74
err,
75
path.as_ref()
76
);
77
}
78
}
79
80
#[cfg(target_family = "unix")]
81
{
82
use std::os::unix::fs::PermissionsExt;
83
84
let result = (|| {
85
std::fs::set_permissions(path.as_ref(), std::fs::Permissions::from_mode(0o700))?;
86
let perms = std::fs::metadata(path.as_ref())?.permissions();
87
88
if (perms.mode() % 0o1000) != 0o700 {
89
std::io::Result::Err(std::io::Error::other(format!(
90
"permission mismatch: {perms:?}"
91
)))
92
} else {
93
std::io::Result::Ok(())
94
}
95
})()
96
.map_err(|e| {
97
std::io::Error::new(
98
e.kind(),
99
format!(
100
"error setting temporary directory permissions: {} (path = {:?})",
101
e,
102
path.as_ref()
103
),
104
)
105
});
106
107
if std::env::var("POLARS_ALLOW_UNSECURED_TEMP_DIR").as_deref() != Ok("1") {
108
result?;
109
}
110
}
111
112
std::io::Result::Ok(path)
113
})()
114
.map_err(|e| {
115
std::io::Error::new(
116
e.kind(),
117
format!(
118
"error initializing temporary directory: {e} \
119
consider explicitly setting POLARS_TEMP_DIR"
120
),
121
)
122
})
123
.unwrap()
124
});
125
126
/// Replaces a "~" in the Path with the home directory.
127
pub fn resolve_homedir<'a, S: AsRef<Path> + ?Sized>(path: &'a S) -> Cow<'a, Path> {
128
return inner(path.as_ref());
129
130
fn inner(path: &Path) -> Cow<'_, Path> {
131
if path.starts_with("~") {
132
// home crate does not compile on wasm https://github.com/rust-lang/cargo/issues/12297
133
#[cfg(not(target_family = "wasm"))]
134
if let Some(homedir) = home::home_dir() {
135
return Cow::Owned(homedir.join(path.strip_prefix("~").unwrap()));
136
}
137
}
138
139
Cow::Borrowed(path)
140
}
141
}
142
143
fn has_glob(path: &[u8]) -> bool {
144
return get_glob_start_idx(path).is_some();
145
146
/// Get the index of the first occurrence of a glob symbol.
147
fn get_glob_start_idx(path: &[u8]) -> Option<usize> {
148
memchr::memchr3(b'*', b'?', b'[', path)
149
}
150
}
151
152
/// Returns `true` if `expanded_paths` were expanded from a single directory
153
pub fn expanded_from_single_directory(paths: &[PlRefPath], expanded_paths: &[PlRefPath]) -> bool {
154
// Single input that isn't a glob
155
paths.len() == 1 && !has_glob(paths[0].strip_scheme().as_bytes())
156
// And isn't a file
157
&& {
158
(
159
// For local paths, we can just use `is_dir`
160
!paths[0].has_scheme() && paths[0].as_std_path().is_dir()
161
)
162
|| (
163
// For cloud paths, we determine that the input path isn't a file by checking that the
164
// output path differs.
165
expanded_paths.is_empty() || (paths[0] != expanded_paths[0])
166
)
167
}
168
}
169
170
/// Recursively traverses directories and expands globs if `glob` is `true`.
171
pub async fn expand_paths(
172
paths: &[PlRefPath],
173
glob: bool,
174
hidden_file_prefix: &[PlSmallStr],
175
#[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
176
) -> PolarsResult<Buffer<PlRefPath>> {
177
expand_paths_hive(paths, glob, hidden_file_prefix, cloud_options, false)
178
.await
179
.map(|x| x.0)
180
}
181
182
struct HiveIdxTracker<'a> {
183
idx: usize,
184
paths: &'a [PlRefPath],
185
check_directory_level: bool,
186
}
187
188
impl HiveIdxTracker<'_> {
189
fn update(&mut self, i: usize, path_idx: usize) -> PolarsResult<()> {
190
let check_directory_level = self.check_directory_level;
191
let paths = self.paths;
192
193
if check_directory_level
194
&& ![usize::MAX, i].contains(&self.idx)
195
// They could still be the same directory level, just with different name length
196
&& (path_idx > 0 && paths[path_idx].parent() != paths[path_idx - 1].parent())
197
{
198
polars_bail!(
199
InvalidOperation:
200
"attempted to read from different directory levels with hive partitioning enabled: \
201
first path: {}, second path: {}",
202
&paths[path_idx - 1],
203
&paths[path_idx],
204
)
205
} else {
206
self.idx = std::cmp::min(self.idx, i);
207
Ok(())
208
}
209
}
210
}
211
212
/// Expands a single path through the object-store code path.
///
/// Returns `(expand_start_idx, paths)`:
/// - for a path treated as a single file, `expand_start_idx` is `0` and `paths` holds that
///   one (re-formatted) path;
/// - otherwise the store is listed under the path's prefix, zero-sized entries are skipped,
///   the results are sorted, and `expand_start_idx` is the byte length of the formatted prefix.
#[cfg(feature = "cloud")]
async fn expand_path_cloud(
    path: PlRefPath,
    cloud_options: Option<&CloudOptions>,
    glob: bool,
    first_path_has_scheme: bool,
) -> PolarsResult<(usize, Vec<PlRefPath>)> {
    use polars_utils::_limit_path_len_io_err;

    use crate::cloud::object_path_from_str;

    // Renders a location either as a full URI or, when the first input path had no scheme,
    // as a bare absolute path.
    let format_path = |scheme: &str, bucket: &str, location: &str| {
        if first_path_has_scheme {
            format!("{scheme}://{bucket}/{location}")
        } else {
            format!("/{location}")
        }
    };

    let path_str = path.as_str();

    let (cloud_location, store) =
        crate::cloud::build_object_store(path.clone(), cloud_options, glob).await?;
    let prefix = object_path_from_str(&cloud_location.prefix)?;

    // We need to check if it is a directory for local paths (we can be here due
    // to FORCE_ASYNC). For cloud paths the convention is that the user must add
    // a trailing slash `/` to scan directories. We don't infer it as that would
    // mean sending one network request per path serially (very slow).
    let is_single_file = !path_str.ends_with("/")
        && (!glob || cloud_location.expansion.is_none())
        && (path.has_scheme() || path.as_std_path().is_file());

    if is_single_file {
        let single = PlRefPath::new(format_path(
            cloud_location.scheme,
            &cloud_location.bucket,
            prefix.as_ref(),
        ));

        return PolarsResult::Ok((0, vec![single]));
    }

    use futures::TryStreamExt;

    if !path.has_scheme() {
        // FORCE_ASYNC in the test suite wants us to raise a proper error message
        // for non-existent file paths. Note we can't do this for cloud paths as
        // there is no concept of a "directory" - a non-existent path is
        // indistinguishable from an empty directory.
        path.as_std_path()
            .metadata()
            .map_err(|err| _limit_path_len_io_err(path.as_std_path(), err))?;
    }

    // Shadow with references so the `async move` blocks below copy borrows, not the values.
    let cloud_location = &cloud_location;
    let prefix_ref = &prefix;

    let mut listed = store
        .exec_with_rebuild_retry_on_err(|s| async move {
            let entries = s
                .list(Some(prefix_ref))
                .try_filter_map(|x| async move {
                    // Zero-sized entries are dropped from the listing.
                    let entry = (x.size > 0).then(|| {
                        PlRefPath::new({
                            format_path(
                                cloud_location.scheme,
                                &cloud_location.bucket,
                                x.location.as_ref(),
                            )
                        })
                    });
                    Ok(entry)
                })
                .try_collect::<Vec<_>>()
                .await?;

            Ok(entries)
        })
        .await?;

    // Since Path::parse() removes any trailing slash ('/'), we may need to restore it
    // to calculate the right byte offset
    let mut prefix_str = prefix.to_string();
    if path_str.ends_with('/') && !prefix_str.ends_with('/') {
        prefix_str.push('/')
    };

    listed.sort_unstable();

    let expand_start_idx = format_path(
        cloud_location.scheme,
        &cloud_location.bucket,
        prefix_str.as_ref(),
    )
    .len();

    PolarsResult::Ok((expand_start_idx, listed))
}
312
313
/// Recursively traverses directories and expands globs if `glob` is `true`.
314
/// Returns the expanded paths and the index at which to start parsing hive
315
/// partitions from the path.
316
pub async fn expand_paths_hive(
317
paths: &[PlRefPath],
318
glob: bool,
319
hidden_file_prefix: &[PlSmallStr],
320
#[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
321
check_directory_level: bool,
322
) -> PolarsResult<(Buffer<PlRefPath>, usize)> {
323
let Some(first_path) = paths.first() else {
324
return Ok((vec![].into(), 0));
325
};
326
327
let first_path_has_scheme = first_path.has_scheme();
328
329
let is_hidden_file = move |path: &PlRefPath| {
330
path.file_name()
331
.and_then(|x| x.to_str())
332
.is_some_and(|file_name| {
333
hidden_file_prefix
334
.iter()
335
.any(|x| file_name.starts_with(x.as_str()))
336
})
337
};
338
339
let mut out_paths = OutPaths {
340
paths: vec![],
341
exts: [None, None],
342
is_hidden_file: &is_hidden_file,
343
};
344
345
let mut hive_idx_tracker = HiveIdxTracker {
346
idx: usize::MAX,
347
paths,
348
check_directory_level,
349
};
350
351
if first_path_has_scheme || {
352
cfg!(not(target_family = "windows")) && polars_config::config().force_async()
353
} {
354
#[cfg(feature = "cloud")]
355
{
356
if first_path.scheme() == Some(CloudScheme::Hf) {
357
let (expand_start_idx, paths) = hugging_face::expand_paths_hf(
358
paths,
359
check_directory_level,
360
cloud_options,
361
glob,
362
)
363
.await?;
364
365
return Ok((paths.into(), expand_start_idx));
366
}
367
368
for (path_idx, path) in paths.iter().enumerate() {
369
use std::borrow::Cow;
370
371
let mut path = Cow::Borrowed(path);
372
373
if matches!(path.scheme(), Some(CloudScheme::Http | CloudScheme::Https)) {
374
let mut rewrite_aws = false;
375
376
#[cfg(feature = "aws")]
377
if let Some(p) = (|| {
378
use crate::cloud::CloudConfig;
379
380
// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#virtual-hosted-style-access
381
// Path format: https://bucket-name.s3.region-code.amazonaws.com/key-name
382
let after_scheme = path.strip_scheme();
383
384
let bucket_end = after_scheme.find(".s3.")?;
385
let offset = bucket_end + 4;
386
// Search after offset to prevent matching `.s3.amazonaws.com` (legacy global endpoint URL without region).
387
let region_end = offset + after_scheme[offset..].find(".amazonaws.com/")?;
388
389
// Do not convert if '?' (this can be query parameters for AWS presigned URLs).
390
if after_scheme[..region_end].contains('/') || after_scheme.contains('?') {
391
return None;
392
}
393
394
let bucket = &after_scheme[..bucket_end];
395
let region = &after_scheme[bucket_end + 4..region_end];
396
let key = &after_scheme[region_end + 15..];
397
398
if let CloudConfig::Aws(configs) = cloud_options
399
.get_or_insert_default()
400
.config
401
.get_or_insert_with(|| CloudConfig::Aws(Vec::with_capacity(1)))
402
{
403
use object_store::aws::AmazonS3ConfigKey;
404
405
if !matches!(configs.last(), Some((AmazonS3ConfigKey::Region, _))) {
406
configs.push((AmazonS3ConfigKey::Region, region.into()))
407
}
408
}
409
410
Some(format!("s3://{bucket}/{key}"))
411
})() {
412
path = Cow::Owned(PlRefPath::new(p));
413
rewrite_aws = true;
414
}
415
416
if !rewrite_aws {
417
out_paths.push(path.into_owned());
418
hive_idx_tracker.update(0, path_idx)?;
419
continue;
420
}
421
}
422
423
let sort_start_idx = out_paths.paths.len();
424
425
if glob && has_glob(path.as_bytes()) {
426
hive_idx_tracker.update(0, path_idx)?;
427
428
let iter = crate::pl_async::get_runtime().block_in_place_on(
429
crate::async_glob(path.into_owned(), cloud_options.as_ref()),
430
)?;
431
432
if first_path_has_scheme {
433
out_paths.extend(iter.into_iter().map(PlRefPath::new))
434
} else {
435
// FORCE_ASYNC, remove leading file:// as the caller may not be expecting a
436
// URI result.
437
out_paths.extend(iter.iter().map(|x| &x[7..]).map(PlRefPath::new))
438
};
439
} else {
440
let (expand_start_idx, paths) = expand_path_cloud(
441
path.into_owned(),
442
cloud_options.as_ref(),
443
glob,
444
first_path_has_scheme,
445
)
446
.await?;
447
out_paths.extend_from_slice(&paths);
448
hive_idx_tracker.update(expand_start_idx, path_idx)?;
449
};
450
451
if let Some(mut_slice) = out_paths.paths.get_mut(sort_start_idx..) {
452
<[PlRefPath]>::sort_unstable(mut_slice);
453
}
454
}
455
}
456
#[cfg(not(feature = "cloud"))]
457
panic!("Feature `cloud` must be enabled to use globbing patterns with cloud urls.")
458
} else {
459
let mut stack: VecDeque<Cow<'_, Path>> = VecDeque::new();
460
let mut paths_scratch: Vec<PathBuf> = vec![];
461
462
for (path_idx, path) in paths.iter().enumerate() {
463
stack.clear();
464
let sort_start_idx = out_paths.paths.len();
465
466
if path.as_std_path().is_dir() {
467
let i = path.as_str().len();
468
469
hive_idx_tracker.update(i, path_idx)?;
470
471
stack.push_back(Cow::Borrowed(path.as_std_path()));
472
473
while let Some(dir) = stack.pop_front() {
474
let mut last_err = Ok(());
475
476
paths_scratch.clear();
477
paths_scratch.extend(std::fs::read_dir(dir)?.map_while(|x| {
478
match x.map(|x| x.path()) {
479
Ok(v) => Some(v),
480
Err(e) => {
481
last_err = Err(e);
482
None
483
},
484
}
485
}));
486
487
last_err?;
488
489
for path in paths_scratch.drain(..) {
490
let md = path.metadata()?;
491
492
if md.is_dir() {
493
stack.push_back(Cow::Owned(path));
494
} else if md.len() > 0 {
495
out_paths.push(PlRefPath::try_from_path(&path)?);
496
}
497
}
498
}
499
} else if glob && has_glob(path.as_bytes()) {
500
hive_idx_tracker.update(0, path_idx)?;
501
502
let Ok(paths) = glob::glob(path.as_str()) else {
503
polars_bail!(ComputeError: "invalid glob pattern given")
504
};
505
506
for path in paths {
507
let path = path.map_err(to_compute_err)?;
508
let md = path.metadata()?;
509
if !md.is_dir() && md.len() > 0 {
510
out_paths.push(PlRefPath::try_from_path(&path)?);
511
}
512
}
513
} else {
514
hive_idx_tracker.update(0, path_idx)?;
515
out_paths.push(path.clone());
516
};
517
518
if let Some(mut_slice) = out_paths.paths.get_mut(sort_start_idx..) {
519
<[PlRefPath]>::sort_unstable(mut_slice);
520
}
521
}
522
}
523
524
if expanded_from_single_directory(paths, out_paths.paths.as_slice()) {
525
if let [Some((_, p1)), Some((_, p2))] = out_paths.exts {
526
polars_bail!(
527
InvalidOperation: "directory contained paths with different file extensions: \
528
first path: {}, second path: {}. Please use a glob pattern to explicitly specify \
529
which files to read (e.g. 'dir/**/*', 'dir/**/*.parquet')",
530
&p1, &p2
531
)
532
}
533
}
534
535
return Ok((out_paths.paths.into(), hive_idx_tracker.idx));
536
537
/// Wrapper around `Vec<PathBuf>` that also tracks file extensions, so that
538
/// we don't have to traverse the entire list again to validate extensions.
539
struct OutPaths<'a, F: Fn(&PlRefPath) -> bool> {
540
paths: Vec<PlRefPath>,
541
exts: [Option<(PlSmallStr, PlRefPath)>; 2],
542
is_hidden_file: &'a F,
543
}
544
545
impl<F> OutPaths<'_, F>
546
where
547
F: Fn(&PlRefPath) -> bool,
548
{
549
fn push(&mut self, value: PlRefPath) {
550
if (self.is_hidden_file)(&value) {
551
return;
552
}
553
554
let exts = &mut self.exts;
555
Self::update_ext_status(exts, &value);
556
557
self.paths.push(value)
558
}
559
560
fn extend(&mut self, values: impl IntoIterator<Item = PlRefPath>) {
561
let exts = &mut self.exts;
562
563
self.paths.extend(
564
values
565
.into_iter()
566
.filter(|x| !(self.is_hidden_file)(x))
567
.inspect(|x| {
568
Self::update_ext_status(exts, x);
569
}),
570
)
571
}
572
573
fn extend_from_slice(&mut self, values: &[PlRefPath]) {
574
self.extend(values.iter().cloned())
575
}
576
577
fn update_ext_status(exts: &mut [Option<(PlSmallStr, PlRefPath)>; 2], value: &PlRefPath) {
578
let ext = value
579
.extension()
580
.map_or(PlSmallStr::EMPTY, PlSmallStr::from);
581
582
if exts[0].is_none() {
583
exts[0] = Some((ext, value.clone()));
584
} else if exts[1].is_none() && ext != exts[0].as_ref().unwrap().0 {
585
exts[1] = Some((ext, value.clone()));
586
}
587
}
588
}
589
}
590
591
/// Ignores errors from `std::fs::create_dir_all` if the directory exists.
592
#[cfg(feature = "file_cache")]
593
pub(crate) fn ensure_directory_init(path: &Path) -> std::io::Result<()> {
    match std::fs::create_dir_all(path) {
        // A creation error only matters when the directory is still absent afterwards.
        Err(err) if !path.is_dir() => Err(err),
        _ => Ok(()),
    }
}
598
599
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use polars_utils::pl_path::PlRefPath;

    use super::resolve_homedir;
    use crate::pl_async::get_runtime;

    /// Applies `resolve_homedir` to every input and returns the owned results in order.
    fn resolve_all(inputs: &[PathBuf]) -> Vec<PathBuf> {
        inputs
            .iter()
            .map(|input| resolve_homedir(input).into_owned())
            .collect()
    }

    #[cfg(not(target_os = "windows"))]
    #[test]
    fn test_resolve_homedir() {
        let paths: Vec<PathBuf> = [
            "~/dir1/dir2/test.csv",
            "/abs/path/test.csv",
            "rel/path/test.csv",
            "/",
            "~",
        ]
        .into_iter()
        .map(PathBuf::from)
        .collect();

        let resolved = resolve_all(&paths);

        // A `~/...` path keeps its file name and becomes absolute.
        assert_eq!(resolved[0].file_name(), paths[0].file_name());
        assert!(resolved[0].is_absolute());

        // Paths without a leading `~` come back unchanged.
        assert_eq!(resolved[1], paths[1]);
        assert_eq!(resolved[2], paths[2]);
        assert_eq!(resolved[3], paths[3]);

        // A bare `~` resolves to an absolute path.
        assert!(resolved[4].is_absolute());
    }

    #[cfg(target_os = "windows")]
    #[test]
    fn test_resolve_homedir_windows() {
        let paths: Vec<PathBuf> = [
            r#"c:\Users\user1\test.csv"#,
            r#"~\user1\test.csv"#,
            "~",
        ]
        .into_iter()
        .map(PathBuf::from)
        .collect();

        let resolved = resolve_all(&paths);

        // Paths without a leading `~` come back unchanged.
        assert_eq!(resolved[0], paths[0]);

        // A `~\...` path keeps its file name and becomes absolute.
        assert_eq!(resolved[1].file_name(), paths[1].file_name());
        assert!(resolved[1].is_absolute());

        // A bare `~` resolves to an absolute path.
        assert!(resolved[2].is_absolute());
    }

    #[test]
    fn test_http_path_with_query_parameters_is_not_expanded_as_glob() {
        // Don't confuse HTTP URL's with query parameters for globs.
        // See https://github.com/pola-rs/polars/pull/17774

        use super::expand_paths;

        let url = "https://pola.rs/test.csv?token=bear";
        let paths = &[PlRefPath::new(url)];

        let expansion = expand_paths(paths, true, &[], &mut None);
        let out = get_runtime().block_on(expansion).unwrap();

        assert_eq!(out.as_ref(), paths);
    }
}
669
670