Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/path_utils/mod.rs
7884 views
1
use std::collections::VecDeque;
2
use std::path::{Path, PathBuf};
3
use std::sync::LazyLock;
4
5
use arrow::buffer::Buffer;
6
use polars_core::config;
7
use polars_core::error::{PolarsResult, polars_bail, to_compute_err};
8
use polars_utils::pl_str::PlSmallStr;
9
use polars_utils::plpath::{CloudScheme, PlPath, PlPathRef};
10
11
#[cfg(feature = "cloud")]
12
mod hugging_face;
13
14
use crate::cloud::CloudOptions;
15
16
#[allow(clippy::bind_instead_of_map)]
17
pub static POLARS_TEMP_DIR_BASE_PATH: LazyLock<Box<Path>> = LazyLock::new(|| {
18
(|| {
19
let verbose = config::verbose();
20
21
let path = if let Ok(v) = std::env::var("POLARS_TEMP_DIR").map(PathBuf::from) {
22
if verbose {
23
eprintln!("init_temp_dir: sourced from POLARS_TEMP_DIR")
24
}
25
v
26
} else if cfg!(target_family = "unix") {
27
let id = std::env::var("USER")
28
.inspect(|_| {
29
if verbose {
30
eprintln!("init_temp_dir: sourced $USER")
31
}
32
})
33
.or_else(|_e| {
34
// We shouldn't hit here, but we can fallback to hashing $HOME if blake3 is
35
// available (it is available when file_cache is activated).
36
#[cfg(feature = "file_cache")]
37
{
38
std::env::var("HOME")
39
.inspect(|_| {
40
if verbose {
41
eprintln!("init_temp_dir: sourced $HOME")
42
}
43
})
44
.map(|x| blake3::hash(x.as_bytes()).to_hex()[..32].to_string())
45
}
46
#[cfg(not(feature = "file_cache"))]
47
{
48
Err(_e)
49
}
50
});
51
52
if let Ok(v) = id {
53
std::env::temp_dir().join(format!("polars-{v}/"))
54
} else {
55
return Err(std::io::Error::other(
56
"could not load $USER or $HOME environment variables",
57
));
58
}
59
} else if cfg!(target_family = "windows") {
60
// Setting permissions on Windows is not as easy compared to Unix, but fortunately
61
// the default temporary directory location is underneath the user profile, so we
62
// shouldn't need to do anything.
63
std::env::temp_dir().join("polars/")
64
} else {
65
std::env::temp_dir().join("polars/")
66
}
67
.into_boxed_path();
68
69
if let Err(err) = std::fs::create_dir_all(path.as_ref()) {
70
if !path.is_dir() {
71
panic!(
72
"failed to create temporary directory: {} (path = {:?})",
73
err,
74
path.as_ref()
75
);
76
}
77
}
78
79
#[cfg(target_family = "unix")]
80
{
81
use std::os::unix::fs::PermissionsExt;
82
83
let result = (|| {
84
std::fs::set_permissions(path.as_ref(), std::fs::Permissions::from_mode(0o700))?;
85
let perms = std::fs::metadata(path.as_ref())?.permissions();
86
87
if (perms.mode() % 0o1000) != 0o700 {
88
std::io::Result::Err(std::io::Error::other(format!(
89
"permission mismatch: {perms:?}"
90
)))
91
} else {
92
std::io::Result::Ok(())
93
}
94
})()
95
.map_err(|e| {
96
std::io::Error::new(
97
e.kind(),
98
format!(
99
"error setting temporary directory permissions: {} (path = {:?})",
100
e,
101
path.as_ref()
102
),
103
)
104
});
105
106
if std::env::var("POLARS_ALLOW_UNSECURED_TEMP_DIR").as_deref() != Ok("1") {
107
result?;
108
}
109
}
110
111
std::io::Result::Ok(path)
112
})()
113
.map_err(|e| {
114
std::io::Error::new(
115
e.kind(),
116
format!(
117
"error initializing temporary directory: {e} \
118
consider explicitly setting POLARS_TEMP_DIR"
119
),
120
)
121
})
122
.unwrap()
123
});
124
125
/// Replaces a "~" in the Path with the home directory.
126
pub fn resolve_homedir(path: &dyn AsRef<Path>) -> PathBuf {
127
let path = path.as_ref();
128
129
if path.starts_with("~") {
130
// home crate does not compile on wasm https://github.com/rust-lang/cargo/issues/12297
131
#[cfg(not(target_family = "wasm"))]
132
if let Some(homedir) = home::home_dir() {
133
return homedir.join(path.strip_prefix("~").unwrap());
134
}
135
}
136
137
path.into()
138
}
139
140
/// Get the index of the first occurrence of a glob symbol.
141
pub fn get_glob_start_idx(path: &[u8]) -> Option<usize> {
142
memchr::memchr3(b'*', b'?', b'[', path)
143
}
144
145
/// Returns `true` if `expanded_paths` were expanded from a single directory
146
pub fn expanded_from_single_directory(paths: &[PlPath], expanded_paths: &[PlPath]) -> bool {
147
// Single input that isn't a glob
148
paths.len() == 1 && get_glob_start_idx(paths[0].as_ref().strip_scheme().as_bytes()).is_none()
149
// And isn't a file
150
&& {
151
(
152
// For local paths, we can just use `is_dir`
153
paths[0].as_ref().as_local_path().is_some_and(|p| p.is_dir())
154
)
155
|| (
156
// For cloud paths, we determine that the input path isn't a file by checking that the
157
// output path differs.
158
expanded_paths.is_empty() || (paths[0] != expanded_paths[0])
159
)
160
}
161
}
162
163
/// Recursively traverses directories and expands globs if `glob` is `true`.
164
pub fn expand_paths(
165
paths: &[PlPath],
166
glob: bool,
167
hidden_file_prefix: &[PlSmallStr],
168
#[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
169
) -> PolarsResult<Buffer<PlPath>> {
170
expand_paths_hive(paths, glob, hidden_file_prefix, cloud_options, false).map(|x| x.0)
171
}
172
173
struct HiveIdxTracker<'a> {
174
idx: usize,
175
paths: &'a [PlPath],
176
check_directory_level: bool,
177
}
178
179
impl HiveIdxTracker<'_> {
180
fn update(&mut self, i: usize, path_idx: usize) -> PolarsResult<()> {
181
let check_directory_level = self.check_directory_level;
182
let paths = self.paths;
183
184
if check_directory_level
185
&& ![usize::MAX, i].contains(&self.idx)
186
// They could still be the same directory level, just with different name length
187
&& (path_idx > 0 && paths[path_idx].as_ref().parent() != paths[path_idx - 1].as_ref().parent())
188
{
189
polars_bail!(
190
InvalidOperation:
191
"attempted to read from different directory levels with hive partitioning enabled: \
192
first path: {}, second path: {}",
193
paths[path_idx - 1].display(),
194
paths[path_idx].display(),
195
)
196
} else {
197
self.idx = std::cmp::min(self.idx, i);
198
Ok(())
199
}
200
}
201
}
202
203
/// Recursively traverses directories and expands globs if `glob` is `true`.
204
/// Returns the expanded paths and the index at which to start parsing hive
205
/// partitions from the path.
206
pub fn expand_paths_hive(
207
paths: &[PlPath],
208
glob: bool,
209
hidden_file_prefix: &[PlSmallStr],
210
#[allow(unused_variables)] cloud_options: &mut Option<CloudOptions>,
211
check_directory_level: bool,
212
) -> PolarsResult<(Buffer<PlPath>, usize)> {
213
let Some(first_path) = paths.first() else {
214
return Ok((vec![].into(), 0));
215
};
216
217
let is_cloud = first_path.as_ref().is_cloud_url();
218
219
let is_hidden_file = move |path: &PlPath| {
220
path.as_ref()
221
.file_name()
222
.and_then(|x| x.to_str())
223
.is_some_and(|file_name| {
224
hidden_file_prefix
225
.iter()
226
.any(|x| file_name.starts_with(x.as_str()))
227
})
228
};
229
230
let mut out_paths = OutPaths {
231
paths: vec![],
232
exts: [None, None],
233
current_idx: 0,
234
is_hidden_file: &is_hidden_file,
235
};
236
237
let mut hive_idx_tracker = HiveIdxTracker {
238
idx: usize::MAX,
239
paths,
240
check_directory_level,
241
};
242
243
if is_cloud || { cfg!(not(target_family = "windows")) && config::force_async() } {
244
#[cfg(feature = "cloud")]
245
{
246
use polars_utils::_limit_path_len_io_err;
247
248
use crate::cloud::object_path_from_str;
249
250
if first_path.cloud_scheme() == Some(CloudScheme::Hf) {
251
let (expand_start_idx, paths) = crate::pl_async::get_runtime().block_in_place_on(
252
hugging_face::expand_paths_hf(
253
paths,
254
check_directory_level,
255
cloud_options,
256
glob,
257
),
258
)?;
259
260
return Ok((paths.into(), expand_start_idx));
261
}
262
263
let format_path = |scheme: &str, bucket: &str, location: &str| {
264
if is_cloud {
265
format!("{scheme}://{bucket}/{location}")
266
} else {
267
format!("/{location}")
268
}
269
};
270
271
let expand_path_cloud = |path: PlPathRef<'_>,
272
cloud_options: Option<&CloudOptions>|
273
-> PolarsResult<(usize, Vec<PlPath>)> {
274
crate::pl_async::get_runtime().block_in_place_on(async {
275
let path_str = path.to_str();
276
277
let (cloud_location, store) =
278
crate::cloud::build_object_store(path, cloud_options, glob).await?;
279
let prefix = object_path_from_str(&cloud_location.prefix)?;
280
281
let out = if !path_str.ends_with("/")
282
&& (!glob || cloud_location.expansion.is_none())
283
&& {
284
// We need to check if it is a directory for local paths (we can be here due
285
// to FORCE_ASYNC). For cloud paths the convention is that the user must add
286
// a trailing slash `/` to scan directories. We don't infer it as that would
287
// mean sending one network request per path serially (very slow).
288
path.is_cloud_url() || path.as_local_path().unwrap().is_file()
289
} {
290
(
291
0,
292
vec![PlPath::from_string(format_path(
293
cloud_location.scheme,
294
&cloud_location.bucket,
295
prefix.as_ref(),
296
))],
297
)
298
} else {
299
use futures::TryStreamExt;
300
301
if let Some(path) = path.as_local_path() {
302
// FORCE_ASYNC in the test suite wants us to raise a proper error message
303
// for non-existent file paths. Note we can't do this for cloud paths as
304
// there is no concept of a "directory" - a non-existent path is
305
// indistinguishable from an empty directory.
306
if !path.is_dir() {
307
path.metadata()
308
.map_err(|err| _limit_path_len_io_err(path, err))?;
309
}
310
}
311
312
let cloud_location = &cloud_location;
313
314
let mut paths = store
315
.try_exec_rebuild_on_err(|store| {
316
let st = store.clone();
317
318
async {
319
let store = st;
320
let out = store
321
.list(Some(&prefix))
322
.try_filter_map(|x| async move {
323
let out = (x.size > 0).then(|| {
324
PlPath::from_string({
325
format_path(
326
cloud_location.scheme,
327
&cloud_location.bucket,
328
x.location.as_ref(),
329
)
330
})
331
});
332
Ok(out)
333
})
334
.try_collect::<Vec<_>>()
335
.await?;
336
337
Ok(out)
338
}
339
})
340
.await?;
341
342
// Since Path::parse() removes any trailing slash ('/'), we may need to restore it
343
// to calculate the right byte offset
344
let mut prefix = prefix.to_string();
345
if path_str.ends_with('/') && !prefix.ends_with('/') {
346
prefix.push('/')
347
};
348
349
paths.sort_unstable();
350
351
(
352
format_path(
353
cloud_location.scheme,
354
&cloud_location.bucket,
355
prefix.as_ref(),
356
)
357
.len(),
358
paths,
359
)
360
};
361
362
PolarsResult::Ok(out)
363
})
364
};
365
366
for (path_idx, path) in paths.iter().enumerate() {
367
use std::borrow::Cow;
368
369
let mut path = Cow::Borrowed(path);
370
371
if matches!(
372
path.cloud_scheme(),
373
Some(CloudScheme::Http | CloudScheme::Https)
374
) {
375
let mut rewrite_aws = false;
376
377
#[cfg(feature = "aws")]
378
if let Some(p) = (|| {
379
use crate::cloud::CloudConfig;
380
381
// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#virtual-hosted-style-access
382
// Path format: https://bucket-name.s3.region-code.amazonaws.com/key-name
383
let p = path.as_ref().as_ref();
384
let after_scheme = p.strip_scheme();
385
386
let bucket_end = after_scheme.find(".s3.")?;
387
let offset = bucket_end + 4;
388
// Search after offset to prevent matching `.s3.amazonaws.com` (legacy global endpoint URL without region).
389
let region_end = offset + after_scheme[offset..].find(".amazonaws.com/")?;
390
391
// Do not convert if '?' (this can be query parameters for AWS presigned URLs).
392
if after_scheme[..region_end].contains('/') || after_scheme.contains('?') {
393
return None;
394
}
395
396
let bucket = &after_scheme[..bucket_end];
397
let region = &after_scheme[bucket_end + 4..region_end];
398
let key = &after_scheme[region_end + 15..];
399
400
if let CloudConfig::Aws(configs) = cloud_options
401
.get_or_insert_default()
402
.config
403
.get_or_insert_with(|| CloudConfig::Aws(Vec::with_capacity(1)))
404
{
405
use object_store::aws::AmazonS3ConfigKey;
406
407
if !matches!(configs.last(), Some((AmazonS3ConfigKey::Region, _))) {
408
configs.push((AmazonS3ConfigKey::Region, region.into()))
409
}
410
}
411
412
Some(format!("s3://{bucket}/{key}"))
413
})() {
414
path = Cow::Owned(PlPath::from_string(p));
415
rewrite_aws = true;
416
}
417
418
if !rewrite_aws {
419
out_paths.push(path.into_owned());
420
hive_idx_tracker.update(0, path_idx)?;
421
continue;
422
}
423
}
424
425
let glob_start_idx = get_glob_start_idx(path.to_str().as_bytes());
426
427
let path = if glob && glob_start_idx.is_some() {
428
path.clone()
429
} else {
430
let (expand_start_idx, paths) =
431
expand_path_cloud(path.as_ref().as_ref(), cloud_options.as_ref())?;
432
out_paths.extend_from_slice(&paths);
433
hive_idx_tracker.update(expand_start_idx, path_idx)?;
434
continue;
435
};
436
437
hive_idx_tracker.update(0, path_idx)?;
438
439
let iter = crate::pl_async::get_runtime().block_in_place_on(crate::async_glob(
440
path.as_ref().as_ref(),
441
cloud_options.as_ref(),
442
))?;
443
444
if is_cloud {
445
out_paths.extend(iter.into_iter().map(PlPath::from_string));
446
} else {
447
// FORCE_ASYNC, remove leading file:// as not all readers support it.
448
out_paths.extend(
449
iter.iter()
450
.map(|x| &x[7..])
451
.map(|s| PlPathRef::new(s).into_owned()),
452
)
453
}
454
}
455
}
456
#[cfg(not(feature = "cloud"))]
457
panic!("Feature `cloud` must be enabled to use globbing patterns with cloud urls.")
458
} else {
459
let mut stack = VecDeque::new();
460
let mut paths_scratch = vec![];
461
462
for (path_idx, path) in paths.iter().enumerate() {
463
let path = path.as_ref();
464
let path = path.as_local_path().unwrap();
465
stack.clear();
466
467
if path.is_dir() {
468
let path = path.to_path_buf();
469
470
let i = path.to_str().unwrap().len();
471
472
hive_idx_tracker.update(i, path_idx)?;
473
474
stack.push_back(path.clone());
475
476
while let Some(dir) = stack.pop_front() {
477
let mut last_err = Ok(());
478
479
paths_scratch.clear();
480
paths_scratch.extend(std::fs::read_dir(dir)?.map_while(|x| match x {
481
Ok(v) => Some(v.path()),
482
Err(e) => {
483
last_err = Err(e);
484
None
485
},
486
}));
487
488
last_err?;
489
490
paths_scratch.sort_unstable();
491
492
for path in paths_scratch.drain(..) {
493
if path.is_dir() {
494
stack.push_back(path);
495
} else if path.metadata()?.len() > 0 {
496
out_paths.push(PlPath::Local(path.into()));
497
}
498
}
499
}
500
501
continue;
502
}
503
504
let i = get_glob_start_idx(path.to_str().unwrap().as_bytes());
505
506
if glob && i.is_some() {
507
hive_idx_tracker.update(0, path_idx)?;
508
509
let Ok(paths) = glob::glob(path.to_str().unwrap()) else {
510
polars_bail!(ComputeError: "invalid glob pattern given")
511
};
512
513
for path in paths {
514
let path = path.map_err(to_compute_err)?;
515
if !path.is_dir() && path.metadata()?.len() > 0 {
516
out_paths.push(PlPath::Local(path.into()));
517
}
518
}
519
} else {
520
hive_idx_tracker.update(0, path_idx)?;
521
out_paths.push(PlPath::Local(path.into()));
522
}
523
}
524
}
525
526
assert_eq!(out_paths.current_idx, out_paths.paths.len());
527
528
if expanded_from_single_directory(paths, out_paths.paths.as_slice()) {
529
if let [Some((_, i1)), Some((_, i2))] = out_paths.exts {
530
polars_bail!(
531
InvalidOperation: "directory contained paths with different file extensions: \
532
first path: {}, second path: {}. Please use a glob pattern to explicitly specify \
533
which files to read (e.g. 'dir/**/*', 'dir/**/*.parquet')",
534
&out_paths.paths[i1].display(), &out_paths.paths[i2].display()
535
)
536
}
537
}
538
539
return Ok((out_paths.paths.into(), hive_idx_tracker.idx));
540
541
/// Wrapper around `Vec<PathBuf>` that also tracks file extensions, so that
542
/// we don't have to traverse the entire list again to validate extensions.
543
struct OutPaths<'a, F: Fn(&PlPath) -> bool> {
544
paths: Vec<PlPath>,
545
exts: [Option<(PlSmallStr, usize)>; 2],
546
current_idx: usize,
547
is_hidden_file: &'a F,
548
}
549
550
impl<F> OutPaths<'_, F>
551
where
552
F: Fn(&PlPath) -> bool,
553
{
554
fn push(&mut self, value: PlPath) {
555
if (self.is_hidden_file)(&value) {
556
return;
557
}
558
559
let current_idx = &mut self.current_idx;
560
let exts = &mut self.exts;
561
Self::update_ext_status(current_idx, exts, value.as_ref());
562
563
self.paths.push(value)
564
}
565
566
fn extend(&mut self, values: impl IntoIterator<Item = PlPath>) {
567
let current_idx = &mut self.current_idx;
568
let exts = &mut self.exts;
569
570
self.paths.extend(
571
values
572
.into_iter()
573
.filter(|x| !(self.is_hidden_file)(x))
574
.inspect(|x| {
575
Self::update_ext_status(current_idx, exts, x.as_ref());
576
}),
577
)
578
}
579
580
fn extend_from_slice(&mut self, values: &[PlPath]) {
581
self.extend(values.iter().cloned())
582
}
583
584
fn update_ext_status(
585
current_idx: &mut usize,
586
exts: &mut [Option<(PlSmallStr, usize)>; 2],
587
value: PlPathRef,
588
) {
589
let ext = value
590
.extension()
591
.map(PlSmallStr::from)
592
.unwrap_or(PlSmallStr::EMPTY);
593
594
if exts[0].is_none() {
595
exts[0] = Some((ext, *current_idx));
596
} else if exts[1].is_none() && ext != exts[0].as_ref().unwrap().0 {
597
exts[1] = Some((ext, *current_idx));
598
}
599
600
*current_idx += 1;
601
}
602
}
603
}
604
605
/// Ignores errors from `std::fs::create_dir_all` if the directory exists.
606
#[cfg(feature = "file_cache")]
607
pub(crate) fn ensure_directory_init(path: &Path) -> std::io::Result<()> {
    match std::fs::create_dir_all(path) {
        // A creation error is only surfaced when no directory exists at `path` afterwards.
        Err(err) if !path.is_dir() => Err(err),
        _ => Ok(()),
    }
}
612
613
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use polars_utils::plpath::PlPath;

    use super::resolve_homedir;

    /// `~`-prefixed paths become absolute; all other paths are returned as given.
    #[cfg(not(target_os = "windows"))]
    #[test]
    fn test_resolve_homedir() {
        let [tilde_file, absolute, relative, root, tilde] = [
            "~/dir1/dir2/test.csv",
            "/abs/path/test.csv",
            "rel/path/test.csv",
            "/",
            "~",
        ]
        .map(PathBuf::from);

        let expanded_file = resolve_homedir(&tilde_file);
        assert_eq!(expanded_file.file_name(), tilde_file.file_name());
        assert!(expanded_file.is_absolute());

        for unchanged in [&absolute, &relative, &root] {
            assert_eq!(&resolve_homedir(unchanged), unchanged);
        }

        assert!(resolve_homedir(&tilde).is_absolute());
    }

    /// Windows variant: drive-prefixed paths are untouched, `~` paths become absolute.
    #[cfg(target_os = "windows")]
    #[test]
    fn test_resolve_homedir_windows() {
        let [drive_path, tilde_file, tilde] =
            [r#"c:\Users\user1\test.csv"#, r#"~\user1\test.csv"#, "~"].map(PathBuf::from);

        assert_eq!(resolve_homedir(&drive_path), drive_path);

        let expanded_file = resolve_homedir(&tilde_file);
        assert_eq!(expanded_file.file_name(), tilde_file.file_name());
        assert!(expanded_file.is_absolute());

        assert!(resolve_homedir(&tilde).is_absolute());
    }

    #[test]
    fn test_http_path_with_query_parameters_is_not_expanded_as_glob() {
        // Don't confuse HTTP URL's with query parameters for globs.
        // See https://github.com/pola-rs/polars/pull/17774

        use super::expand_paths;

        let url = "https://pola.rs/test.csv?token=bear";
        let input = &[PlPath::new(url)];
        let mut cloud_options = None;

        let expanded = expand_paths(input, true, &[], &mut cloud_options).unwrap();

        // The `?` of the query string must not trigger glob expansion.
        assert_eq!(expanded.as_ref(), input);
    }
}
672
673