Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/cloud/glob.rs
6939 views
1
use std::borrow::Cow;
2
3
use futures::TryStreamExt;
4
use object_store::path::Path;
5
use polars_core::error::to_compute_err;
6
use polars_error::{PolarsResult, polars_bail};
7
use polars_utils::format_pl_smallstr;
8
use polars_utils::pl_str::PlSmallStr;
9
use regex::Regex;
10
use url::Url;
11
12
use super::{CloudOptions, parse_url};
13
14
/// Separator between the components of an object-store key / URL path.
const DELIMITER: char = '/';
15
16
/// Converts a glob to regex form.
17
///
18
/// # Returns
19
/// 1. the prefix part (all path components until the first one with '*')
20
/// 2. a regular expression representation of the rest.
21
pub(crate) fn extract_prefix_expansion(url: &str) -> PolarsResult<(Cow<'_, str>, Option<String>)> {
22
let url = url.strip_prefix('/').unwrap_or(url);
23
// (offset, len, replacement)
24
let mut replacements: Vec<(usize, usize, &[u8])> = vec![];
25
26
// The position after the last slash before glob characters begin.
27
// `a/b/c*/`
28
// ^
29
let mut pos: usize = if let Some(after_last_slash) = memchr::memchr2(b'*', b'[', url.as_bytes())
30
.map(|i| {
31
url.as_bytes()[..i]
32
.iter()
33
.rposition(|x| *x == b'/')
34
.map_or(0, |x| 1 + x)
35
}) {
36
// First value is used as the starting point later.
37
replacements.push((after_last_slash, 0, &[]));
38
after_last_slash
39
} else {
40
usize::MAX
41
};
42
43
while pos < url.len() {
44
match memchr::memchr2(b'*', b'.', &url.as_bytes()[pos..]) {
45
None => break,
46
Some(i) => pos += i,
47
}
48
49
let (len, replace): (usize, &[u8]) = match &url[pos..] {
50
// Accept:
51
// - `**/`
52
// - `**` only if it is the end of the url
53
v if v.starts_with("**") && (v.len() == 2 || v.as_bytes()[2] == b'/') => {
54
// Wrapping in a capture group ensures we also match non-nested paths.
55
(3, b"(.*/)?" as _)
56
},
57
v if v.starts_with("**") => {
58
polars_bail!(ComputeError: "invalid ** glob pattern")
59
},
60
v if v.starts_with('*') => (1, b"[^/]*" as _),
61
// Dots need to be escaped in regex.
62
v if v.starts_with('.') => (1, b"\\." as _),
63
_ => {
64
pos += 1;
65
continue;
66
},
67
};
68
69
replacements.push((pos, len, replace));
70
pos += len;
71
}
72
73
if replacements.is_empty() {
74
return Ok((Cow::Borrowed(url), None));
75
}
76
77
let prefix = Cow::Borrowed(&url[..replacements[0].0]);
78
79
let mut pos = replacements[0].0;
80
let mut expansion = Vec::with_capacity(url.len() - pos);
81
expansion.push(b'^');
82
83
for (offset, len, replace) in replacements {
84
expansion.extend_from_slice(&url.as_bytes()[pos..offset]);
85
expansion.extend_from_slice(replace);
86
pos = offset + len;
87
}
88
89
if pos < url.len() {
90
expansion.extend_from_slice(&url.as_bytes()[pos..]);
91
}
92
93
expansion.push(b'$');
94
95
Ok((prefix, Some(String::from_utf8(expansion).unwrap())))
96
}
97
98
/// A location on cloud storage, may have wildcards.
99
#[derive(PartialEq, Debug, Default)]
100
pub struct CloudLocation {
101
/// The scheme (s3, ...).
102
pub scheme: PlSmallStr,
103
/// The bucket name.
104
pub bucket: PlSmallStr,
105
/// The prefix inside the bucket, this will be the full key when wildcards are not used.
106
pub prefix: String,
107
/// The path components that need to be expanded.
108
pub expansion: Option<PlSmallStr>,
109
}
110
111
impl CloudLocation {
112
pub fn from_url(parsed: &Url, glob: bool) -> PolarsResult<CloudLocation> {
113
let is_local = parsed.scheme() == "file";
114
let (bucket, key) = if is_local {
115
("".into(), parsed.path())
116
} else {
117
if parsed.scheme().starts_with("http") {
118
return Ok(CloudLocation {
119
scheme: parsed.scheme().into(),
120
..Default::default()
121
});
122
}
123
124
let key = parsed.path();
125
126
let bucket = format_pl_smallstr!(
127
"{}",
128
&parsed[url::Position::BeforeUsername..url::Position::AfterPort]
129
);
130
131
if bucket.is_empty() {
132
polars_bail!(ComputeError: "CloudLocation::from_url(): empty bucket: {}", parsed);
133
}
134
135
(bucket, key)
136
};
137
138
let key = percent_encoding::percent_decode_str(key)
139
.decode_utf8()
140
.map_err(to_compute_err)?;
141
let (prefix, expansion) = if glob {
142
let (prefix, expansion) = extract_prefix_expansion(&key)?;
143
let mut prefix = prefix.into_owned();
144
if is_local && key.starts_with(DELIMITER) && !prefix.starts_with(DELIMITER) {
145
prefix.insert(0, DELIMITER);
146
}
147
(prefix, expansion.map(|x| x.into()))
148
} else {
149
(key.as_ref().into(), None)
150
};
151
152
Ok(CloudLocation {
153
scheme: parsed.scheme().into(),
154
bucket,
155
prefix,
156
expansion,
157
})
158
}
159
160
/// Parse a CloudLocation from an url.
161
pub fn new(url: &str, glob: bool) -> PolarsResult<CloudLocation> {
162
let parsed = parse_url(url).map_err(to_compute_err)?;
163
Self::from_url(&parsed, glob)
164
}
165
}
166
167
/// Return a full url from a key relative to the given location.
168
fn full_url(scheme: &str, bucket: &str, key: Path) -> String {
169
format!("{scheme}://{bucket}/{key}")
170
}
171
172
/// A simple matcher, if more is required consider depending on https://crates.io/crates/globset.
173
/// The Cloud list api returns a list of all the file names under a prefix, there is no additional cost of `readdir`.
174
pub(crate) struct Matcher {
175
prefix: String,
176
re: Option<Regex>,
177
}
178
179
impl Matcher {
180
/// Build a Matcher for the given prefix and expansion.
181
pub(crate) fn new(prefix: String, expansion: Option<&str>) -> PolarsResult<Matcher> {
182
// Cloud APIs accept a prefix without any expansion, extract it.
183
let re = expansion
184
.map(polars_utils::regex_cache::compile_regex)
185
.transpose()?;
186
Ok(Matcher { prefix, re })
187
}
188
189
pub(crate) fn is_matching(&self, key: &str) -> bool {
190
if !key.starts_with(self.prefix.as_str()) {
191
// Prefix does not match, should not happen.
192
return false;
193
}
194
if self.re.is_none() {
195
return true;
196
}
197
let last = &key[self.prefix.len()..];
198
self.re.as_ref().unwrap().is_match(last.as_ref())
199
}
200
}
201
202
/// List files with a prefix derived from the pattern.
203
pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Vec<String>> {
204
// Find the fixed prefix, up to the first '*'.
205
206
let (
207
CloudLocation {
208
scheme,
209
bucket,
210
prefix,
211
expansion,
212
},
213
store,
214
) = super::build_object_store(url, cloud_options, true).await?;
215
let matcher = &Matcher::new(
216
if scheme == "file" {
217
// For local paths the returned location has the leading slash stripped.
218
prefix[1..].into()
219
} else {
220
prefix.clone()
221
},
222
expansion.as_deref(),
223
)?;
224
225
let path = Path::from(prefix.as_str());
226
let path = Some(&path);
227
228
let mut locations = store
229
.try_exec_rebuild_on_err(|store| {
230
let st = store.clone();
231
232
async {
233
let store = st;
234
store
235
.list(path)
236
.try_filter_map(|x| async move {
237
let out = (x.size > 0 && matcher.is_matching(x.location.as_ref()))
238
.then_some(x.location);
239
Ok(out)
240
})
241
.try_collect::<Vec<_>>()
242
.await
243
.map_err(to_compute_err)
244
}
245
})
246
.await?;
247
248
locations.sort_unstable();
249
Ok(locations
250
.into_iter()
251
.map(|l| full_url(&scheme, &bucket, l))
252
.collect::<Vec<_>>())
253
}
254
255
#[cfg(test)]
mod test {
    use super::*;

