Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/dsl/file_scan/mod.rs
8446 views
1
use std::hash::Hash;
2
use std::sync::Mutex;
3
4
use deletion::DeletionFilesList;
5
use polars_core::schema::iceberg::IcebergSchemaRef;
6
use polars_core::utils::get_numeric_upcast_supertype_lossless;
7
use polars_io::cloud::CloudOptions;
8
#[cfg(feature = "csv")]
9
use polars_io::csv::read::CsvReadOptions;
10
#[cfg(feature = "ipc")]
11
use polars_io::ipc::IpcScanOptions;
12
#[cfg(feature = "parquet")]
13
use polars_io::parquet::metadata::FileMetadataRef;
14
#[cfg(feature = "parquet")]
15
use polars_io::parquet::read::ParquetOptions;
16
use polars_io::{HiveOptions, RowIndex};
17
use polars_utils::slice_enum::Slice;
18
#[cfg(feature = "serde")]
19
use serde::{Deserialize, Serialize};
20
use strum_macros::IntoStaticStr;
21
22
use super::*;
23
use crate::dsl::default_values::DefaultFieldValues;
24
pub mod default_values;
25
pub mod deletion;
26
27
#[cfg(feature = "python")]
28
pub mod python_dataset;
29
#[cfg(feature = "python")]
30
pub use python_dataset::{DATASET_PROVIDER_VTABLE, PythonDatasetProviderVTable};
31
32
bitflags::bitflags! {
    /// Capability flags reported per scan type (see `FileScanIR::flags`).
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct ScanFlags : u32 {
        /// The reader has its own specialized implementation of predicate
        /// filtering; currently only returned for Parquet (see `FileScanIR::flags`).
        const SPECIALIZED_PREDICATE_FILTER = 0x01;
    }
}
38
39
// Compile-time size guard: fails the build if `FileScanDsl` grows past 100 bytes
// (e.g. by adding a large variant without boxing it).
const _: () = {
    assert!(std::mem::size_of::<FileScanDsl>() <= 100);
};
42
43
#[derive(Clone, Debug, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
/// DSL-level description of a file scan source.
///
/// Note: This is cheaply cloneable.
pub enum FileScanDsl {
    /// CSV scan. Options are behind an `Arc` to keep this variant small.
    #[cfg(feature = "csv")]
    Csv { options: Arc<CsvReadOptions> },

    /// Newline-delimited JSON scan.
    #[cfg(feature = "json")]
    NDJson { options: NDJsonReadOptions },

    /// Parquet scan.
    #[cfg(feature = "parquet")]
    Parquet { options: ParquetOptions },

    /// Arrow IPC scan.
    #[cfg(feature = "ipc")]
    Ipc { options: IpcScanOptions },

    /// Scan backed by a Python-side dataset provider.
    #[cfg(feature = "python")]
    PythonDataset {
        dataset_object: Arc<python_dataset::PythonDatasetProvider>,
    },

    /// Scan files as raw lines. NOTE(review): `name` is presumably the output
    /// column name — confirm against the reader.
    #[cfg(feature = "scan_lines")]
    Lines { name: PlSmallStr },

    /// User-provided scan function. Contains a `dyn` trait object and therefore
    /// cannot be (de)serialized — hence `serde(skip)`.
    #[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
    Anonymous {
        options: Arc<AnonymousScanOptions>,
        function: Arc<dyn AnonymousScan>,
        file_info: FileInfo,
    },
}
75
76
// Compile-time size guard: fails the build if `FileScanIR` grows past 80 bytes.
const _: () = {
    assert!(std::mem::size_of::<FileScanIR>() <= 80);
};
79
80
#[derive(Clone, Debug, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
/// IR-level description of a file scan source. Mirrors [`FileScanDsl`] but may
/// additionally carry cached reader state (e.g. file metadata).
///
/// Note: This is cheaply cloneable.
pub enum FileScanIR {
    /// CSV scan. Options are behind an `Arc` to keep this variant small.
    #[cfg(feature = "csv")]
    Csv { options: Arc<CsvReadOptions> },

    /// Newline-delimited JSON scan.
    #[cfg(feature = "json")]
    NDJson { options: NDJsonReadOptions },

