Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/cloud/glob.rs
8424 views
1
use std::borrow::Cow;
2
3
use futures::TryStreamExt;
4
use object_store::path::Path;
5
use polars_error::{PolarsResult, polars_bail, polars_err};
6
use polars_utils::pl_path::{CloudScheme, PlRefPath};
7
use polars_utils::pl_str::PlSmallStr;
8
use regex::Regex;
9
10
use super::CloudOptions;
11
12
/// Converts a glob to regex form.
13
///
14
/// # Returns
15
/// 1. the prefix part (all path components until the first one with '*')
16
/// 2. a regular expression representation of the rest.
17
pub(crate) fn extract_prefix_expansion(path: &str) -> PolarsResult<(Cow<'_, str>, Option<String>)> {
18
// (offset, len, replacement)
19
let mut replacements: Vec<(usize, usize, &[u8])> = vec![];
20
21
// The position after the last slash before glob characters begin.
22
// `a/b/c*/`
23
// ^
24
let mut pos: usize = if let Some(after_last_slash) =
25
memchr::memchr2(b'*', b'[', path.as_bytes()).map(|i| {
26
path.as_bytes()[..i]
27
.iter()
28
.rposition(|x| *x == b'/')
29
.map_or(0, |x| 1 + x)
30
}) {
31
// First value is used as the starting point later.
32
replacements.push((after_last_slash, 0, &[]));
33
after_last_slash
34
} else {
35
usize::MAX
36
};
37
38
while pos < path.len() {
39
match memchr::memchr2(b'*', b'.', &path.as_bytes()[pos..]) {
40
None => break,
41
Some(i) => pos += i,
42
}
43
44
let (len, replace): (usize, &[u8]) = match &path[pos..] {
45
// Accept:
46
// - `**/`
47
// - `**` only if it is the end of the path
48
v if v.starts_with("**") && (v.len() == 2 || v.as_bytes()[2] == b'/') => {
49
// Wrapping in a capture group ensures we also match non-nested paths.
50
(3, b"(.*/)?" as _)
51
},
52
v if v.starts_with("**") => {
53
polars_bail!(ComputeError: "invalid ** glob pattern")
54
},
55
v if v.starts_with('*') => (1, b"[^/]*" as _),
56
// Dots need to be escaped in regex.
57
v if v.starts_with('.') => (1, b"\\." as _),
58
_ => {
59
pos += 1;
60
continue;
61
},
62
};
63
64
replacements.push((pos, len, replace));
65
pos += len;
66
}
67
68
if replacements.is_empty() {
69
return Ok((Cow::Borrowed(path), None));
70
}
71
72
let prefix = Cow::Borrowed(&path[..replacements[0].0]);
73
74
let mut pos = replacements[0].0;
75
let mut expansion = Vec::with_capacity(path.len() - pos);
76
expansion.push(b'^');
77
78
for (offset, len, replace) in replacements {
79
expansion.extend_from_slice(&path.as_bytes()[pos..offset]);
80
expansion.extend_from_slice(replace);
81
pos = offset + len;
82
}
83
84
if pos < path.len() {
85
expansion.extend_from_slice(&path.as_bytes()[pos..]);
86
}
87
88
expansion.push(b'$');
89
90
Ok((prefix, Some(String::from_utf8(expansion).unwrap())))
91
}
92
93
/// A location on cloud storage, may have wildcards.
94
#[derive(PartialEq, Debug, Default)]
95
pub struct CloudLocation {
96
/// The scheme (s3, ...).
97
pub scheme: &'static str,
98
/// The bucket name.
99
pub bucket: PlSmallStr,
100
/// The prefix inside the bucket, this will be the full key when wildcards are not used.
101
pub prefix: String,
102
/// The path components that need to be expanded.
103
pub expansion: Option<PlSmallStr>,
104
}
105
106
impl CloudLocation {
107
pub fn new(path: PlRefPath, glob: bool) -> PolarsResult<Self> {
108
if let Some(scheme @ CloudScheme::Http | scheme @ CloudScheme::Https) = path.scheme() {
109
// Http/s does not use this
110
return Ok(CloudLocation {
111
scheme: scheme.as_str(),
112
..Default::default()
113
});
114
}
115
116
let path_is_local = matches!(
117
path.scheme(),
118
None | Some(CloudScheme::File | CloudScheme::FileNoHostname)
119
);
120
121
let (bucket, key) = path
122
.strip_scheme_split_authority()
123
.ok_or(Cow::Borrowed(
124
"could not extract bucket/key (path did not contain '/')",
125
))
126
.and_then(|x @ (bucket, _)| {
127
let bucket_is_empty = bucket.is_empty();
128
129
if path_is_local && !bucket_is_empty {
130
Err(Cow::Owned(format!(
131
"unsupported: non-empty hostname for 'file:' URI: '{bucket}'",
132
)))
133
} else if bucket_is_empty && !path_is_local {
134
Err(Cow::Borrowed("empty bucket name"))
135
} else {
136
Ok(x)
137
}
138
})
139
.map_err(|failed_reason| {
140
polars_err!(
141
ComputeError:
142
"failed to create CloudLocation: {} (path: '{}')",
143
failed_reason,
144
path,
145
)
146
})?;
147
148
let key = if path_is_local {
149
key
150
} else {
151
key.strip_prefix('/').unwrap_or(key)
152
};
153
154
let (prefix, expansion) = if glob {
155
let (prefix, expansion) = extract_prefix_expansion(key)?;
156
157
assert_eq!(prefix.starts_with('/'), key.starts_with('/'));
158
159
(prefix, expansion.map(|x| x.into()))
160
} else {
161
(key.into(), None)
162
};
163
164
Ok(CloudLocation {
165
scheme: path.scheme().unwrap_or(CloudScheme::File).as_str(),
166
bucket: PlSmallStr::from_str(bucket),
167
prefix: prefix.into_owned(),
168
expansion,
169
})
170
}
171
}
172
173
/// Return a full url from a key relative to the given location.
174
fn full_url(scheme: &str, bucket: &str, key: Path) -> String {
175
format!("{scheme}://{bucket}/{key}")
176
}
177
178
/// A simple matcher, if more is required consider depending on https://crates.io/crates/globset.
179
/// The Cloud list api returns a list of all the file names under a prefix, there is no additional cost of `readdir`.
180
pub(crate) struct Matcher {
181
prefix: String,
182
re: Option<Regex>,
183
}
184
185
impl Matcher {
186
/// Build a Matcher for the given prefix and expansion.
187
pub(crate) fn new(prefix: String, expansion: Option<&str>) -> PolarsResult<Matcher> {
188
// Cloud APIs accept a prefix without any expansion, extract it.
189
let re = expansion
190
.map(polars_utils::regex_cache::compile_regex)
191
.transpose()?;
192
Ok(Matcher { prefix, re })
193
}
194
195
pub(crate) fn is_matching(&self, key: &str) -> bool {
196
if !key.starts_with(self.prefix.as_str()) {
197
// Prefix does not match, should not happen.
198
return false;
199
}
200
if self.re.is_none() {
201
return true;
202
}
203
let last = &key[self.prefix.len()..];
204
self.re.as_ref().unwrap().is_match(last.as_ref())
205
}
206
}
207
208
/// List files with a prefix derived from the pattern.
209
pub async fn glob(
210
url: PlRefPath,
211
cloud_options: Option<&CloudOptions>,
212
) -> PolarsResult<Vec<String>> {
213
// Find the fixed prefix, up to the first '*'.
214
215
let (
216
CloudLocation {
217
scheme,
218
bucket,
219
prefix,
220
expansion,
221
},
222
store,
223
) = super::build_object_store(url, cloud_options, true).await?;
224
let matcher = &Matcher::new(
225
if scheme == "file" {
226
// For local paths the returned location has the leading slash stripped.
227
prefix[1..].into()
228
} else {
229
prefix.clone()
230
},
231
expansion.as_deref(),
232
)?;
233
234
let path = Path::from(prefix.as_str());
235
let path = Some(&path);
236
237
let mut locations = store
238
.exec_with_rebuild_retry_on_err(|store| async move {
239
store
240
.list(path)
241
.try_filter_map(|x| async move {
242
let out = (x.size > 0 && matcher.is_matching(x.location.as_ref()))
243
.then_some(x.location);
244
Ok(out)
245
})
246
.try_collect::<Vec<_>>()
247
.await
248
})
249
.await?;
250
251
locations.sort_unstable();
252
Ok(locations
253
.into_iter()
254
.map(|l| full_url(scheme, &bucket, l))
255
.collect::<Vec<_>>())
256
}
257
258
#[cfg(test)]
mod test {
    use super::*;