    /// Builds the [`Matcher`] for a glob `url`, as `glob` would (without touching a store).
    fn matcher_for(url: &str) -> Matcher {
        let location = CloudLocation::new(url, true).unwrap();
        Matcher::new(location.prefix, location.expansion.as_deref()).unwrap()
    }

    /// Shorthand for an expected [`CloudLocation`].
    fn location(
        scheme: &str,
        bucket: &str,
        prefix: &str,
        expansion: Option<&str>,
    ) -> CloudLocation {
        CloudLocation {
            scheme: scheme.into(),
            bucket: bucket.into(),
            prefix: prefix.into(),
            expansion: expansion.map(Into::into),
        }
    }

    #[test]
    fn test_cloud_location() {
        let cases = [
            ("s3://a/b", location("s3", "a", "b", None)),
            (
                "s3://a/b/*.c",
                location("s3", "a", "b/", Some("^[^/]*\\.c$")),
            ),
            ("file:///a/b", location("file", "", "/a/b", None)),
        ];
        for (url, expected) in cases {
            assert_eq!(CloudLocation::new(url, true).unwrap(), expected, "url: {url}");
        }
    }

    #[test]
    fn test_extract_prefix_expansion() {
        assert!(extract_prefix_expansion("**url").is_err());

        let cases: [(&str, &str, Option<&str>); 6] = [
            ("a/b.c", "a/b.c", None),
            ("a/**", "a/", Some("^(.*/)?$")),
            ("a/**/b", "a/", Some("^(.*/)?b$")),
            ("a/**/*b", "a/", Some("^(.*/)?[^/]*b$")),
            ("a/**/data/*b", "a/", Some("^(.*/)?data/[^/]*b$")),
            ("a/*b", "a/", Some("^[^/]*b$")),
        ];
        for (pattern, prefix, expansion) in cases {
            let (actual_prefix, actual_expansion) = extract_prefix_expansion(pattern).unwrap();
            assert_eq!(actual_prefix, prefix, "pattern: {pattern}");
            assert_eq!(actual_expansion.as_deref(), expansion, "pattern: {pattern}");
        }
    }