    /// Parquet scan.
    #[cfg(feature = "parquet")]
    Parquet {
        options: ParquetOptions,
        /// Cached Parquet file metadata; skipped by serde.
        #[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
        metadata: Option<FileMetadataRef>,
    },

    /// Arrow IPC scan.
    #[cfg(feature = "ipc")]
    Ipc {
        options: IpcScanOptions,
        /// Cached IPC file metadata; skipped by serde.
        #[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
        metadata: Option<Arc<arrow::io::ipc::read::FileMetadata>>,
    },

    /// Scan backed by a Python-side dataset provider.
    #[cfg(feature = "python")]
    PythonDataset {
        dataset_object: Arc<python_dataset::PythonDatasetProvider>,
        /// Lazily-populated expanded dataset, shared behind a mutex.
        cached_ir: Arc<Mutex<Option<ExpandedDataset>>>,
    },

    /// Scan files as raw lines. NOTE(review): `name` is presumably the output
    /// column name — confirm against the reader.
    #[cfg(feature = "scan_lines")]
    Lines { name: PlSmallStr },

    /// User-provided scan function. Contains a `dyn` trait object and therefore
    /// cannot be (de)serialized — hence `serde(skip)`.
    #[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
    Anonymous {
        options: Arc<AnonymousScanOptions>,
        function: Arc<dyn AnonymousScan>,
    },
}
120
121
impl FileScanIR {
    /// Capability flags for this scan type.
    ///
    /// Only Parquet currently advertises `SPECIALIZED_PREDICATE_FILTER`.
    pub fn flags(&self) -> ScanFlags {
        match self {
            #[cfg(feature = "csv")]
            Self::Csv { .. } => ScanFlags::empty(),
            #[cfg(feature = "ipc")]
            Self::Ipc { .. } => ScanFlags::empty(),
            #[cfg(feature = "parquet")]
            Self::Parquet { .. } => ScanFlags::SPECIALIZED_PREDICATE_FILTER,
            #[cfg(feature = "json")]
            Self::NDJson { .. } => ScanFlags::empty(),
            // Remaining variants (python / lines / anonymous) report no flags.
            #[allow(unreachable_patterns)]
            _ => ScanFlags::empty(),
        }
    }

    /// Whether the column projection should be sorted for this scan type:
    /// CSV always, IPC only when a row index is present, everything else never.
    ///
    /// NOTE(review): exact downstream meaning of "sort projection" inferred from
    /// the name — confirm at the call site. The parameter is underscore-prefixed
    /// because it is only read when the `ipc` feature is enabled.
    pub(crate) fn sort_projection(&self, _has_row_index: bool) -> bool {
        match self {
            #[cfg(feature = "csv")]
            Self::Csv { .. } => true,
            #[cfg(feature = "ipc")]
            Self::Ipc { .. } => _has_row_index,
            #[cfg(feature = "parquet")]
            Self::Parquet { .. } => false,
            #[allow(unreachable_patterns)]
            _ => false,
        }
    }