    /// Globs `path` into a [`CloudLocation`] and builds the matching [`Matcher`].
    fn matcher_for(path: &'static str) -> Matcher {
        let location = CloudLocation::new(PlRefPath::new(path), true).unwrap();
        Matcher::new(location.prefix, location.expansion.as_deref()).unwrap()
    }

    /// Builds an expected [`CloudLocation`] from plain string parts.
    fn expected_location(
        scheme: &'static str,
        bucket: &str,
        prefix: &str,
        expansion: Option<&str>,
    ) -> CloudLocation {
        CloudLocation {
            scheme,
            bucket: bucket.into(),
            prefix: prefix.into(),
            expansion: expansion.map(Into::into),
        }
    }

    #[test]
    fn test_cloud_location() {
        let cases = [
            ("s3://a/b", expected_location("s3", "a", "b", None)),
            (
                "s3://a/b/*.c",
                expected_location("s3", "a", "b/", Some("^[^/]*\\.c$")),
            ),
            ("file:///a/b", expected_location("file", "", "/a/b", None)),
            ("file:/a/b", expected_location("file", "", "/a/b", None)),
        ];

        for (path, expected) in cases {
            let actual = CloudLocation::new(PlRefPath::new(path), true).unwrap();
            assert_eq!(actual, expected, "path: {path}");
        }
    }

