// GitHub repository: pola-rs/polars
// Path: blob/main/crates/polars-plan/src/dsl/file_scan/mod.rs
use std::hash::Hash;
use std::sync::Mutex;

use deletion::DeletionFilesList;
use polars_core::schema::iceberg::IcebergSchemaRef;
use polars_core::utils::get_numeric_upcast_supertype_lossless;
use polars_io::cloud::CloudOptions;
#[cfg(feature = "csv")]
use polars_io::csv::read::CsvReadOptions;
#[cfg(feature = "ipc")]
use polars_io::ipc::IpcScanOptions;
#[cfg(feature = "parquet")]
use polars_io::parquet::metadata::FileMetadataRef;
#[cfg(feature = "parquet")]
use polars_io::parquet::read::ParquetOptions;
use polars_io::{HiveOptions, RowIndex};
use polars_utils::slice_enum::Slice;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use strum_macros::IntoStaticStr;

use super::*;
use crate::dsl::default_values::DefaultFieldValues;
pub mod default_values;
pub mod deletion;

#[cfg(feature = "python")]
pub mod python_dataset;
#[cfg(feature = "python")]
pub use python_dataset::{DATASET_PROVIDER_VTABLE, PythonDatasetProviderVTable};

bitflags::bitflags! {
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct ScanFlags: u32 {
        const SPECIALIZED_PREDICATE_FILTER = 0x01;
    }
}

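// Illustrative note (not part of the upstream file): `ScanFlags` is a `bitflags` set, so
// callers typically probe it rather than match on it, e.g. for some `scan: FileScanIR`:
//
//     if scan.flags().contains(ScanFlags::SPECIALIZED_PREDICATE_FILTER) {
//         // the reader can apply predicate filtering itself
//     }
//
// `ScanFlags::empty()` is the "no special capabilities" value returned for most scan types below.
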
#[derive(Clone, Debug, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
// TODO: Arc<> some of the options and the cloud options.
pub enum FileScanDsl {
    #[cfg(feature = "csv")]
    Csv { options: CsvReadOptions },

    #[cfg(feature = "json")]
    NDJson { options: NDJsonReadOptions },

    #[cfg(feature = "parquet")]
    Parquet { options: ParquetOptions },

    #[cfg(feature = "ipc")]
    Ipc { options: IpcScanOptions },

    #[cfg(feature = "python")]
    PythonDataset {
        dataset_object: Arc<python_dataset::PythonDatasetProvider>,
    },

    #[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
    Anonymous {
        options: Arc<AnonymousScanOptions>,
        function: Arc<dyn AnonymousScan>,
        file_info: FileInfo,
    },
}

#[derive(Clone, Debug, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
// TODO: Arc<> some of the options and the cloud options.
pub enum FileScanIR {
    #[cfg(feature = "csv")]
    Csv { options: CsvReadOptions },

    #[cfg(feature = "json")]
    NDJson { options: NDJsonReadOptions },

    #[cfg(feature = "parquet")]
    Parquet {
        options: ParquetOptions,
        #[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
        metadata: Option<FileMetadataRef>,
    },

    #[cfg(feature = "ipc")]
    Ipc {
        options: IpcScanOptions,
        #[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
        metadata: Option<Arc<arrow::io::ipc::read::FileMetadata>>,
    },

    #[cfg(feature = "python")]
    PythonDataset {
        dataset_object: Arc<python_dataset::PythonDatasetProvider>,
        cached_ir: Arc<Mutex<Option<ExpandedDataset>>>,
    },

    #[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
    Anonymous {
        options: Arc<AnonymousScanOptions>,
        function: Arc<dyn AnonymousScan>,
    },
}

impl FileScanIR {
    pub fn flags(&self) -> ScanFlags {
        match self {
            #[cfg(feature = "csv")]
            Self::Csv { .. } => ScanFlags::empty(),
            #[cfg(feature = "ipc")]
            Self::Ipc { .. } => ScanFlags::empty(),
            #[cfg(feature = "parquet")]
            Self::Parquet { .. } => ScanFlags::SPECIALIZED_PREDICATE_FILTER,
            #[cfg(feature = "json")]
            Self::NDJson { .. } => ScanFlags::empty(),
            #[allow(unreachable_patterns)]
            _ => ScanFlags::empty(),
        }
    }

    pub(crate) fn sort_projection(&self, _has_row_index: bool) -> bool {
        match self {
            #[cfg(feature = "csv")]
            Self::Csv { .. } => true,
            #[cfg(feature = "ipc")]
            Self::Ipc { .. } => _has_row_index,
            #[cfg(feature = "parquet")]
            Self::Parquet { .. } => false,
            #[allow(unreachable_patterns)]
            _ => false,
        }
    }

    pub fn streamable(&self) -> bool {
        match self {
            #[cfg(feature = "csv")]
            Self::Csv { .. } => true,
            #[cfg(feature = "ipc")]
            Self::Ipc { .. } => false,
            #[cfg(feature = "parquet")]
            Self::Parquet { .. } => true,
            #[cfg(feature = "json")]
            Self::NDJson { .. } => false,
            #[allow(unreachable_patterns)]
            _ => false,
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub enum MissingColumnsPolicy {
    #[default]
    Raise,
    /// Inserts full-NULL columns for the missing ones.
    Insert,
}

/// Used by scans.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct CastColumnsPolicy {
    /// Allow casting when the target dtype is a lossless supertype.
    pub integer_upcast: bool,

    /// Allow Float32 -> Float64
    pub float_upcast: bool,
    /// Allow Float64 -> Float32
    pub float_downcast: bool,

    /// Allow datetime[ns] to be cast to any lower precision. Important for
    /// being able to read datasets written by Spark.
    pub datetime_nanoseconds_downcast: bool,
    /// Allow datetime[us] to datetime[ms]
    pub datetime_microseconds_downcast: bool,

    /// Allow casting to change time zones.
    pub datetime_convert_timezone: bool,

    /// Allow DataType::Null to be cast to any other dtype.
    pub null_upcast: bool,

    pub missing_struct_fields: MissingColumnsPolicy,
    pub extra_struct_fields: ExtraColumnsPolicy,
}

impl CastColumnsPolicy {
    /// Configuration variant that defaults to raising on mismatch.
    pub const ERROR_ON_MISMATCH: Self = Self {
        integer_upcast: false,
        float_upcast: false,
        float_downcast: false,
        datetime_nanoseconds_downcast: false,
        datetime_microseconds_downcast: false,
        datetime_convert_timezone: false,
        null_upcast: true,
        missing_struct_fields: MissingColumnsPolicy::Raise,
        extra_struct_fields: ExtraColumnsPolicy::Raise,
    };
}

impl Default for CastColumnsPolicy {
    fn default() -> Self {
        Self::ERROR_ON_MISMATCH
    }
}

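// Illustrative sketch (not part of the upstream file): because `ERROR_ON_MISMATCH` is a
// `const`, a selectively lenient policy can be built with struct update syntax, e.g.:
//
//     let policy = CastColumnsPolicy {
//         integer_upcast: true,
//         float_upcast: true,
//         ..CastColumnsPolicy::ERROR_ON_MISMATCH
//     };
//
// Every field not named keeps the strict "raise on mismatch" behaviour.
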
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub enum ExtraColumnsPolicy {
    /// Error if there are extra columns outside the target schema.
    #[default]
    Raise,
    Ignore,
}

#[derive(Debug, Clone, Eq, Hash, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub enum ColumnMapping {
    Iceberg(IcebergSchemaRef),
}

/// Scan arguments shared across different scan types.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct UnifiedScanArgs {
    /// User-provided schema of the file. Will be inferred during IR conversion
    /// if None.
    pub schema: Option<SchemaRef>,
    pub cloud_options: Option<CloudOptions>,
    pub hive_options: HiveOptions,

    pub rechunk: bool,
    pub cache: bool,
    pub glob: bool,

    pub projection: Option<Arc<[PlSmallStr]>>,
    pub column_mapping: Option<ColumnMapping>,
    /// Default values for missing columns.
    pub default_values: Option<DefaultFieldValues>,
    pub row_index: Option<RowIndex>,
    /// Slice applied before predicates.
    pub pre_slice: Option<Slice>,

    pub cast_columns_policy: CastColumnsPolicy,
    pub missing_columns_policy: MissingColumnsPolicy,
    pub extra_columns_policy: ExtraColumnsPolicy,
    pub include_file_paths: Option<PlSmallStr>,

    pub deletion_files: Option<DeletionFilesList>,
}

impl Default for UnifiedScanArgs {
    fn default() -> Self {
        Self {
            schema: None,
            cloud_options: None,
            hive_options: HiveOptions::new_enabled(),
            rechunk: false,
            cache: false,
            glob: true,
            projection: None,
            column_mapping: None,
            default_values: None,
            row_index: None,
            pre_slice: None,
            cast_columns_policy: CastColumnsPolicy::default(),
            missing_columns_policy: MissingColumnsPolicy::default(),
            extra_columns_policy: ExtraColumnsPolicy::default(),
            include_file_paths: None,
            deletion_files: None,
        }
    }
}

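// Illustrative sketch (not part of the upstream file): since `UnifiedScanArgs` implements
// `Default`, call sites can override only the fields they need, e.g.:
//
//     let args = UnifiedScanArgs {
//         rechunk: true,
//         glob: false,
//         ..Default::default()
//     };
//
// which keeps hive partitioning enabled and all column policies at their strict defaults.
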
/// Manual impls of Eq/Hash, as some fields are `Arc<T>` where T does not have Eq/Hash. For these
/// fields we compare the pointer addresses instead.
mod _file_scan_eq_hash {
    use std::hash::{Hash, Hasher};
    use std::sync::Arc;

    use super::FileScanIR;

    impl Eq for FileScanIR {}

    impl Hash for FileScanIR {
        fn hash<H: Hasher>(&self, state: &mut H) {
            FileScanEqHashWrap::from(self).hash(state)
        }
    }

    impl PartialEq for FileScanIR {
        fn eq(&self, other: &Self) -> bool {
            FileScanEqHashWrap::from(self) == FileScanEqHashWrap::from(other)
        }
    }

    /// # Hash / Eq safety
    /// * All usizes originate from `Arc<>`s, and the lifetime of this enum is bound to that of the
    ///   input ref.
    #[derive(PartialEq, Hash)]
    pub enum FileScanEqHashWrap<'a> {
        #[cfg(feature = "csv")]
        Csv {
            options: &'a polars_io::csv::read::CsvReadOptions,
        },

        #[cfg(feature = "json")]
        NDJson {
            options: &'a crate::prelude::NDJsonReadOptions,
        },

        #[cfg(feature = "parquet")]
        Parquet {
            options: &'a polars_io::prelude::ParquetOptions,
            metadata: Option<usize>,
        },

        #[cfg(feature = "ipc")]
        Ipc {
            options: &'a polars_io::prelude::IpcScanOptions,
            metadata: Option<usize>,
        },

        #[cfg(feature = "python")]
        PythonDataset {
            dataset_object: usize,
            cached_ir: usize,
        },

        Anonymous {
            options: &'a crate::dsl::AnonymousScanOptions,
            function: usize,
        },

        /// Variant to ensure the lifetime is used regardless of feature gate combination.
        #[expect(unused)]
        Phantom(&'a ()),
    }

    impl<'a> From<&'a FileScanIR> for FileScanEqHashWrap<'a> {
        fn from(value: &'a FileScanIR) -> Self {
            match value {
                #[cfg(feature = "csv")]
                FileScanIR::Csv { options } => FileScanEqHashWrap::Csv { options },

                #[cfg(feature = "json")]
                FileScanIR::NDJson { options } => FileScanEqHashWrap::NDJson { options },

                #[cfg(feature = "parquet")]
                FileScanIR::Parquet { options, metadata } => FileScanEqHashWrap::Parquet {
                    options,
                    metadata: metadata.as_ref().map(arc_as_ptr),
                },

                #[cfg(feature = "ipc")]
                FileScanIR::Ipc { options, metadata } => FileScanEqHashWrap::Ipc {
                    options,
                    metadata: metadata.as_ref().map(arc_as_ptr),
                },

                #[cfg(feature = "python")]
                FileScanIR::PythonDataset {
                    dataset_object,
                    cached_ir,
                } => FileScanEqHashWrap::PythonDataset {
                    dataset_object: arc_as_ptr(dataset_object),
                    cached_ir: arc_as_ptr(cached_ir),
                },

                FileScanIR::Anonymous { options, function } => FileScanEqHashWrap::Anonymous {
                    options,
                    function: arc_as_ptr(function),
                },
            }
        }
    }

    fn arc_as_ptr<T: ?Sized>(arc: &Arc<T>) -> usize {
        Arc::as_ptr(arc) as *const () as usize
    }
}

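// Illustrative sketch (not part of the upstream file): the wrapper above hashes/compares the
// `Arc` fields by allocation address, so two `FileScanIR` values only compare equal on those
// fields when they share the same `Arc`. The same idea with plain `std` types:
//
//     use std::sync::Arc;
//
//     let a: Arc<str> = Arc::from("scan");
//     let b = Arc::clone(&a);            // same allocation => same pointer identity
//     let c: Arc<str> = Arc::from("scan"); // equal contents, different allocation
//     assert!(Arc::ptr_eq(&a, &b));
//     assert!(!Arc::ptr_eq(&a, &c));
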
impl CastColumnsPolicy {
    /// Checks if casting can be done to a dtype with a configured policy.
    ///
    /// # Returns
    /// * Ok(true): Cast needed to target dtype
    /// * Ok(false): No casting needed
    /// * Err(_): Forbidden by configuration, or incompatible types.
    pub fn should_cast_column(
        &self,
        column_name: &str,
        target_dtype: &DataType,
        incoming_dtype: &DataType,
    ) -> PolarsResult<bool> {
        let mismatch_err = |hint: &str| {
            let hint_spacing = if hint.is_empty() { "" } else { ", " };

            polars_bail!(
                SchemaMismatch:
                "data type mismatch for column {}: incoming: {:?} != target: {:?}{}{}",
                column_name,
                incoming_dtype,
                target_dtype,
                hint_spacing,
                hint,
            )
        };

        if incoming_dtype.is_null() && !target_dtype.is_null() {
            return if self.null_upcast {
                Ok(true)
            } else {
                mismatch_err("unimplemented: 'null-upcast' in scan cast options")
            };
        }

        // We intercept the nested types first to prevent an expensive recursive eq - recursion
        // is instead done manually through this function.

        #[cfg(feature = "dtype-struct")]
        if let DataType::Struct(target_fields) = target_dtype {
            let DataType::Struct(incoming_fields) = incoming_dtype else {
                return mismatch_err("");
            };

            let incoming_fields_schema = PlHashMap::from_iter(
                incoming_fields
                    .iter()
                    .enumerate()
                    .map(|(i, fld)| (fld.name.as_str(), (i, &fld.dtype))),
            );

            let mut should_cast = incoming_fields.len() != target_fields.len();

            for (target_idx, target_field) in target_fields.iter().enumerate() {
                let Some((incoming_idx, incoming_field_dtype)) =
                    incoming_fields_schema.get(target_field.name().as_str())
                else {
                    match self.missing_struct_fields {
                        MissingColumnsPolicy::Raise => {
                            return mismatch_err(&format!(
                                "encountered missing struct field: {}, \
                                hint: pass cast_options=pl.ScanCastOptions(missing_struct_fields='insert')",
                                target_field.name(),
                            ));
                        },
                        MissingColumnsPolicy::Insert => {
                            should_cast = true;
                            // Must keep checking the rest of the fields.
                            continue;
                        },
                    };
                };

                // # Note
                // We also need to cast if the struct fields are out of order. Currently there is
                // no API parameter to control this - we always do this by default.
                should_cast |= *incoming_idx != target_idx;

                should_cast |= self.should_cast_column(
                    column_name,
                    &target_field.dtype,
                    incoming_field_dtype,
                )?;
            }

            // Casting is also needed if there are extra fields, check them here.

            // Take and re-use hashmap
            let mut target_fields_schema = incoming_fields_schema;
            target_fields_schema.clear();

            target_fields_schema.extend(
                target_fields
                    .iter()
                    .enumerate()
                    .map(|(i, fld)| (fld.name.as_str(), (i, &fld.dtype))),
            );

            for fld in incoming_fields {
                if !target_fields_schema.contains_key(fld.name.as_str()) {
                    match self.extra_struct_fields {
                        ExtraColumnsPolicy::Ignore => {
                            should_cast = true;
                            break;
                        },
                        ExtraColumnsPolicy::Raise => {
                            return mismatch_err(&format!(
                                "encountered extra struct field: {}, \
                                hint: specify this field in the schema, or pass \
                                cast_options=pl.ScanCastOptions(extra_struct_fields='ignore')",
                                &fld.name,
                            ));
                        },
                    }
                }
            }

            return Ok(should_cast);
        }

        if let DataType::List(target_inner) = target_dtype {
            let DataType::List(incoming_inner) = incoming_dtype else {
                return mismatch_err("");
            };

            return self.should_cast_column(column_name, target_inner, incoming_inner);
        }

        #[cfg(feature = "dtype-array")]
        if let DataType::Array(target_inner, target_width) = target_dtype {
            let DataType::Array(incoming_inner, incoming_width) = incoming_dtype else {
                return mismatch_err("");
            };

            if incoming_width != target_width {
                return mismatch_err("");
            }

            return self.should_cast_column(column_name, target_inner, incoming_inner);
        }

        // Eq here should be cheap as we have intercepted all nested types above.

        debug_assert!(!target_dtype.is_nested());

        // If we were to drop cast on an `Unknown` incoming_dtype, it could eventually
        // lead to dtype errors. The reason is that the logic used by type coercion differs
        // from the casting logic used by `materialize_unknown`.
        if incoming_dtype.contains_unknown() {
            return Ok(true);
        }

        // Note: Only call this with non-nested types for performance
        let materialize_unknown = |dtype: &DataType| -> std::borrow::Cow<DataType> {
            dtype
                .clone()
                .materialize_unknown(true)
                .map(std::borrow::Cow::Owned)
                .unwrap_or(std::borrow::Cow::Borrowed(incoming_dtype))
        };

        let incoming_dtype = std::borrow::Cow::Borrowed(incoming_dtype);
        let target_dtype = materialize_unknown(target_dtype);

        if target_dtype == incoming_dtype {
            return Ok(false);
        }

        let incoming_dtype = incoming_dtype.as_ref();
        let target_dtype = target_dtype.as_ref();

        //
        // After this point the dtypes are mismatching.
        //

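        // For example (illustrative): with `integer_upcast` enabled, Int32 -> Int64 is accepted
        // because the lossless upcast supertype of the pair is Int64 (the target itself), whereas
        // Int64 -> Int32 is rejected because the lossless supertype is Int64, not the target.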
        if target_dtype.is_integer() && incoming_dtype.is_integer() {
            if !self.integer_upcast {
                return mismatch_err(
                    "hint: pass cast_options=pl.ScanCastOptions(integer_cast='upcast')",
                );
            }

            return match get_numeric_upcast_supertype_lossless(incoming_dtype, target_dtype) {
                Some(ref v) if v == target_dtype => Ok(true),
                _ => mismatch_err("incoming dtype cannot safely cast to target dtype"),
            };
        }

        if target_dtype.is_float() && incoming_dtype.is_float() {
            return match (target_dtype, incoming_dtype) {
                (DataType::Float64, DataType::Float32) => {
                    if self.float_upcast {
                        Ok(true)
                    } else {
                        mismatch_err(
                            "hint: pass cast_options=pl.ScanCastOptions(float_cast='upcast')",
                        )
                    }
                },

                (DataType::Float32, DataType::Float64) => {
                    if self.float_downcast {
                        Ok(true)
                    } else {
                        mismatch_err(
                            "hint: pass cast_options=pl.ScanCastOptions(float_cast='downcast')",
                        )
                    }
                },

                _ => unreachable!(),
            };
        }

        if let (
            DataType::Datetime(target_unit, target_zone),
            DataType::Datetime(incoming_unit, incoming_zone),
        ) = (target_dtype, incoming_dtype)
        {
            // Check timezone
            if !self.datetime_convert_timezone
                && !TimeZone::eq_none_as_utc(incoming_zone.as_ref(), target_zone.as_ref())
            {
                return mismatch_err(
                    "hint: pass cast_options=pl.ScanCastOptions(datetime_cast='convert-timezone')",
                );
            }

            // Check unit
            if target_unit != incoming_unit {
                return match (incoming_unit, target_unit) {
                    (TimeUnit::Nanoseconds, _) => {
                        if self.datetime_nanoseconds_downcast {
                            Ok(true)
                        } else {
                            mismatch_err(
                                "hint: pass cast_options=pl.ScanCastOptions(datetime_cast='nanosecond-downcast')",
                            )
                        }
                    },

                    (TimeUnit::Microseconds, TimeUnit::Milliseconds) => {
                        if self.datetime_microseconds_downcast {
                            Ok(true)
                        } else {
                            // TODO
                            mismatch_err(
                                "unimplemented: 'microsecond-downcast' in scan cast options",
                            )
                        }
                    },

                    _ => mismatch_err(""),
                };
            }

            // Dtype differs and we are allowed to coerce
            return Ok(true);
        }

        mismatch_err("")
    }
}

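// Illustrative sketch (not part of the upstream file): a minimal check of how the policy above
// is expected to behave for a plain integer upcast. The module and test names are hypothetical;
// they only demonstrate `should_cast_column` together with struct update syntax.
#[cfg(test)]
mod cast_columns_policy_example {
    use polars_core::prelude::DataType;

    use super::*;

    #[test]
    fn integer_upcast_requires_opt_in() {
        // The strict default policy raises on any dtype mismatch.
        let strict = CastColumnsPolicy::ERROR_ON_MISMATCH;
        assert!(
            strict
                .should_cast_column("x", &DataType::Int64, &DataType::Int32)
                .is_err()
        );

        // Opting in to lossless integer upcasts allows Int32 -> Int64.
        let lenient = CastColumnsPolicy {
            integer_upcast: true,
            ..CastColumnsPolicy::ERROR_ON_MISMATCH
        };
        assert!(
            lenient
                .should_cast_column("x", &DataType::Int64, &DataType::Int32)
                .unwrap()
        );
    }
}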