    /// Whether this scan type supports streaming execution.
    /// NOTE(review): definition of "streamable" inferred from the name — confirm
    /// against the engine that consumes this.
    pub fn streamable(&self) -> bool {
        match self {
            #[cfg(feature = "csv")]
            Self::Csv { .. } => true,
            #[cfg(feature = "ipc")]
            Self::Ipc { .. } => false,
            #[cfg(feature = "parquet")]
            Self::Parquet { .. } => true,
            #[cfg(feature = "json")]
            Self::NDJson { .. } => false,
            #[allow(unreachable_patterns)]
            _ => false,
        }
    }
}
165
166
/// Behavior when the target schema contains columns that the data does not.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub enum MissingColumnsPolicy {
    /// Raise an error on missing columns (the default).
    #[default]
    Raise,
    /// Inserts full-NULL columns for the missing ones.
    Insert,
}
175
176
/// Used by scans.
///
/// Controls which dtype mismatches between the target schema and the incoming
/// data may be resolved by casting (see `should_cast_column`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct CastColumnsPolicy {
    /// Allow casting when target dtype is lossless supertype
    pub integer_upcast: bool,

    /// Allow casting integers to floats.
    /// `serde(default)` so plans serialized before this field existed still deserialize.
    #[cfg_attr(feature = "serde", serde(default))]
    pub integer_to_float_cast: bool,

    /// Allow upcasting from small floats to bigger floats
    pub float_upcast: bool,
    /// Allow downcasting from big floats to smaller floats
    pub float_downcast: bool,

    /// Allow datetime[ns] to be casted to any lower precision. Important for
    /// being able to read datasets written by spark.
    pub datetime_nanoseconds_downcast: bool,
    /// Allow datetime[us] to datetime[ms]
    pub datetime_microseconds_downcast: bool,

    /// Allow converting datetimes to a different timezone (gates the timezone
    /// check in `should_cast_column`; the previous comment incorrectly said
    /// "time units" — time units are covered by the downcast flags above).
    pub datetime_convert_timezone: bool,

    /// DataType::Null to any
    pub null_upcast: bool,

    /// DataType::Categorical to string
    pub categorical_to_string: bool,

    /// Policy for target struct fields that are missing from the data.
    pub missing_struct_fields: MissingColumnsPolicy,
    /// Policy for struct fields present in the data but not in the target.
    pub extra_struct_fields: ExtraColumnsPolicy,
}
211
212
impl CastColumnsPolicy {
    /// Configuration variant that defaults to raising on mismatch.
    ///
    /// NOTE(review): `null_upcast` is `true` here despite the strict name, so
    /// full-Null columns are still upcast under this policy — confirm this
    /// exception is deliberate.
    pub const ERROR_ON_MISMATCH: Self = Self {
        integer_upcast: false,
        integer_to_float_cast: false,
        float_upcast: false,
        float_downcast: false,
        datetime_nanoseconds_downcast: false,
        datetime_microseconds_downcast: false,
        datetime_convert_timezone: false,
        null_upcast: true,
        categorical_to_string: false,
        missing_struct_fields: MissingColumnsPolicy::Raise,
        extra_struct_fields: ExtraColumnsPolicy::Raise,
    };
}
228
229
impl Default for CastColumnsPolicy {
230
fn default() -> Self {
231
Self::ERROR_ON_MISMATCH
232
}
233
}
234
235
/// Behavior when the data contains columns not present in the target schema.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub enum ExtraColumnsPolicy {
    /// Error if there are extra columns outside the target schema.
    #[default]
    Raise,
    /// Silently ignore extra columns.
    Ignore,
}
244
245
/// Mapping between physical file columns and logical output columns.
#[derive(Debug, Clone, Eq, Hash, PartialEq, strum_macros::IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub enum ColumnMapping {
    /// Mapping derived from an Iceberg schema.
    Iceberg(IcebergSchemaRef),
}
251
252
/// Table-level statistics stored as a `DataFrame` behind an `Arc`.
///
/// Equality and hashing use `Arc` pointer identity (see the manual impls below);
/// the statistics contents themselves are never compared.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct TableStatistics(pub Arc<DataFrame>);
256
257
impl PartialEq for TableStatistics {
258
fn eq(&self, other: &Self) -> bool {
259
Arc::ptr_eq(&self.0, &other.0)
260
}
261
}
262
263
impl Eq for TableStatistics {}
264
265
impl Hash for TableStatistics {
266
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
267
state.write_usize(Arc::as_ptr(&self.0) as *const () as usize);
268
}
269
}
270
271
impl std::ops::Deref for TableStatistics {
272
type Target = Arc<DataFrame>;
273
274
fn deref(&self) -> &Self::Target {
275
&self.0
276
}
277
}
278
279
/// Scan arguments shared across different scan types.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct UnifiedScanArgs {
    /// User-provided schema of the file. Will be inferred during IR conversion
    /// if None.
    pub schema: Option<SchemaRef>,
    /// Options for reading from cloud storage.
    pub cloud_options: Option<CloudOptions>,
    /// Hive-partitioning options.
    pub hive_options: HiveOptions,

    /// Rechunk the output into contiguous memory.
    pub rechunk: bool,
    /// NOTE(review): presumably enables the file/scan cache — confirm at call sites.
    pub cache: bool,
    /// Expand glob patterns in paths (enabled by default, see the manual `Default`).
    pub glob: bool,
    /// Files with these prefixes will not be read.
    pub hidden_file_prefix: Option<Arc<[PlSmallStr]>>,

    /// Subset of columns to read (projection pushdown).
    pub projection: Option<Arc<[PlSmallStr]>>,
    /// Physical-to-logical column mapping (e.g. Iceberg).
    pub column_mapping: Option<ColumnMapping>,
    /// Default values for missing columns.
    pub default_values: Option<DefaultFieldValues>,
    /// Optionally add a row-index column.
    pub row_index: Option<RowIndex>,
    /// Slice applied before predicates
    pub pre_slice: Option<Slice>,

    /// How dtype mismatches between schema and data are resolved.
    pub cast_columns_policy: CastColumnsPolicy,
    /// How columns missing from the data are handled.
    pub missing_columns_policy: MissingColumnsPolicy,
    /// How columns present in the data but not in the schema are handled.
    pub extra_columns_policy: ExtraColumnsPolicy,
    /// If set, add a column with this name containing the source file path —
    /// NOTE(review): inferred from the name; confirm.
    pub include_file_paths: Option<PlSmallStr>,

    /// Deletion files to apply to the scan (e.g. Iceberg deletes).
    pub deletion_files: Option<DeletionFilesList>,
    /// Pre-computed table statistics (compared by pointer identity).
    pub table_statistics: Option<TableStatistics>,
    /// Stores (physical, deleted) row counts of the table if known upfront (e.g. for Iceberg).
    /// This allows for row-count queries to succeed without scanning all files.
    ///
    /// Note, intentionally store u64 instead of IdxSize to avoid erroring if it's unused.
    pub row_count: Option<(u64, u64)>,
}
317
318
/// Bookkeeping produced when files are skipped based on a predicate.
/// NOTE(review): overall semantics inferred from the field docs — confirm at the
/// call sites.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct PredicateFileSkip {
    /// If `true` the predicate can be skipped at runtime.
    pub no_residual_predicate: bool,
    /// Number of files before skipping
    pub original_len: usize,
}
327
328
impl UnifiedScanArgs {
329
pub fn has_row_index_or_slice(&self) -> bool {
330
self.row_index.is_some() || self.pre_slice.is_some()
331
}
332
}
333
334
// Manual Default impl: deviates from what a derive would give by enabling
// `glob: true` and hive support (`HiveOptions::new_enabled()`) out of the box.
impl Default for UnifiedScanArgs {
    fn default() -> Self {
        Self {
            schema: None,
            cloud_options: None,
            hive_options: HiveOptions::new_enabled(),
            rechunk: false,
            cache: false,
            glob: true,
            hidden_file_prefix: None,
            projection: None,
            column_mapping: None,
            default_values: None,
            row_index: None,
            pre_slice: None,
            cast_columns_policy: CastColumnsPolicy::default(),
            missing_columns_policy: MissingColumnsPolicy::default(),
            extra_columns_policy: ExtraColumnsPolicy::default(),
            include_file_paths: None,
            deletion_files: None,
            table_statistics: None,
            row_count: None,
        }
    }
}
360
361
/// Manual impls of Eq/Hash, as some fields are `Arc<T>` where T does not have Eq/Hash. For these
/// fields we compare the pointer addresses instead.
mod _file_scan_eq_hash {
    use std::hash::{Hash, Hasher};
    use std::sync::Arc;

