Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/dsl/scan_sources.rs
8446 views
1
use std::fmt::{Debug, Formatter};
2
use std::fs::File;
3
use std::sync::Arc;
4
5
use polars_buffer::Buffer;
6
use polars_core::error::{PolarsResult, feature_gated};
7
use polars_error::polars_err;
8
use polars_io::cloud::CloudOptions;
9
#[cfg(feature = "cloud")]
10
use polars_io::file_cache::FileCacheEntry;
11
use polars_io::metrics::IOMetrics;
12
use polars_io::utils::byte_source::{DynByteSource, DynByteSourceBuilder};
13
use polars_io::{expand_paths, expand_paths_hive, expanded_from_single_directory};
14
use polars_utils::mmap::MMapSemaphore;
15
use polars_utils::pl_path::PlRefPath;
16
use polars_utils::pl_str::PlSmallStr;
17
#[cfg(feature = "serde")]
18
use serde::{Deserialize, Deserializer, Serialize, Serializer};
19
20
use super::UnifiedScanArgs;
21
22
/// Serde `serialize_with` hook for [`ScanSources::Paths`]: writes the paths as a plain sequence
/// by delegating to the slice's own `Serialize` implementation.
#[cfg(feature = "serde")]
fn serialize_paths<S: Serializer>(paths: &Buffer<PlRefPath>, s: S) -> Result<S::Ok, S::Error> {
    Serialize::serialize(paths.as_slice(), s)
}
26
27
/// Serde `deserialize_with` hook for [`ScanSources::Paths`]: reads a sequence of paths into a
/// `Vec` and converts it into a [`Buffer`].
#[cfg(feature = "serde")]
fn deserialize_paths<'de, D: Deserializer<'de>>(d: D) -> Result<Buffer<PlRefPath>, D::Error> {
    Vec::<PlRefPath>::deserialize(d).map(Buffer::from)
}
32
33
/// Set of sources to scan from
///
/// This can either be a list of paths to files, opened files or in-memory buffers. Different
/// kinds of sources cannot currently be mixed within one `ScanSources`.
///
/// Only the `Paths` variant takes part in (de)serialization; `Files` and `Buffers` are marked
/// `serde(skip)`.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[derive(Clone)]
pub enum ScanSources {
    /// Paths to files, serialized as a plain list through `serialize_paths` /
    /// `deserialize_paths`.
    #[cfg_attr(
        feature = "serde",
        serde(
            serialize_with = "serialize_paths",
            deserialize_with = "deserialize_paths"
        )
    )]
    #[cfg_attr(feature = "dsl-schema", schemars(with = "Vec<PlRefPath>"))]
    Paths(Buffer<PlRefPath>),
    /// Already-opened files. `PartialEq` and `Hash` use the address of the shared allocation,
    /// not the file contents.
    #[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
    Files(Arc<[File]>),
    /// In-memory buffers. `PartialEq` and `Hash` use the address of the shared allocation, not
    /// the buffer contents.
    #[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
    Buffers(Arc<[Buffer<u8>]>),
}
55
56
impl Debug for ScanSources {
57
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
58
match self {
59
Self::Paths(p) => write!(f, "paths: {:?}", p.as_ref()),
60
Self::Files(p) => write!(f, "files: {} files", p.len()),
61
Self::Buffers(b) => write!(f, "buffers: {} in-memory-buffers", b.len()),
62
}
63
}
64
}
65
66
/// A reference to a single item in [`ScanSources`]
///
/// Produced by [`ScanSources::get`] / [`ScanSources::iter`], or from an owned [`ScanSource`] via
/// [`ScanSource::as_scan_source_ref`].
#[derive(Debug, Clone, Copy)]
pub enum ScanSourceRef<'a> {
    /// A borrowed path.
    Path(&'a PlRefPath),
    /// A borrowed, already-opened file.
    File(&'a File),
    /// A borrowed in-memory buffer.
    Buffer(&'a Buffer<u8>),
}
73
74
/// A single source to scan from
///
/// Owned counterpart of [`ScanSourceRef`]. Convert from / to a one-element [`ScanSources`] with
/// [`ScanSource::from_sources`] and [`ScanSource::into_sources`].
#[derive(Debug, Clone)]
pub enum ScanSource {
    /// A path; it is treated as a cloud URL when it has a scheme (see `is_cloud_url`).
    Path(PlRefPath),
    /// An already-opened file, shared through an `Arc`.
    File(Arc<File>),
    /// An in-memory buffer.
    Buffer(Buffer<u8>),
}
81
82
impl ScanSource {
83
pub fn from_sources(sources: ScanSources) -> Result<Self, ScanSources> {
84
if sources.len() == 1 {
85
match sources {
86
ScanSources::Paths(ps) => Ok(Self::Path(ps.as_ref()[0].clone())),
87
ScanSources::Files(fs) => {
88
assert_eq!(fs.len(), 1);
89
let ptr: *const File = Arc::into_raw(fs) as *const File;
90
// SAFETY: A [T] with length 1 can be interpreted as T
91
let f: Arc<File> = unsafe { Arc::from_raw(ptr) };
92
93
Ok(Self::File(f))
94
},
95
ScanSources::Buffers(bs) => Ok(Self::Buffer(bs.as_ref()[0].clone())),
96
}
97
} else {
98
Err(sources)
99
}
100
}
101
102
pub fn into_sources(self) -> ScanSources {
103
match self {
104
ScanSource::Path(p) => ScanSources::Paths(Buffer::from_iter([p])),
105
ScanSource::File(f) => {
106
let ptr: *const [File] = std::ptr::slice_from_raw_parts(Arc::into_raw(f), 1);
107
// SAFETY: A T can be interpreted as [T] with length 1.
108
let fs: Arc<[File]> = unsafe { Arc::from_raw(ptr) };
109
ScanSources::Files(fs)
110
},
111
ScanSource::Buffer(m) => ScanSources::Buffers([m].into()),
112
}
113
}
114
115
pub fn as_scan_source_ref(&self) -> ScanSourceRef<'_> {
116
match self {
117
ScanSource::Path(path) => ScanSourceRef::Path(path),
118
ScanSource::File(file) => ScanSourceRef::File(file.as_ref()),
119
ScanSource::Buffer(mem_slice) => ScanSourceRef::Buffer(mem_slice),
120
}
121
}
122
123
pub fn run_async(&self) -> bool {
124
self.as_scan_source_ref().run_async()
125
}
126
127
pub fn is_cloud_url(&self) -> bool {
128
if let ScanSource::Path(path) = self {
129
path.has_scheme()
130
} else {
131
false
132
}
133
}
134
}
135
136
/// An iterator for [`ScanSources`]
///
/// Created by [`ScanSources::iter`]; yields a [`ScanSourceRef`] for each source, in order.
pub struct ScanSourceIter<'a> {
    /// The sources being iterated over.
    sources: &'a ScanSources,
    /// Index of the next source to yield.
    offset: usize,
}
141
142
impl Default for ScanSources {
143
fn default() -> Self {
144
// We need to use `Paths` here to avoid erroring when doing hive-partitioned scans of empty
145
// file lists.
146
Self::Paths(Buffer::new())
147
}
148
}
149
150
impl std::hash::Hash for ScanSources {
151
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
152
std::mem::discriminant(self).hash(state);
153
154
// @NOTE: This is a bit crazy
155
//
156
// We don't really want to hash the file descriptors or the whole buffers so for now we
157
// just settle with the fact that the memory behind Arc's does not really move. Therefore,
158
// we can just hash the pointer.
159
match self {
160
Self::Paths(paths) => paths.hash(state),
161
Self::Files(files) => files.as_ptr().hash(state),
162
Self::Buffers(buffers) => buffers.as_ptr().hash(state),
163
}
164
}
165
}
166
167
impl PartialEq for ScanSources {
168
fn eq(&self, other: &Self) -> bool {
169
match (self, other) {
170
(ScanSources::Paths(l), ScanSources::Paths(r)) => l == r,
171
(ScanSources::Files(l), ScanSources::Files(r)) => std::ptr::eq(l.as_ptr(), r.as_ptr()),
172
(ScanSources::Buffers(l), ScanSources::Buffers(r)) => {
173
std::ptr::eq(l.as_ptr(), r.as_ptr())
174
},
175
_ => false,
176
}
177
}
178
}
179
180
impl Eq for ScanSources {}
181
182
impl ScanSources {
183
pub async fn expand_paths(&self, scan_args: &mut UnifiedScanArgs) -> PolarsResult<Self> {
184
match self {
185
Self::Paths(paths) => Ok(Self::Paths(
186
expand_paths(
187
paths,
188
scan_args.glob,
189
scan_args.hidden_file_prefix.as_deref().unwrap_or_default(),
190
&mut scan_args.cloud_options,
191
)
192
.await?,
193
)),
194
v => Ok(v.clone()),
195
}
196
}
197
198
/// This will update `scan_args.hive_options.enabled` to `true` if the existing value is `None`
199
/// and the paths are expanded from a single directory. Otherwise the existing value is maintained.
200
#[cfg(any(feature = "ipc", feature = "parquet"))]
201
pub async fn expand_paths_with_hive_update(
202
&self,
203
scan_args: &mut UnifiedScanArgs,
204
) -> PolarsResult<Self> {
205
match self {
206
Self::Paths(paths) => {
207
let (expanded_paths, hive_start_idx) = expand_paths_hive(
208
paths,
209
scan_args.glob,
210
scan_args.hidden_file_prefix.as_deref().unwrap_or_default(),
211
&mut scan_args.cloud_options,
212
scan_args.hive_options.enabled.unwrap_or(false),
213
)
214
.await?;
215
216
if scan_args.hive_options.enabled.is_none()
217
&& expanded_from_single_directory(paths, expanded_paths.as_ref())
218
{
219
scan_args.hive_options.enabled = Some(true);
220
}
221
scan_args.hive_options.hive_start_idx = hive_start_idx;
222
223
Ok(Self::Paths(expanded_paths))
224
},
225
v => Ok(v.clone()),
226
}
227
}
228
229
pub fn iter(&self) -> ScanSourceIter<'_> {
230
ScanSourceIter {
231
sources: self,
232
offset: 0,
233
}
234
}
235
236
/// Are the sources all paths?
237
pub fn is_paths(&self) -> bool {
238
matches!(self, Self::Paths(_))
239
}
240
241
/// Try cast the scan sources to [`ScanSources::Paths`]
242
pub fn as_paths(&self) -> Option<&[PlRefPath]> {
243
match self {
244
Self::Paths(paths) => Some(paths.as_ref()),
245
Self::Files(_) | Self::Buffers(_) => None,
246
}
247
}
248
249
/// Try cast the scan sources to [`ScanSources::Paths`] with a clone
250
pub fn into_paths(&self) -> Option<Buffer<PlRefPath>> {
251
match self {
252
Self::Paths(paths) => Some(paths.clone()),
253
Self::Files(_) | Self::Buffers(_) => None,
254
}
255
}
256
257
/// Try get the first path in the scan sources
258
pub fn first_path(&self) -> Option<&PlRefPath> {
259
match self {
260
Self::Paths(paths) => paths.first(),
261
Self::Files(_) | Self::Buffers(_) => None,
262
}
263
}
264
265
/// Is the first path a cloud URL?
266
pub fn is_cloud_url(&self) -> bool {
267
self.first_path().is_some_and(|path| path.has_scheme())
268
}
269
270
pub fn len(&self) -> usize {
271
match self {
272
Self::Paths(s) => s.len(),
273
Self::Files(s) => s.len(),
274
Self::Buffers(s) => s.len(),
275
}
276
}
277
278
pub fn is_empty(&self) -> bool {
279
self.len() == 0
280
}
281
282
pub fn first(&self) -> Option<ScanSourceRef<'_>> {
283
self.get(0)
284
}
285
286
pub fn first_or_empty_expand_err(
287
&self,
288
failed_message: &'static str,
289
sources_before_expansion: &ScanSources,
290
glob: bool,
291
hint: &'static str,
292
) -> PolarsResult<ScanSourceRef<'_>> {
293
let hint_padding = if hint.is_empty() { "" } else { " Hint: " };
294
295
self.first().ok_or_else(|| match self {
296
Self::Paths(_) if !sources_before_expansion.is_empty() => polars_err!(
297
ComputeError:
298
"{}: expanded paths were empty \
299
(path expansion input: '{:?}', glob: {}).{}{}",
300
failed_message, sources_before_expansion, glob, hint_padding, hint
301
),
302
_ => polars_err!(
303
ComputeError:
304
"{}: empty input: {:?}.{}{}",
305
failed_message, self, hint_padding, hint
306
),
307
})
308
}
309
310
/// Turn the [`ScanSources`] into some kind of identifier
311
pub fn id(&self) -> PlSmallStr {
312
if self.is_empty() {
313
return PlSmallStr::from_static("EMPTY");
314
}
315
316
match self {
317
Self::Paths(paths) => PlSmallStr::from_str(paths.first().unwrap().as_str()),
318
Self::Files(_) => PlSmallStr::from_static("OPEN_FILES"),
319
Self::Buffers(_) => PlSmallStr::from_static("IN_MEMORY"),
320
}
321
}
322
323
/// Get the scan source at specific address
324
pub fn get(&self, idx: usize) -> Option<ScanSourceRef<'_>> {
325
match self {
326
Self::Paths(paths) => paths.get(idx).map(ScanSourceRef::Path),
327
Self::Files(files) => files.get(idx).map(ScanSourceRef::File),
328
Self::Buffers(buffers) => buffers.get(idx).map(ScanSourceRef::Buffer),
329
}
330
}
331
332
/// Get the scan source at specific address
333
///
334
/// # Panics
335
///
336
/// If the `idx` is out of range.
337
#[track_caller]
338
pub fn at(&self, idx: usize) -> ScanSourceRef<'_> {
339
self.get(idx).unwrap()
340
}
341
342
/// Returns `None` if `self` is a `::File` variant.
343
pub fn gather(&self, indices: impl Iterator<Item = usize>) -> Option<Self> {
344
Some(match self {
345
Self::Paths(paths) => Self::Paths(indices.map(|i| paths[i].clone()).collect()),
346
Self::Buffers(buffers) => Self::Buffers(indices.map(|i| buffers[i].clone()).collect()),
347
Self::Files(_) => return None,
348
})
349
}
350
}
351
352
impl ScanSourceRef<'_> {
353
/// Get the name for `include_paths`
354
pub fn to_include_path_name(&self) -> &str {
355
match self {
356
Self::Path(path) => path.as_str(),
357
Self::File(_) => "open-file",
358
Self::Buffer(_) => "in-mem",
359
}
360
}
361
362
// @TODO: I would like to remove this function eventually.
363
pub fn into_owned(&self) -> PolarsResult<ScanSource> {
364
Ok(match self {
365
ScanSourceRef::Path(path) => ScanSource::Path((*path).clone()),
366
ScanSourceRef::File(file) => {
367
if let Ok(file) = file.try_clone() {
368
ScanSource::File(Arc::new(file))
369
} else {
370
ScanSource::Buffer(self.to_memslice()?)
371
}
372
},
373
ScanSourceRef::Buffer(buffer) => ScanSource::Buffer((*buffer).clone()),
374
})
375
}
376
377
pub fn as_path(&self) -> Option<&PlRefPath> {
378
match self {
379
Self::Path(path) => Some(path),
380
Self::File(_) | Self::Buffer(_) => None,
381
}
382
}
383
384
pub fn is_cloud_url(&self) -> bool {
385
self.as_path().is_some_and(|x| x.has_scheme())
386
}
387
388
/// Turn the scan source into a memory slice
389
pub fn to_memslice(&self) -> PolarsResult<Buffer<u8>> {
390
self.to_buffer_possibly_async(false, None, 0)
391
}
392
393
#[allow(clippy::wrong_self_convention)]
394
#[cfg(feature = "cloud")]
395
fn to_buffer_async<F: Fn(Arc<FileCacheEntry>) -> PolarsResult<std::fs::File>>(
396
&self,
397
open_cache_entry: F,
398
run_async: bool,
399
) -> PolarsResult<Buffer<u8>> {
400
match self {
401
ScanSourceRef::Path(path) => {
402
let file = if run_async {
403
open_cache_entry(
404
polars_io::file_cache::FILE_CACHE
405
.get_entry((*path).clone())
406
.unwrap(),
407
)?
408
} else {
409
polars_utils::open_file(path.as_std_path())?
410
};
411
412
Ok(Buffer::from_owner(MMapSemaphore::new_from_file(&file)?))
413
},
414
ScanSourceRef::File(file) => {
415
Ok(Buffer::from_owner(MMapSemaphore::new_from_file(file)?))
416
},
417
ScanSourceRef::Buffer(buff) => Ok((*buff).clone()),
418
}
419
}
420
421
#[cfg(feature = "cloud")]
422
pub fn to_buffer_async_assume_latest(&self, run_async: bool) -> PolarsResult<Buffer<u8>> {
423
self.to_buffer_async(|entry| entry.try_open_assume_latest(), run_async)
424
}
425
426
#[cfg(feature = "cloud")]
427
pub fn to_buffer_async_check_latest(&self, run_async: bool) -> PolarsResult<Buffer<u8>> {
428
self.to_buffer_async(|entry| entry.try_open_check_latest(), run_async)
429
}
430
431
#[cfg(not(feature = "cloud"))]
432
#[allow(clippy::wrong_self_convention)]
433
fn to_buffer_async(&self, run_async: bool) -> PolarsResult<Buffer<u8>> {
434
match self {
435
ScanSourceRef::Path(path) => {
436
let file = polars_utils::open_file(path.as_std_path())?;
437
Ok(Buffer::from_owner(MMapSemaphore::new_from_file(&file)?))
438
},
439
ScanSourceRef::File(file) => {
440
Ok(Buffer::from_owner(MMapSemaphore::new_from_file(file)?))
441
},
442
ScanSourceRef::Buffer(buff) => Ok((*buff).clone()),
443
}
444
}
445
446
#[cfg(not(feature = "cloud"))]
447
pub fn to_buffer_async_assume_latest(&self, run_async: bool) -> PolarsResult<Buffer<u8>> {
448
self.to_buffer_async(run_async)
449
}
450
451
#[cfg(not(feature = "cloud"))]
452
pub fn to_buffer_async_check_latest(&self, run_async: bool) -> PolarsResult<Buffer<u8>> {
453
self.to_buffer_async(run_async)
454
}
455
456
pub fn to_buffer_possibly_async(
457
&self,
458
run_async: bool,
459
#[cfg(feature = "cloud")] cache_entries: Option<
460
&Vec<Arc<polars_io::file_cache::FileCacheEntry>>,
461
>,
462
#[cfg(not(feature = "cloud"))] cache_entries: Option<&()>,
463
index: usize,
464
) -> PolarsResult<Buffer<u8>> {
465
match self {
466
Self::Path(path) => {
467
let file = if run_async {
468
feature_gated!("cloud", {
469
cache_entries.unwrap()[index].try_open_check_latest()?
470
})
471
} else {
472
polars_utils::open_file(path.as_std_path())?
473
};
474
475
Ok(Buffer::from_owner(MMapSemaphore::new_from_file(&file)?))
476
},
477
Self::File(file) => Ok(Buffer::from_owner(MMapSemaphore::new_from_file(file)?)),
478
Self::Buffer(buff) => Ok((*buff).clone()),
479
}
480
}
481
482
pub async fn to_dyn_byte_source(
483
&self,
484
builder: &DynByteSourceBuilder,
485
cloud_options: Option<&CloudOptions>,
486
io_metrics: Option<Arc<IOMetrics>>,
487
) -> PolarsResult<DynByteSource> {
488
match self {
489
Self::Path(path) => {
490
builder
491
.try_build_from_path((*path).clone(), cloud_options, io_metrics)
492
.await
493
},
494
Self::File(file) => Ok(DynByteSource::from(Buffer::from_owner(
495
MMapSemaphore::new_from_file(file)?,
496
))),
497
Self::Buffer(buff) => Ok(DynByteSource::from((*buff).clone())),
498
}
499
}
500
501
pub fn run_async(&self) -> bool {
502
matches!(self, Self::Path(p) if p.has_scheme() || polars_config::config().force_async())
503
}
504
}
505
506
impl<'a> Iterator for ScanSourceIter<'a> {
507
type Item = ScanSourceRef<'a>;
508
509
fn next(&mut self) -> Option<Self::Item> {
510
let item = match self.sources {
511
ScanSources::Paths(paths) => ScanSourceRef::Path(paths.get(self.offset)?),
512
ScanSources::Files(files) => ScanSourceRef::File(files.get(self.offset)?),
513
ScanSources::Buffers(buffers) => ScanSourceRef::Buffer(buffers.get(self.offset)?),
514
};
515
516
self.offset += 1;
517
Some(item)
518
}
519
520
fn size_hint(&self) -> (usize, Option<usize>) {
521
let len = self.sources.len() - self.offset;
522
(len, Some(len))
523
}
524
}
525
526
impl ExactSizeIterator for ScanSourceIter<'_> {}
527
528