    #[test]
    fn test_matcher_file_name() {
        let m = matcher_for("s3://bucket/folder/*.parquet");
        // Regular match.
        assert!(m.is_matching(Path::from("folder/1.parquet").as_ref()));
        // Require . in the file name.
        assert!(!m.is_matching(Path::from("folder/1parquet").as_ref()));
        // Intermediary folders are not allowed.
        assert!(!m.is_matching(Path::from("folder/other/1.parquet").as_ref()));
    }

    #[test]
    fn test_matcher_folders() {
        let m = matcher_for("s3://bucket/folder/**/*.parquet");
        // Intermediary folders are optional.
        assert!(m.is_matching(Path::from("folder/1.parquet").as_ref()));
        // Intermediary folders are allowed.
        assert!(m.is_matching(Path::from("folder/other/1.parquet").as_ref()));

        let m = matcher_for("s3://bucket/folder/**/data/*.parquet");
        // Required folder `data` is missing.
        assert!(!m.is_matching(Path::from("folder/1.parquet").as_ref()));
        // Required folder is present.
        assert!(m.is_matching(Path::from("folder/data/1.parquet").as_ref()));
        // Required folder is present and additional folders are allowed.
        assert!(m.is_matching(Path::from("folder/other/data/1.parquet").as_ref()));
    }

    #[test]
    fn test_cloud_location_no_glob() {
        assert_eq!(
            CloudLocation::new("s3://bucket/[*", false).unwrap(),
            location("s3", "bucket", "/[*", None),
        );
    }

    #[test]
    fn test_cloud_location_percentages() {
        assert_eq!(
            CloudLocation::new("s3://bucket/%25", true).unwrap(),
            location("s3", "bucket", "%25", None),
        );
        // http(s) URLs only keep their scheme.
        assert_eq!(
            CloudLocation::new("https://pola.rs/%25", true).unwrap(),
            location("https", "", "", None),
        );
    }

    #[test]
    fn test_glob_wildcard_21736() {
        let m = matcher_for("s3://bucket/folder/**/data.parquet");

        assert!(!m.is_matching("folder/_data.parquet"));

        assert!(m.is_matching("folder/data.parquet"));
        assert!(m.is_matching("folder/abc/data.parquet"));
        assert!(m.is_matching("folder/abc/def/data.parquet"));

        let m = matcher_for("s3://bucket/folder/data_*.parquet");

        assert!(!m.is_matching("folder/data_1.ipc"))
    }
}
419
420