    #[cfg(feature = "scan_lines")]
    use polars_utils::pl_str::PlSmallStr;

    use super::FileScanIR;

    impl Eq for FileScanIR {}

    impl Hash for FileScanIR {
        // Delegate to the wrapper, which replaces un-hashable `Arc` fields with
        // their allocation addresses.
        fn hash<H: Hasher>(&self, state: &mut H) {
            FileScanEqHashWrap::from(self).hash(state)
        }
    }

    impl PartialEq for FileScanIR {
        // Delegate to the wrapper; `Arc` fields compare by pointer identity.
        fn eq(&self, other: &Self) -> bool {
            FileScanEqHashWrap::from(self) == FileScanEqHashWrap::from(other)
        }
    }

    /// Borrowed view of [`FileScanIR`] in which fields that lack Eq/Hash are
    /// replaced by the address of their `Arc` allocation.
    ///
    /// # Hash / Eq safety
    /// * All usizes originate from `Arc<>`s, and the lifetime of this enum is
    ///   bound to that of the input ref, so the addresses stay valid while a
    ///   wrapper exists.
    #[derive(PartialEq, Hash)]
    pub enum FileScanEqHashWrap<'a> {
        #[cfg(feature = "csv")]
        Csv {
            options: &'a polars_io::csv::read::CsvReadOptions,
        },

