// Source: GitHub repository pola-rs/polars
// Path: blob/main/crates/polars-plan/src/dsl/scan_sources.rs
1
use std::fmt::{Debug, Formatter};
2
use std::fs::File;
3
use std::sync::Arc;
4
5
use polars_core::error::{PolarsResult, feature_gated};
6
use polars_error::polars_err;
7
use polars_io::cloud::CloudOptions;
8
#[cfg(feature = "cloud")]
9
use polars_io::file_cache::FileCacheEntry;
10
#[cfg(feature = "cloud")]
11
use polars_io::utils::byte_source::{DynByteSource, DynByteSourceBuilder};
12
use polars_io::{expand_paths, expand_paths_hive, expanded_from_single_directory};
13
use polars_utils::mmap::MemSlice;
14
use polars_utils::pl_str::PlSmallStr;
15
use polars_utils::plpath::{PlPath, PlPathRef};
16
17
use super::UnifiedScanArgs;
18
19
/// Set of sources to scan from
20
///
21
/// This can either be a list of paths to files, opened files or in-memory buffers. Mixing of
22
/// buffers is not currently possible.
23
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
24
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
25
#[derive(Clone)]
26
pub enum ScanSources {
27
Paths(Arc<[PlPath]>),
28
29
#[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
30
Files(Arc<[File]>),
31
#[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
32
Buffers(Arc<[MemSlice]>),
33
}
34
35
impl Debug for ScanSources {
36
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
37
match self {
38
Self::Paths(p) => write!(f, "paths: {:?}", p.as_ref()),
39
Self::Files(p) => write!(f, "files: {} files", p.len()),
40
Self::Buffers(b) => write!(f, "buffers: {} in-memory-buffers", b.len()),
41
}
42
}
43
}
44
45
/// A reference to a single item in [`ScanSources`]
46
#[derive(Debug, Clone, Copy)]
47
pub enum ScanSourceRef<'a> {
48
Path(PlPathRef<'a>),
49
File(&'a File),
50
Buffer(&'a MemSlice),
51
}
52
53
/// A single source to scan from
54
#[derive(Debug, Clone)]
55
pub enum ScanSource {
56
Path(PlPath),
57
File(Arc<File>),
58
Buffer(MemSlice),
59
}
60
61
impl ScanSource {
62
pub fn from_sources(sources: ScanSources) -> Result<Self, ScanSources> {
63
if sources.len() == 1 {
64
match sources {
65
ScanSources::Paths(ps) => Ok(Self::Path(ps.as_ref()[0].clone())),
66
ScanSources::Files(fs) => {
67
assert_eq!(fs.len(), 1);
68
let ptr: *const File = Arc::into_raw(fs) as *const File;
69
// SAFETY: A [T] with length 1 can be interpreted as T
70
let f: Arc<File> = unsafe { Arc::from_raw(ptr) };
71
72
Ok(Self::File(f))
73
},
74
ScanSources::Buffers(bs) => Ok(Self::Buffer(bs.as_ref()[0].clone())),
75
}
76
} else {
77
Err(sources)
78
}
79
}
80
81
pub fn into_sources(self) -> ScanSources {
82
match self {
83
ScanSource::Path(p) => ScanSources::Paths([p].into()),
84
ScanSource::File(f) => {
85
let ptr: *const [File] = std::ptr::slice_from_raw_parts(Arc::into_raw(f), 1);
86
// SAFETY: A T can be interpreted as [T] with length 1.
87
let fs: Arc<[File]> = unsafe { Arc::from_raw(ptr) };
88
ScanSources::Files(fs)
89
},
90
ScanSource::Buffer(m) => ScanSources::Buffers([m].into()),
91
}
92
}
93
94
pub fn as_scan_source_ref(&self) -> ScanSourceRef<'_> {
95
match self {
96
ScanSource::Path(path) => ScanSourceRef::Path(path.as_ref()),
97
ScanSource::File(file) => ScanSourceRef::File(file.as_ref()),
98
ScanSource::Buffer(mem_slice) => ScanSourceRef::Buffer(mem_slice),
99
}
100
}
101
102
pub fn run_async(&self) -> bool {
103
self.as_scan_source_ref().run_async()
104
}
105
106
pub fn is_cloud_url(&self) -> bool {
107
if let ScanSource::Path(path) = self {
108
path.is_cloud_url()
109
} else {
110
false
111
}
112
}
113
}
114
115
/// An iterator for [`ScanSources`]
116
pub struct ScanSourceIter<'a> {
117
sources: &'a ScanSources,
118
offset: usize,
119
}
120
121
impl Default for ScanSources {
122
fn default() -> Self {
123
// We need to use `Paths` here to avoid erroring when doing hive-partitioned scans of empty
124
// file lists.
125
Self::Paths(Arc::default())
126
}
127
}
128
129
impl std::hash::Hash for ScanSources {
130
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
131
std::mem::discriminant(self).hash(state);
132
133
// @NOTE: This is a bit crazy
134
//
135
// We don't really want to hash the file descriptors or the whole buffers so for now we
136
// just settle with the fact that the memory behind Arc's does not really move. Therefore,
137
// we can just hash the pointer.
138
match self {
139
Self::Paths(paths) => paths.hash(state),
140
Self::Files(files) => files.as_ptr().hash(state),
141
Self::Buffers(buffers) => buffers.as_ptr().hash(state),
142
}
143
}
144
}
145
146
impl PartialEq for ScanSources {
147
fn eq(&self, other: &Self) -> bool {
148
match (self, other) {
149
(ScanSources::Paths(l), ScanSources::Paths(r)) => l == r,
150
(ScanSources::Files(l), ScanSources::Files(r)) => std::ptr::eq(l.as_ptr(), r.as_ptr()),
151
(ScanSources::Buffers(l), ScanSources::Buffers(r)) => {
152
std::ptr::eq(l.as_ptr(), r.as_ptr())
153
},
154
_ => false,
155
}
156
}
157
}
158
159
// Marker impl: declares that the `PartialEq` implementation for `ScanSources` is a full
// equivalence relation.
impl Eq for ScanSources {}
160
161
impl ScanSources {
162
pub fn expand_paths(
163
&self,
164
scan_args: &UnifiedScanArgs,
165
#[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
166
) -> PolarsResult<Self> {
167
match self {
168
Self::Paths(paths) => Ok(Self::Paths(expand_paths(
169
paths,
170
scan_args.glob,
171
cloud_options,
172
)?)),
173
v => Ok(v.clone()),
174
}
175
}
176
177
/// This will update `scan_args.hive_options.enabled` to `true` if the existing value is `None`
178
/// and the paths are expanded from a single directory. Otherwise the existing value is maintained.
179
#[cfg(any(feature = "ipc", feature = "parquet"))]
180
pub fn expand_paths_with_hive_update(
181
&self,
182
scan_args: &mut UnifiedScanArgs,
183
#[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
184
) -> PolarsResult<Self> {
185
match self {
186
Self::Paths(paths) => {
187
let (expanded_paths, hive_start_idx) = expand_paths_hive(
188
paths,
189
scan_args.glob,
190
cloud_options,
191
scan_args.hive_options.enabled.unwrap_or(false),
192
)?;
193
194
if scan_args.hive_options.enabled.is_none()
195
&& expanded_from_single_directory(paths, expanded_paths.as_ref())
196
{
197
scan_args.hive_options.enabled = Some(true);
198
}
199
scan_args.hive_options.hive_start_idx = hive_start_idx;
200
201
Ok(Self::Paths(expanded_paths))
202
},
203
v => Ok(v.clone()),
204
}
205
}
206
207
pub fn iter(&self) -> ScanSourceIter<'_> {
208
ScanSourceIter {
209
sources: self,
210
offset: 0,
211
}
212
}
213
214
/// Are the sources all paths?
215
pub fn is_paths(&self) -> bool {
216
matches!(self, Self::Paths(_))
217
}
218
219
/// Try cast the scan sources to [`ScanSources::Paths`]
220
pub fn as_paths(&self) -> Option<&[PlPath]> {
221
match self {
222
Self::Paths(paths) => Some(paths.as_ref()),
223
Self::Files(_) | Self::Buffers(_) => None,
224
}
225
}
226
227
/// Try cast the scan sources to [`ScanSources::Paths`] with a clone
228
pub fn into_paths(&self) -> Option<Arc<[PlPath]>> {
229
match self {
230
Self::Paths(paths) => Some(paths.clone()),
231
Self::Files(_) | Self::Buffers(_) => None,
232
}
233
}
234
235
/// Try get the first path in the scan sources
236
pub fn first_path(&self) -> Option<PlPathRef<'_>> {
237
match self {
238
Self::Paths(paths) => paths.first().map(|p| p.as_ref()),
239
Self::Files(_) | Self::Buffers(_) => None,
240
}
241
}
242
243
/// Is the first path a cloud URL?
244
pub fn is_cloud_url(&self) -> bool {
245
self.first_path().is_some_and(|path| path.is_cloud_url())
246
}
247
248
pub fn len(&self) -> usize {
249
match self {
250
Self::Paths(s) => s.len(),
251
Self::Files(s) => s.len(),
252
Self::Buffers(s) => s.len(),
253
}
254
}
255
256
pub fn is_empty(&self) -> bool {
257
self.len() == 0
258
}
259
260
pub fn first(&self) -> Option<ScanSourceRef<'_>> {
261
self.get(0)
262
}
263
264
pub fn first_or_empty_expand_err(
265
&self,
266
failed_message: &'static str,
267
sources_before_expansion: &ScanSources,
268
glob: bool,
269
hint: &'static str,
270
) -> PolarsResult<ScanSourceRef<'_>> {
271
let hint_padding = if hint.is_empty() { "" } else { " Hint: " };
272
273
self.first().ok_or_else(|| match self {
274
Self::Paths(_) if !sources_before_expansion.is_empty() => polars_err!(
275
ComputeError:
276
"{}: expanded paths were empty \
277
(path expansion input: '{:?}', glob: {}).{}{}",
278
failed_message, sources_before_expansion, glob, hint_padding, hint
279
),
280
_ => polars_err!(
281
ComputeError:
282
"{}: empty input: {:?}.{}{}",
283
failed_message, self, hint_padding, hint
284
),
285
})
286
}
287
288
/// Turn the [`ScanSources`] into some kind of identifier
289
pub fn id(&self) -> PlSmallStr {
290
if self.is_empty() {
291
return PlSmallStr::from_static("EMPTY");
292
}
293
294
match self {
295
Self::Paths(paths) => PlSmallStr::from_str(paths.first().unwrap().to_str()),
296
Self::Files(_) => PlSmallStr::from_static("OPEN_FILES"),
297
Self::Buffers(_) => PlSmallStr::from_static("IN_MEMORY"),
298
}
299
}
300
301
/// Get the scan source at specific address
302
pub fn get(&self, idx: usize) -> Option<ScanSourceRef<'_>> {
303
match self {
304
Self::Paths(paths) => paths.get(idx).map(|p| ScanSourceRef::Path(p.as_ref())),
305
Self::Files(files) => files.get(idx).map(ScanSourceRef::File),
306
Self::Buffers(buffers) => buffers.get(idx).map(ScanSourceRef::Buffer),
307
}
308
}
309
310
/// Get the scan source at specific address
311
///
312
/// # Panics
313
///
314
/// If the `idx` is out of range.
315
#[track_caller]
316
pub fn at(&self, idx: usize) -> ScanSourceRef<'_> {
317
self.get(idx).unwrap()
318
}
319
}
320
321
impl ScanSourceRef<'_> {
322
/// Get the name for `include_paths`
323
pub fn to_include_path_name(&self) -> &str {
324
match self {
325
Self::Path(path) => path.to_str(),
326
Self::File(_) => "open-file",
327
Self::Buffer(_) => "in-mem",
328
}
329
}
330
331
// @TODO: I would like to remove this function eventually.
332
pub fn into_owned(&self) -> PolarsResult<ScanSource> {
333
Ok(match self {
334
ScanSourceRef::Path(path) => ScanSource::Path((*path).into_owned()),
335
ScanSourceRef::File(file) => {
336
if let Ok(file) = file.try_clone() {
337
ScanSource::File(Arc::new(file))
338
} else {
339
ScanSource::Buffer(self.to_memslice()?)
340
}
341
},
342
ScanSourceRef::Buffer(buffer) => ScanSource::Buffer((*buffer).clone()),
343
})
344
}
345
346
pub fn as_path(&self) -> Option<PlPathRef<'_>> {
347
match self {
348
Self::Path(path) => Some(*path),
349
Self::File(_) | Self::Buffer(_) => None,
350
}
351
}
352
353
pub fn is_cloud_url(&self) -> bool {
354
self.as_path().is_some_and(|x| x.is_cloud_url())
355
}
356
357
/// Turn the scan source into a memory slice
358
pub fn to_memslice(&self) -> PolarsResult<MemSlice> {
359
self.to_memslice_possibly_async(false, None, 0)
360
}
361
362
#[allow(clippy::wrong_self_convention)]
363
#[cfg(feature = "cloud")]
364
fn to_memslice_async<F: Fn(Arc<FileCacheEntry>) -> PolarsResult<std::fs::File>>(
365
&self,
366
open_cache_entry: F,
367
run_async: bool,
368
) -> PolarsResult<MemSlice> {
369
match self {
370
ScanSourceRef::Path(path) => {
371
let file = if run_async {
372
open_cache_entry(polars_io::file_cache::FILE_CACHE.get_entry(*path).unwrap())?
373
} else {
374
polars_utils::open_file(path.as_local_path().unwrap())?
375
};
376
377
MemSlice::from_file(&file)
378
},
379
ScanSourceRef::File(file) => MemSlice::from_file(file),
380
ScanSourceRef::Buffer(buff) => Ok((*buff).clone()),
381
}
382
}
383
384
#[cfg(feature = "cloud")]
385
pub fn to_memslice_async_assume_latest(&self, run_async: bool) -> PolarsResult<MemSlice> {
386
self.to_memslice_async(|entry| entry.try_open_assume_latest(), run_async)
387
}
388
389
#[cfg(feature = "cloud")]
390
pub fn to_memslice_async_check_latest(&self, run_async: bool) -> PolarsResult<MemSlice> {
391
self.to_memslice_async(|entry| entry.try_open_check_latest(), run_async)
392
}
393
394
#[cfg(not(feature = "cloud"))]
395
#[allow(clippy::wrong_self_convention)]
396
fn to_memslice_async(&self, run_async: bool) -> PolarsResult<MemSlice> {
397
match self {
398
ScanSourceRef::Path(path) => {
399
let file = polars_utils::open_file(path.as_local_path().unwrap())?;
400
MemSlice::from_file(&file)
401
},
402
ScanSourceRef::File(file) => MemSlice::from_file(file),
403
ScanSourceRef::Buffer(buff) => Ok((*buff).clone()),
404
}
405
}
406
407
#[cfg(not(feature = "cloud"))]
408
pub fn to_memslice_async_assume_latest(&self, run_async: bool) -> PolarsResult<MemSlice> {
409
self.to_memslice_async(run_async)
410
}
411
412
#[cfg(not(feature = "cloud"))]
413
pub fn to_memslice_async_check_latest(&self, run_async: bool) -> PolarsResult<MemSlice> {
414
self.to_memslice_async(run_async)
415
}
416
417
pub fn to_memslice_possibly_async(
418
&self,
419
run_async: bool,
420
#[cfg(feature = "cloud")] cache_entries: Option<
421
&Vec<Arc<polars_io::file_cache::FileCacheEntry>>,
422
>,
423
#[cfg(not(feature = "cloud"))] cache_entries: Option<&()>,
424
index: usize,
425
) -> PolarsResult<MemSlice> {
426
match self {
427
Self::Path(path) => {
428
let file = if run_async {
429
feature_gated!("cloud", {
430
cache_entries.unwrap()[index].try_open_check_latest()?
431
})
432
} else {
433
polars_utils::open_file(path.as_local_path().unwrap())?
434
};
435
436
MemSlice::from_file(&file)
437
},
438
Self::File(file) => MemSlice::from_file(file),
439
Self::Buffer(buff) => Ok((*buff).clone()),
440
}
441
}
442
443
#[cfg(feature = "cloud")]
444
pub async fn to_dyn_byte_source(
445
&self,
446
builder: &DynByteSourceBuilder,
447
cloud_options: Option<&CloudOptions>,
448
) -> PolarsResult<DynByteSource> {
449
match self {
450
Self::Path(path) => {
451
builder
452
.try_build_from_path(path.to_str(), cloud_options)
453
.await
454
},
455
Self::File(file) => Ok(DynByteSource::from(MemSlice::from_file(file)?)),
456
Self::Buffer(buff) => Ok(DynByteSource::from((*buff).clone())),
457
}
458
}
459
460
pub fn run_async(&self) -> bool {
461
matches!(self, Self::Path(p) if p.is_cloud_url() || polars_core::config::force_async())
462
}
463
}
464
465
impl<'a> Iterator for ScanSourceIter<'a> {
466
type Item = ScanSourceRef<'a>;
467
468
fn next(&mut self) -> Option<Self::Item> {
469
let item = match self.sources {
470
ScanSources::Paths(paths) => ScanSourceRef::Path(paths.get(self.offset)?.as_ref()),
471
ScanSources::Files(files) => ScanSourceRef::File(files.get(self.offset)?),
472
ScanSources::Buffers(buffers) => ScanSourceRef::Buffer(buffers.get(self.offset)?),
473
};
474
475
self.offset += 1;
476
Some(item)
477
}
478
479
fn size_hint(&self) -> (usize, Option<usize>) {
480
let len = self.sources.len() - self.offset;
481
(len, Some(len))
482
}
483
}
484
485
// Marker impl: declares `ScanSourceIter` an exact-size iterator, using the trait's default
// methods.
impl ExactSizeIterator for ScanSourceIter<'_> {}
486
487