    #[test]
    fn test_extract_prefix_expansion() {
        assert!(extract_prefix_expansion("**url").is_err());
        assert_eq!(
            extract_prefix_expansion("a/b.c").unwrap(),
            ("a/b.c".into(), None)
        );

        // Every globbed case below shares the literal `a/` prefix.
        let globbed = [
            ("a/**", "^(.*/)?$"),
            ("a/**/b", "^(.*/)?b$"),
            ("a/**/*b", "^(.*/)?[^/]*b$"),
            ("a/**/data/*b", "^(.*/)?data/[^/]*b$"),
            ("a/*b", "^[^/]*b$"),
        ];
        for (pattern, regex) in globbed {
            assert_eq!(
                extract_prefix_expansion(pattern).unwrap(),
                ("a/".into(), Some(regex.into())),
                "pattern: {pattern}"
            );
        }
    }

    #[test]
    fn test_matcher_file_name() {
        let matcher = matcher_for("s3://bucket/folder/*.parquet");
        let matches = |key: &str| matcher.is_matching(Path::from(key).as_ref());

        // Regular match.
        assert!(matches("folder/1.parquet"));
        // Require . in the file name.
        assert!(!matches("folder/1parquet"));
        // Intermediary folders are not allowed.
        assert!(!matches("folder/other/1.parquet"));
    }

    #[test]
    fn test_matcher_folders() {
        let any_depth = matcher_for("s3://bucket/folder/**/*.parquet");
        let any_depth_matches = |key: &str| any_depth.is_matching(Path::from(key).as_ref());

        // Intermediary folders are optional.
        assert!(any_depth_matches("folder/1.parquet"));
        // Intermediary folders are allowed.
        assert!(any_depth_matches("folder/other/1.parquet"));

        let with_data = matcher_for("s3://bucket/folder/**/data/*.parquet");
        let with_data_matches = |key: &str| with_data.is_matching(Path::from(key).as_ref());

        // Required folder `data` is missing.
        assert!(!with_data_matches("folder/1.parquet"));
        // Required folder is present.
        assert!(with_data_matches("folder/data/1.parquet"));
        // Required folder is present and additional folders are allowed.
        assert!(with_data_matches("folder/other/data/1.parquet"));
    }

    #[test]
    fn test_cloud_location_no_glob() {
        let location = CloudLocation::new(PlRefPath::new("s3://bucket/[*"), false).unwrap();
        assert_eq!(location, expected_location("s3", "bucket", "[*", None));
    }

    #[test]
    fn test_cloud_location_percentages() {
        let cases = [
            (
                "s3://bucket/%25",
                expected_location("s3", "bucket", "%25", None),
            ),
            // Http/s locations only carry the scheme.
            ("https://pola.rs/%25", expected_location("https", "", "", None)),
        ];

        for (path, expected) in cases {
            let actual = CloudLocation::new(PlRefPath::new(path), true).unwrap();
            assert_eq!(actual, expected, "path: {path}");
        }
    }

    #[test]
    fn test_glob_wildcard_21736() {
        let nested = matcher_for("s3://bucket/folder/**/data.parquet");

        assert!(!nested.is_matching("folder/_data.parquet"));

        for key in [
            "folder/data.parquet",
            "folder/abc/data.parquet",
            "folder/abc/def/data.parquet",
        ] {
            assert!(nested.is_matching(key), "key: {key}");
        }

        let flat = matcher_for("s3://bucket/folder/data_*.parquet");
        assert!(!flat.is_matching("folder/data_1.ipc"));
    }
}
434
435