        #[cfg(feature = "json")]
        NDJson {
            options: &'a crate::prelude::NDJsonReadOptions,
        },

        #[cfg(feature = "parquet")]
        Parquet {
            options: &'a polars_io::prelude::ParquetOptions,
            /// Address of the metadata `Arc`, if present.
            metadata: Option<usize>,
        },

        #[cfg(feature = "ipc")]
        Ipc {
            options: &'a polars_io::prelude::IpcScanOptions,
            /// Address of the metadata `Arc`, if present.
            metadata: Option<usize>,
        },

        #[cfg(feature = "python")]
        PythonDataset {
            dataset_object: usize,
            cached_ir: usize,
        },

        #[cfg(feature = "scan_lines")]
        Lines { name: &'a PlSmallStr },

        Anonymous {
            options: &'a crate::dsl::AnonymousScanOptions,
            /// Address of the `dyn AnonymousScan` `Arc`.
            function: usize,
        },

        /// Variant to ensure the lifetime is used regardless of feature gate combination.
        #[expect(unused)]
        Phantom(&'a ()),
    }

    impl<'a> From<&'a FileScanIR> for FileScanEqHashWrap<'a> {
        fn from(value: &'a FileScanIR) -> Self {
            match value {
                #[cfg(feature = "csv")]
                FileScanIR::Csv { options } => FileScanEqHashWrap::Csv { options },

                #[cfg(feature = "json")]
                FileScanIR::NDJson { options } => FileScanEqHashWrap::NDJson { options },

                #[cfg(feature = "parquet")]
                FileScanIR::Parquet { options, metadata } => FileScanEqHashWrap::Parquet {
                    options,
                    metadata: metadata.as_ref().map(arc_as_ptr),
                },

                #[cfg(feature = "ipc")]
                FileScanIR::Ipc { options, metadata } => FileScanEqHashWrap::Ipc {
                    options,
                    metadata: metadata.as_ref().map(arc_as_ptr),
                },

                #[cfg(feature = "python")]
                FileScanIR::PythonDataset {
                    dataset_object,
                    cached_ir,
                } => FileScanEqHashWrap::PythonDataset {
                    dataset_object: arc_as_ptr(dataset_object),
                    cached_ir: arc_as_ptr(cached_ir),
                },

                #[cfg(feature = "scan_lines")]
                FileScanIR::Lines { name } => FileScanEqHashWrap::Lines { name },

                FileScanIR::Anonymous { options, function } => FileScanEqHashWrap::Anonymous {
                    options,
                    function: arc_as_ptr(function),
                },
            }
        }
    }

    /// Address of an `Arc`'s allocation as a plain integer (works for unsized `T`).
    fn arc_as_ptr<T: ?Sized>(arc: &Arc<T>) -> usize {
        Arc::as_ptr(arc) as *const () as usize
    }
}
477
478
impl CastColumnsPolicy {
    /// Checks if casting can be done to a dtype with a configured policy.
    ///
    /// Nested types (struct / list / array) are handled by manual recursion
    /// through this function rather than a recursive dtype `==`.
    ///
    /// NOTE(review): `categorical_to_string` is never consulted in this function —
    /// confirm it is enforced elsewhere.
    ///
    /// # Returns
    /// * Ok(true): Cast needed to target dtype
    /// * Ok(false): No casting needed
    /// * Err(_): Forbidden by configuration, or incompatible types.
    pub fn should_cast_column(
        &self,
        column_name: &str,
        target_dtype: &DataType,
        incoming_dtype: &DataType,
    ) -> PolarsResult<bool> {
        // Shared error constructor; the (possibly empty) `hint` is appended to
        // the message with a ", " separator only when non-empty.
        let mismatch_err = |hint: &str| {
            let hint_spacing = if hint.is_empty() { "" } else { ", " };

            polars_bail!(
                SchemaMismatch:
                "data type mismatch for column {}: incoming: {:?} != target: {:?}{}{}",
                column_name,
                incoming_dtype,
                target_dtype,
                hint_spacing,
                hint,
            )
        };

        // Full-Null incoming columns can be upcast to any target when permitted.
        if incoming_dtype.is_null() && !target_dtype.is_null() {
            return if self.null_upcast {
                Ok(true)
            } else {
                mismatch_err("unimplemented: 'null-upcast' in scan cast options")
            };
        }

        // We intercept the nested types first to prevent an expensive recursive eq - recursion
        // is instead done manually through this function.

        #[cfg(feature = "dtype-struct")]
        if let DataType::Struct(target_fields) = target_dtype {
            let DataType::Struct(incoming_fields) = incoming_dtype else {
                return mismatch_err("");
            };

            // Index incoming fields by name -> (position, dtype) for O(1) lookups.
            let incoming_fields_schema = PlHashMap::from_iter(
                incoming_fields
                    .iter()
                    .enumerate()
                    .map(|(i, fld)| (fld.name.as_str(), (i, &fld.dtype))),
            );

            // A differing field count always requires a cast.
            let mut should_cast = incoming_fields.len() != target_fields.len();

            for (target_idx, target_field) in target_fields.iter().enumerate() {
                let Some((incoming_idx, incoming_field_dtype)) =
                    incoming_fields_schema.get(target_field.name().as_str())
                else {
                    // Target field is missing from the incoming struct.
                    match self.missing_struct_fields {
                        MissingColumnsPolicy::Raise => {
                            return mismatch_err(&format!(
                                "encountered missing struct field: {}, \
                                hint: pass cast_options=pl.ScanCastOptions(missing_struct_fields='insert')",
                                target_field.name(),
                            ));
                        },
                        MissingColumnsPolicy::Insert => {
                            should_cast = true;
                            // Must keep checking the rest of the fields.
                            continue;
                        },
                    };
                };

                // # Note
                // We also need to cast if the struct fields are out of order. Currently there is
                // no API parameter to control this - we always do this by default.
                should_cast |= *incoming_idx != target_idx;

                // Recurse into the field's dtype.
                should_cast |= self.should_cast_column(
                    column_name,
                    &target_field.dtype,
                    incoming_field_dtype,
                )?;
            }

            // Casting is also needed if there are extra fields, check them here.

            // Take and re-use hashmap
            let mut target_fields_schema = incoming_fields_schema;
            target_fields_schema.clear();

            target_fields_schema.extend(
                target_fields
                    .iter()
                    .enumerate()
                    .map(|(i, fld)| (fld.name.as_str(), (i, &fld.dtype))),
            );

            for fld in incoming_fields {
                if !target_fields_schema.contains_key(fld.name.as_str()) {
                    match self.extra_struct_fields {
                        ExtraColumnsPolicy::Ignore => {
                            // One extra field is enough to know a cast is needed.
                            should_cast = true;
                            break;
                        },
                        ExtraColumnsPolicy::Raise => {
                            return mismatch_err(&format!(
                                "encountered extra struct field: {}, \
                                hint: specify this field in the schema, or pass \
                                cast_options=pl.ScanCastOptions(extra_struct_fields='ignore')",
                                &fld.name,
                            ));
                        },
                    }
                }
            }

            return Ok(should_cast);
        }

        // Lists: recurse on the inner dtype.
        if let DataType::List(target_inner) = target_dtype {
            let DataType::List(incoming_inner) = incoming_dtype else {
                return mismatch_err("");
            };

            return self.should_cast_column(column_name, target_inner, incoming_inner);
        }

        // Fixed-size arrays: widths must match exactly; inner dtype recurses.
        #[cfg(feature = "dtype-array")]
        if let DataType::Array(target_inner, target_width) = target_dtype {
            let DataType::Array(incoming_inner, incoming_width) = incoming_dtype else {
                return mismatch_err("");
            };

            if incoming_width != target_width {
                return mismatch_err("");
            }

            return self.should_cast_column(column_name, target_inner, incoming_inner);
        }

        // Eq here should be cheap as we have intercepted all nested types above.

        debug_assert!(!target_dtype.is_nested());

        // If we were to drop cast on an `Unknown` incoming_dtype, it could eventually
        // lead to dtype errors. The reason is that the logic used by type coercion differs
        // from the casting logic used by `materialize_unknown`.
        if incoming_dtype.contains_unknown() {
            return Ok(true);
        }

        // Note: Only call this with non-nested types for performance
        //
        // NOTE(review): the failure fallback borrows `incoming_dtype`, not the
        // closure's own `dtype` argument. As the closure is only ever applied to
        // `target_dtype`, a failed materialization makes the equality check below
        // trivially true and we return Ok(false) — confirm this is intended.
        let materialize_unknown = |dtype: &DataType| -> std::borrow::Cow<DataType> {
            dtype
                .clone()
                .materialize_unknown(true)
                .map(std::borrow::Cow::Owned)
                .unwrap_or(std::borrow::Cow::Borrowed(incoming_dtype))
        };

        let incoming_dtype = std::borrow::Cow::Borrowed(incoming_dtype);
        let target_dtype = materialize_unknown(target_dtype);

        if target_dtype == incoming_dtype {
            return Ok(false);
        }

        let incoming_dtype = incoming_dtype.as_ref();
        let target_dtype = target_dtype.as_ref();

        //
        // After this point the dtypes are mismatching.
        //

        // Integer -> integer: only lossless upcasts are permitted.
        if target_dtype.is_integer() && incoming_dtype.is_integer() {
            if !self.integer_upcast {
                return mismatch_err(
                    "hint: pass cast_options=pl.ScanCastOptions(integer_cast='upcast')",
                );
            }

            return match get_numeric_upcast_supertype_lossless(incoming_dtype, target_dtype) {
                Some(ref v) if v == target_dtype => Ok(true),
                _ => mismatch_err("incoming dtype cannot safely cast to target dtype"),
            };
        }

        // Float -> float: up- and down-casts each gated by their own flag.
        if target_dtype.is_float() && incoming_dtype.is_float() {
            return match (target_dtype, incoming_dtype) {
                (DataType::Float64, DataType::Float32)
                | (DataType::Float64, DataType::Float16)
                | (DataType::Float32, DataType::Float16) => {
                    if self.float_upcast {
                        Ok(true)
                    } else {
                        mismatch_err(
                            "hint: pass cast_options=pl.ScanCastOptions(float_cast='upcast')",
                        )
                    }
                },

                (DataType::Float16, DataType::Float32)
                | (DataType::Float16, DataType::Float64)
                | (DataType::Float32, DataType::Float64) => {
                    if self.float_downcast {
                        Ok(true)
                    } else {
                        mismatch_err(
                            "hint: pass cast_options=pl.ScanCastOptions(float_cast='downcast')",
                        )
                    }
                },

                // Both sides are floats and unequal, so one of the arms above matched.
                _ => unreachable!(),
            };
        }

        // Integer -> float.
        if target_dtype.is_float() && incoming_dtype.is_integer() {
            return if !self.integer_to_float_cast {
                mismatch_err(
                    "hint: pass cast_options=pl.ScanCastOptions(integer_cast='allow-float')",
                )
            } else {
                Ok(true)
            };
        }

        // Datetimes: timezone and time unit are checked separately.
        if let (
            DataType::Datetime(target_unit, target_zone),
            DataType::Datetime(incoming_unit, incoming_zone),
        ) = (target_dtype, incoming_dtype)
        {
            // Check timezone (a missing timezone compares as UTC, per `eq_none_as_utc`).
            if !self.datetime_convert_timezone
                && !TimeZone::eq_none_as_utc(incoming_zone.as_ref(), target_zone.as_ref())
            {
                return mismatch_err(
                    "hint: pass cast_options=pl.ScanCastOptions(datetime_cast='convert-timezone')",
                );
            }

            // Check unit
            if target_unit != incoming_unit {
                return match (incoming_unit, target_unit) {
                    (TimeUnit::Nanoseconds, _) => {
                        if self.datetime_nanoseconds_downcast {
                            Ok(true)
                        } else {
                            mismatch_err(
                                "hint: pass cast_options=pl.ScanCastOptions(datetime_cast='nanosecond-downcast')",
                            )
                        }
                    },

                    (TimeUnit::Microseconds, TimeUnit::Milliseconds) => {
                        if self.datetime_microseconds_downcast {
                            Ok(true)
                        } else {
                            // TODO
                            mismatch_err(
                                "unimplemented: 'microsecond-downcast' in scan cast options",
                            )
                        }
                    },

                    // Remaining unit combinations (upcasts) are not supported here.
                    _ => mismatch_err(""),
                };
            }

            // Dtype differs and we are allowed to coerce
            return Ok(true);
        }

        // Any remaining dtype combination is unsupported.
        mismatch_err("")
    }
}
755
756