GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/conversion/dsl_to_ir/scans.rs
use std::sync::LazyLock;

use arrow::buffer::Buffer;
use either::Either;
use polars_io::RowIndex;
#[cfg(feature = "cloud")]
use polars_io::pl_async::get_runtime;
use polars_io::prelude::*;
use polars_io::utils::compression::maybe_decompress_bytes;

use super::*;

pub(super) fn dsl_to_ir(
    sources: ScanSources,
    mut unified_scan_args_box: Box<UnifiedScanArgs>,
    scan_type: Box<FileScanDsl>,
    cached_ir: Arc<Mutex<Option<IR>>>,
    ctxt: &mut DslConversionContext,
) -> PolarsResult<IR> {
    // Note that the first metadata can still end up being `None` later if the files were
    // filtered out by predicate pushdown.
    let mut cached_ir = cached_ir.lock().unwrap();

    if cached_ir.is_none() {
        let unified_scan_args = unified_scan_args_box.as_mut();

        if let Some(hive_schema) = unified_scan_args.hive_options.schema.as_deref() {
            match unified_scan_args.hive_options.enabled {
                // Enable hive_partitioning if it is unspecified but a non-empty hive_schema is given.
                None if !hive_schema.is_empty() => {
                    unified_scan_args.hive_options.enabled = Some(true)
                },
                // hive_partitioning was explicitly disabled.
                Some(false) => polars_bail!(
                    ComputeError:
                    "a hive schema was given but hive_partitioning was disabled"
                ),
                Some(true) | None => {},
            }
        }

        let sources_before_expansion = &sources;

        let sources = match &*scan_type {
            #[cfg(feature = "parquet")]
            FileScanDsl::Parquet { .. } => {
                sources.expand_paths_with_hive_update(unified_scan_args)?
            },
            #[cfg(feature = "ipc")]
            FileScanDsl::Ipc { .. } => sources.expand_paths_with_hive_update(unified_scan_args)?,
            #[cfg(feature = "csv")]
            FileScanDsl::Csv { .. } => sources.expand_paths(unified_scan_args)?,
            #[cfg(feature = "json")]
            FileScanDsl::NDJson { .. } => sources.expand_paths(unified_scan_args)?,
            #[cfg(feature = "python")]
            FileScanDsl::PythonDataset { .. } => {
                // There are a lot of places that short-circuit if the path list is empty,
                // so we just give a dummy path here.
                ScanSources::Paths(Buffer::from_iter([PlPath::from_str("dummy")]))
            },
            #[cfg(feature = "scan_lines")]
            FileScanDsl::Lines { .. } => sources.expand_paths(unified_scan_args)?,
            FileScanDsl::Anonymous { .. } => sources.clone(),
        };

        // For cloud we must deduplicate files. Serialization/deserialization leads to Arcs losing
        // their sharing.
        let (mut file_info, scan_type_ir) = ctxt.cache_file_info.get_or_insert(
            &scan_type,
            &sources,
            sources_before_expansion,
            unified_scan_args,
            ctxt.verbose,
        )?;

        if unified_scan_args.hive_options.enabled.is_none() {
            // We expect this to be `Some(_)` after this point. If it hasn't been auto-enabled,
            // we explicitly set it to disabled.
            unified_scan_args.hive_options.enabled = Some(false);
        }

        let hive_parts = if unified_scan_args.hive_options.enabled.unwrap()
            && file_info.reader_schema.is_some()
        {
            let paths = sources
                .as_paths()
                .ok_or_else(|| polars_err!(nyi = "Hive-partitioning of in-memory buffers"))?;

            #[allow(unused_assignments)]
            let mut owned = None;

            hive_partitions_from_paths(
                paths,
                unified_scan_args.hive_options.hive_start_idx,
                unified_scan_args.hive_options.schema.clone(),
                match file_info.reader_schema.as_ref().unwrap() {
                    Either::Left(v) => {
                        owned = Some(Schema::from_arrow_schema(v.as_ref()));
                        owned.as_ref().unwrap()
                    },
                    Either::Right(v) => v.as_ref(),
                },
                unified_scan_args.hive_options.try_parse_dates,
            )?
        } else {
            None
        };

        if let Some(ref hive_parts) = hive_parts {
            let hive_schema = hive_parts.schema();
            file_info.update_schema_with_hive_schema(hive_schema.clone());
        } else if let Some(hive_schema) = unified_scan_args.hive_options.schema.clone() {
            // We hit this branch if a `hive_schema` was passed to `scan_parquet` but the file
            // list ends up empty after path expansion. In that case we still want to return an
            // empty DataFrame with this schema.
            file_info.update_schema_with_hive_schema(hive_schema);
        }

        if let Some(ref file_path_col) = unified_scan_args.include_file_paths {
            let schema: &mut Schema = Arc::make_mut(&mut file_info.schema);

            if schema.contains(file_path_col) {
                polars_bail!(
                    Duplicate: r#"column name for file paths "{}" conflicts with column name from file"#,
                    file_path_col
                );
            }

            schema.insert_at_index(schema.len(), file_path_col.clone(), DataType::String)?;
        }

        unified_scan_args.projection = if file_info.reader_schema.is_some() {
            maybe_init_projection_excluding_hive(
                file_info.reader_schema.as_ref().unwrap(),
                hive_parts.as_ref().map(|h| h.schema()),
            )
        } else {
            None
        };

        if let Some(row_index) = &unified_scan_args.row_index {
            let schema = Arc::make_mut(&mut file_info.schema);
            *schema = schema
                .new_inserting_at_index(0, row_index.name.clone(), IDX_DTYPE)
                .unwrap();
        }

        let ir = if sources.is_empty() && !matches!(&(*scan_type), FileScanDsl::Anonymous { .. }) {
            IR::DataFrameScan {
                df: Arc::new(DataFrame::empty_with_schema(&file_info.schema)),
                schema: file_info.schema,
                output_schema: None,
            }
        } else {
            let unified_scan_args = unified_scan_args_box;

            IR::Scan {
                sources,
                file_info,
                hive_parts,
                predicate: None,
                predicate_file_skip_applied: None,
                scan_type: Box::new(scan_type_ir),
                output_schema: None,
                unified_scan_args,
            }
        };

        cached_ir.replace(ir);
    }

    Ok(cached_ir.clone().unwrap())
}
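
// NOTE (editorial sketch, not part of the upstream source): `dsl_to_ir` is memoized per scan
// node through the `cached_ir` slot, so path expansion, file-info resolution and hive handling
// run only on the first conversion. Roughly, with hypothetical locals `sources`, `args`,
// `scan` and `ctxt`:
//
//     let slot: Arc<Mutex<Option<IR>>> = Arc::new(Mutex::new(None));
//     let a = dsl_to_ir(sources.clone(), args.clone(), scan.clone(), slot.clone(), ctxt)?;
//     let b = dsl_to_ir(sources, args, scan, slot, ctxt)?; // returns a clone of the cached IR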

pub(super) fn insert_row_index_to_schema(
    schema: &mut Schema,
    name: PlSmallStr,
) -> PolarsResult<()> {
    if schema.contains(&name) {
        polars_bail!(
            Duplicate:
            "cannot add row_index with name '{}': \
            column already exists in file.",
            name,
        )
    }

    schema.insert_at_index(0, name, IDX_DTYPE)?;

    Ok(())
}
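
// Editorial note: the row index column is always prepended at position 0 with `IDX_DTYPE`.
// Illustratively, a file schema `{a: Int64, b: String}` with a row index named "row_nr"
// becomes `{row_nr: IDX_DTYPE, a: Int64, b: String}`, while reusing an existing column name
// (e.g. "a") raises the `Duplicate` error above.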

#[cfg(any(feature = "parquet", feature = "ipc"))]
fn prepare_output_schema(
    mut schema: Schema,
    row_index: Option<&RowIndex>,
) -> PolarsResult<SchemaRef> {
    if let Some(rc) = row_index {
        insert_row_index_to_schema(&mut schema, rc.name.clone())?;
    }
    Ok(Arc::new(schema))
}

#[cfg(any(feature = "json", feature = "csv"))]
fn prepare_schemas(
    mut schema: Schema,
    row_index: Option<&RowIndex>,
) -> PolarsResult<(SchemaRef, SchemaRef)> {
    Ok(if let Some(rc) = row_index {
        let reader_schema = schema.clone();
        insert_row_index_to_schema(&mut schema, rc.name.clone())?;
        (Arc::new(reader_schema), Arc::new(schema))
    } else {
        let schema = Arc::new(schema);
        (schema.clone(), schema)
    })
}
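
// Editorial note: `prepare_output_schema` (parquet/ipc) returns a single schema that already
// contains the optional row index column, whereas `prepare_schemas` (csv/ndjson) returns a pair
// of schemas in which only the second element has the row index inserted; without a row index
// both elements are the same `Arc`.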

#[cfg(feature = "parquet")]
pub(super) fn parquet_file_info(
    first_scan_source: ScanSourceRef<'_>,
    row_index: Option<&RowIndex>,
    #[allow(unused)] cloud_options: Option<&polars_io::cloud::CloudOptions>,
    n_sources: usize,
) -> PolarsResult<(FileInfo, Option<FileMetadataRef>)> {
    use polars_core::error::feature_gated;

    let (reader_schema, num_rows, metadata) = {
        if first_scan_source.is_cloud_url() {
            let first_path = first_scan_source.as_path().unwrap();
            feature_gated!("cloud", {
                get_runtime().block_in_place_on(async {
                    let mut reader =
                        ParquetObjectStore::from_uri(first_path, cloud_options, None).await?;

                    PolarsResult::Ok((
                        reader.schema().await?,
                        reader.num_rows().await?,
                        reader.get_metadata().await?.clone(),
                    ))
                })?
            })
        } else {
            let memslice = first_scan_source.to_memslice()?;
            let mut reader = ParquetReader::new(std::io::Cursor::new(memslice));
            (
                reader.schema()?,
                reader.num_rows()?,
                reader.get_metadata()?.clone(),
            )
        }
    };

    let schema =
        prepare_output_schema(Schema::from_arrow_schema(reader_schema.as_ref()), row_index)?;

    let known_size = if n_sources == 1 { Some(num_rows) } else { None };

    let file_info = FileInfo::new(
        schema,
        Some(Either::Left(reader_schema)),
        (known_size, num_rows * n_sources),
    );

    Ok((file_info, Some(metadata)))
}
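
// Editorial note: the row estimation above is derived from the first file only. With a single
// source the count is exact (`known_size = Some(num_rows)`); with `n_sources > 1` the estimate
// is `num_rows * n_sources`, e.g. a first file of 1_000 rows across 4 sources gives `(None, 4_000)`.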

pub fn max_metadata_scan_cached() -> usize {
    static MAX_SCANS_METADATA_CACHED: LazyLock<usize> = LazyLock::new(|| {
        let value = std::env::var("POLARS_MAX_CACHED_METADATA_SCANS").map_or(8, |v| {
            v.parse::<usize>()
                .expect("invalid `POLARS_MAX_CACHED_METADATA_SCANS` value")
        });
        if value == 0 {
            return usize::MAX;
        }
        value
    });
    *MAX_SCANS_METADATA_CACHED
}
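
// Editorial note: `POLARS_MAX_CACHED_METADATA_SCANS` is read once per process. Unset, it
// defaults to 8; `"32"` yields 32; and `"0"` is treated as "no limit" (`usize::MAX`). Any
// non-integer value panics with the `expect` message above.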

// TODO! return metadata arced
#[cfg(feature = "ipc")]
pub(super) fn ipc_file_info(
    first_scan_source: ScanSourceRef<'_>,
    row_index: Option<&RowIndex>,
    cloud_options: Option<&polars_io::cloud::CloudOptions>,
) -> PolarsResult<(FileInfo, arrow::io::ipc::read::FileMetadata)> {
    use polars_core::error::feature_gated;
    use polars_utils::plpath::PlPathRef;

    let metadata = match first_scan_source {
        ScanSourceRef::Path(path) => match path {
            PlPathRef::Cloud(_) => {
                feature_gated!("cloud", {
                    get_runtime().block_on(async {
                        polars_io::ipc::IpcReaderAsync::from_uri(path, cloud_options)
                            .await?
                            .metadata()
                            .await
                    })?
                })
            },
            PlPathRef::Local(path) => arrow::io::ipc::read::read_file_metadata(
                &mut std::io::BufReader::new(polars_utils::open_file(path)?),
            )?,
        },
        ScanSourceRef::File(file) => {
            arrow::io::ipc::read::read_file_metadata(&mut std::io::BufReader::new(file))?
        },
        ScanSourceRef::Buffer(buff) => {
            arrow::io::ipc::read::read_file_metadata(&mut std::io::Cursor::new(buff))?
        },
    };

    let file_info = FileInfo::new(
        prepare_output_schema(
            Schema::from_arrow_schema(metadata.schema.as_ref()),
            row_index,
        )?,
        Some(Either::Left(Arc::clone(&metadata.schema))),
        (None, usize::MAX),
    );

    Ok((file_info, metadata))
}

#[cfg(feature = "csv")]
pub fn csv_file_info(
    sources: &ScanSources,
    _first_scan_source: ScanSourceRef<'_>,
    row_index: Option<&RowIndex>,
    csv_options: &mut CsvReadOptions,
    cloud_options: Option<&polars_io::cloud::CloudOptions>,
) -> PolarsResult<FileInfo> {
    use std::io::{Read, Seek};

    use polars_core::error::feature_gated;
    use polars_core::{POOL, config};
    use polars_io::csv::read::schema_inference::SchemaInferenceResult;
    use polars_io::utils::get_reader_bytes;
    use rayon::iter::{IntoParallelIterator, ParallelIterator};

    // Holding `_first_scan_source` should guarantee that `sources` is not empty.
    debug_assert!(!sources.is_empty());

    // TODO:
    // * See if we can do better than scanning all files if there is a row limit
    // * See if we can do this without downloading the entire file

    // Prints the error message if `paths` is empty.
    let run_async = sources.is_cloud_url() || (sources.is_paths() && config::force_async());

    let cache_entries = {
        if run_async {
            feature_gated!("cloud", {
                Some(polars_io::file_cache::init_entries_from_uri_list(
                    sources
                        .as_paths()
                        .unwrap()
                        .iter()
                        .map(|path| Arc::from(path.to_str())),
                    cloud_options,
                )?)
            })
        } else {
            None
        }
    };

    let infer_schema_func = |i| {
        let source = sources.at(i);
        let memslice = source.to_memslice_possibly_async(run_async, cache_entries.as_ref(), i)?;
        let owned = &mut vec![];
        let mut reader = std::io::Cursor::new(maybe_decompress_bytes(&memslice, owned)?);
        if reader.read(&mut [0; 4])? < 2 && csv_options.raise_if_empty {
            polars_bail!(NoData: "empty CSV")
        }
        reader.rewind()?;

        let reader_bytes = get_reader_bytes(&mut reader).expect("could not mmap file");

        // This needs a way to estimate bytes/rows.
        SchemaInferenceResult::try_from_reader_bytes_and_options(&reader_bytes, csv_options)
    };

    let merge_func = |a: PolarsResult<SchemaInferenceResult>,
                      b: PolarsResult<SchemaInferenceResult>| {
        match (a, b) {
            (Err(e), _) | (_, Err(e)) => Err(e),
            (Ok(a), Ok(b)) => {
                let merged_schema = if csv_options.schema.is_some() {
                    csv_options.schema.clone().unwrap()
                } else {
                    let schema_a = a.get_inferred_schema();
                    let schema_b = b.get_inferred_schema();

                    match (schema_a.is_empty(), schema_b.is_empty()) {
                        (true, _) => schema_b,
                        (_, true) => schema_a,
                        _ => {
                            let mut s = Arc::unwrap_or_clone(schema_a);
                            s.to_supertype(&schema_b)?;
                            Arc::new(s)
                        },
                    }
                };

                Ok(a.with_inferred_schema(merged_schema))
            },
        }
    };

    let si_results = POOL.join(
        || infer_schema_func(0),
        || {
            (1..sources.len())
                .into_par_iter()
                .map(infer_schema_func)
                .reduce(|| Ok(Default::default()), merge_func)
        },
    );

    let si_result = merge_func(si_results.0, si_results.1)?;

    csv_options.update_with_inference_result(&si_result);

    let mut schema = csv_options
        .schema
        .clone()
        .unwrap_or_else(|| si_result.get_inferred_schema());

    let reader_schema = if let Some(rc) = row_index {
        let reader_schema = schema.clone();
        let mut output_schema = (*reader_schema).clone();
        insert_row_index_to_schema(&mut output_schema, rc.name.clone())?;
        schema = Arc::new(output_schema);
        reader_schema
    } else {
        schema.clone()
    };

    let estimated_n_rows = si_result.get_estimated_n_rows();

    Ok(FileInfo::new(
        schema,
        Some(Either::Right(reader_schema)),
        (None, estimated_n_rows),
    ))
}
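
// Editorial note: when no explicit schema is given, the per-file inference results above are
// merged pairwise via `to_supertype`, so dtypes widen across files; for example a column
// inferred as Int64 in one CSV and Float64 in another should end up as Float64 in the unified
// schema. If one side's inferred schema is empty (e.g. an empty file), the other side's schema
// is taken unchanged.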

#[cfg(feature = "json")]
pub fn ndjson_file_info(
    sources: &ScanSources,
    first_scan_source: ScanSourceRef<'_>,
    row_index: Option<&RowIndex>,
    ndjson_options: &NDJsonReadOptions,
    cloud_options: Option<&polars_io::cloud::CloudOptions>,
) -> PolarsResult<FileInfo> {
    use polars_core::config;
    use polars_core::error::feature_gated;

    let run_async = sources.is_cloud_url() || (sources.is_paths() && config::force_async());

    let cache_entries = {
        if run_async {
            feature_gated!("cloud", {
                Some(polars_io::file_cache::init_entries_from_uri_list(
                    sources
                        .as_paths()
                        .unwrap()
                        .iter()
                        .map(|path| Arc::from(path.to_str())),
                    cloud_options,
                )?)
            })
        } else {
            None
        }
    };

    let owned = &mut vec![];

    let mut schema = if let Some(schema) = ndjson_options.schema.clone() {
        schema
    } else {
        let memslice =
            first_scan_source.to_memslice_possibly_async(run_async, cache_entries.as_ref(), 0)?;
        let mut reader = std::io::Cursor::new(maybe_decompress_bytes(&memslice, owned)?);

        Arc::new(polars_io::ndjson::infer_schema(
            &mut reader,
            ndjson_options.infer_schema_length,
        )?)
    };

    if let Some(overwriting_schema) = &ndjson_options.schema_overwrite {
        overwrite_schema(Arc::make_mut(&mut schema), overwriting_schema)?;
    }

    let mut reader_schema = schema.clone();

    if row_index.is_some() {
        (schema, reader_schema) = prepare_schemas(Arc::unwrap_or_clone(schema), row_index)?
    }

    Ok(FileInfo::new(
        schema,
        Some(Either::Right(reader_schema)),
        (None, usize::MAX),
    ))
}

// Add flags that influence metadata/schema here
#[derive(Eq, Hash, PartialEq)]
enum CachedSourceKey {
    ParquetIpc {
        first_path: PlPath,
        schema_overwrite: Option<SchemaRef>,
    },
    CsvJson {
        paths: Buffer<PlPath>,
        schema: Option<SchemaRef>,
        schema_overwrite: Option<SchemaRef>,
    },
}
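
// Editorial note: the cache key mirrors how each format resolves its schema. Parquet/IPC read
// the schema from the first file only, so `ParquetIpc` keys on the first path plus any schema
// override, while the `CsvJson` key includes the full path list together with both the `schema`
// and `schema_overwrite` options.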

#[derive(Default)]
pub(super) struct SourcesToFileInfo {
    inner: PlHashMap<CachedSourceKey, (FileInfo, FileScanIR)>,
}

impl SourcesToFileInfo {
    fn infer_or_parse(
        &mut self,
        scan_type: FileScanDsl,
        sources: &ScanSources,
        sources_before_expansion: &ScanSources,
        unified_scan_args: &mut UnifiedScanArgs,
    ) -> PolarsResult<(FileInfo, FileScanIR)> {
        let require_first_source = |failed_operation_name: &'static str, hint: &'static str| {
            sources.first_or_empty_expand_err(
                failed_operation_name,
                sources_before_expansion,
                unified_scan_args.glob,
                hint,
            )
        };

        let n_sources = sources.len();
        let cloud_options = unified_scan_args.cloud_options.as_ref();

        Ok(match scan_type {
            #[cfg(feature = "parquet")]
            FileScanDsl::Parquet { options } => {
                if let Some(schema) = &options.schema {
                    // We were passed a schema, so we don't have to call `parquet_file_info`,
                    // but this does mean we don't have `row_estimation` and `first_metadata`.

                    (
                        FileInfo {
                            schema: schema.clone(),
                            reader_schema: Some(either::Either::Left(Arc::new(
                                schema.to_arrow(CompatLevel::newest()),
                            ))),
                            row_estimation: (None, usize::MAX),
                        },
                        FileScanIR::Parquet {
                            options,
                            metadata: None,
                        },
                    )
                } else {
                    (|| {
                        let first_scan_source = require_first_source(
                            "failed to retrieve first file schema (parquet)",
                            "\
                            passing a schema can allow \
                            this scan to succeed with an empty DataFrame.",
                        )?;

                        if verbose() {
                            eprintln!(
                                "sourcing parquet scan file schema from: '{}'",
                                first_scan_source.to_include_path_name()
                            )
                        }

                        let (mut file_info, mut metadata) = scans::parquet_file_info(
                            first_scan_source,
                            unified_scan_args.row_index.as_ref(),
                            cloud_options,
                            n_sources,
                        )?;

                        if let Some((total, deleted)) = unified_scan_args.row_count {
                            let size = (total - deleted) as usize;
                            file_info.row_estimation = (Some(size), size);
                        }

                        if self.inner.len() > max_metadata_scan_cached() {
                            _ = metadata.take();
                        }

                        PolarsResult::Ok((file_info, FileScanIR::Parquet { options, metadata }))
                    })()
                    .map_err(|e| e.context(failed_here!(parquet scan)))?
                }
            },
            #[cfg(feature = "ipc")]
            FileScanDsl::Ipc { options } => (|| {
                let first_scan_source =
                    require_first_source("failed to retrieve first file schema (ipc)", "")?;

                if verbose() {
                    eprintln!(
                        "sourcing ipc scan file schema from: '{}'",
                        first_scan_source.to_include_path_name()
                    )
                }

                let (file_info, md) = scans::ipc_file_info(
                    first_scan_source,
                    unified_scan_args.row_index.as_ref(),
                    cloud_options,
                )?;

                PolarsResult::Ok((
                    file_info,
                    FileScanIR::Ipc {
                        options,
                        metadata: Some(Arc::new(md)),
                    },
                ))
            })()
            .map_err(|e| e.context(failed_here!(ipc scan)))?,
            #[cfg(feature = "csv")]
            FileScanDsl::Csv { mut options } => {
                (|| {
                    // TODO: This is a hack. We conditionally set `allow_missing_columns` to
                    // mimic existing behavior, but this should be taken from a user-provided
                    // parameter instead.
                    if options.schema.is_some() && options.has_header {
                        unified_scan_args.missing_columns_policy = MissingColumnsPolicy::Insert;
                    }

                    let file_info = if let Some(schema) = options.schema.clone() {
                        FileInfo {
                            schema: schema.clone(),
                            reader_schema: Some(either::Either::Right(schema)),
                            row_estimation: (None, usize::MAX),
                        }
                    } else {
                        let first_scan_source =
                            require_first_source("failed to retrieve file schemas (csv)", "")?;

                        if verbose() {
                            eprintln!(
                                "sourcing csv scan file schema from: '{}'",
                                first_scan_source.to_include_path_name()
                            )
                        }

                        scans::csv_file_info(
                            sources,
                            first_scan_source,
                            unified_scan_args.row_index.as_ref(),
                            &mut options,
                            cloud_options,
                        )?
                    };

                    PolarsResult::Ok((file_info, FileScanIR::Csv { options }))
                })()
                .map_err(|e| e.context(failed_here!(csv scan)))?
            },
            #[cfg(feature = "json")]
            FileScanDsl::NDJson { options } => (|| {
                let file_info = if let Some(schema) = options.schema.clone() {
                    FileInfo {
                        schema: schema.clone(),
                        reader_schema: Some(either::Either::Right(schema)),
                        row_estimation: (None, usize::MAX),
                    }
                } else {
                    let first_scan_source =
                        require_first_source("failed to retrieve first file schema (ndjson)", "")?;

                    if verbose() {
                        eprintln!(
                            "sourcing ndjson scan file schema from: '{}'",
                            first_scan_source.to_include_path_name()
                        )
                    }

                    scans::ndjson_file_info(
                        sources,
                        first_scan_source,
                        unified_scan_args.row_index.as_ref(),
                        &options,
                        cloud_options,
                    )?
                };

                PolarsResult::Ok((file_info, FileScanIR::NDJson { options }))
            })()
            .map_err(|e| e.context(failed_here!(ndjson scan)))?,
            #[cfg(feature = "python")]
            FileScanDsl::PythonDataset { dataset_object } => (|| {
                if crate::dsl::DATASET_PROVIDER_VTABLE.get().is_none() {
                    polars_bail!(ComputeError: "DATASET_PROVIDER_VTABLE (python) not initialized")
                }

                let mut schema = dataset_object.schema()?;
                let reader_schema = schema.clone();

                if let Some(row_index) = &unified_scan_args.row_index {
                    insert_row_index_to_schema(Arc::make_mut(&mut schema), row_index.name.clone())?;
                }

                PolarsResult::Ok((
                    FileInfo {
                        schema,
                        reader_schema: Some(either::Either::Right(reader_schema)),
                        row_estimation: (None, usize::MAX),
                    },
                    FileScanIR::PythonDataset {
                        dataset_object,
                        cached_ir: Default::default(),
                    },
                ))
            })()
            .map_err(|e| e.context(failed_here!(python dataset scan)))?,
            #[cfg(feature = "scan_lines")]
            FileScanDsl::Lines { name } => {
                // A lines scan always has a single String column named after `name`, so there is
                // no file schema to infer, but this does mean we don't have `row_estimation`.
                let schema = Arc::new(Schema::from_iter([(name.clone(), DataType::String)]));

                (
                    FileInfo {
                        schema: schema.clone(),
                        reader_schema: Some(either::Either::Right(schema.clone())),
                        row_estimation: (None, usize::MAX),
                    },
                    FileScanIR::Lines { name },
                )
            },
            FileScanDsl::Anonymous {
                file_info,
                options,
                function,
            } => (file_info, FileScanIR::Anonymous { options, function }),
        })
    }

    pub(super) fn get_or_insert(
        &mut self,
        scan_type: &FileScanDsl,
        sources: &ScanSources,
        sources_before_expansion: &ScanSources,
        unified_scan_args: &mut UnifiedScanArgs,
        verbose: bool,
    ) -> PolarsResult<(FileInfo, FileScanIR)> {
        // Only cache non-empty paths. Others are directly parsed.
        let paths = match sources {
            ScanSources::Paths(paths) if !paths.is_empty() => paths.clone(),

            _ => {
                return self.infer_or_parse(
                    scan_type.clone(),
                    sources,
                    sources_before_expansion,
                    unified_scan_args,
                );
            },
        };

        let (k, v): (CachedSourceKey, Option<&(FileInfo, FileScanIR)>) = match scan_type {
            #[cfg(feature = "parquet")]
            FileScanDsl::Parquet { options } => {
                let key = CachedSourceKey::ParquetIpc {
                    first_path: paths[0].clone(),
                    schema_overwrite: options.schema.clone(),
                };

                let v = self.inner.get(&key);
                (key, v)
            },
            #[cfg(feature = "ipc")]
            FileScanDsl::Ipc { options: _ } => {
                let key = CachedSourceKey::ParquetIpc {
                    first_path: paths[0].clone(),
                    schema_overwrite: None,
                };

                let v = self.inner.get(&key);
                (key, v)
            },
            #[cfg(feature = "csv")]
            FileScanDsl::Csv { options } => {
                let key = CachedSourceKey::CsvJson {
                    paths: paths.clone(),
                    schema: options.schema.clone(),
                    schema_overwrite: options.schema_overwrite.clone(),
                };
                let v = self.inner.get(&key);
                (key, v)
            },
            #[cfg(feature = "json")]
            FileScanDsl::NDJson { options } => {
                let key = CachedSourceKey::CsvJson {
                    paths: paths.clone(),
                    schema: options.schema.clone(),
                    schema_overwrite: options.schema_overwrite.clone(),
                };
                let v = self.inner.get(&key);
                (key, v)
            },
            _ => {
                return self.infer_or_parse(
                    scan_type.clone(),
                    sources,
                    sources_before_expansion,
                    unified_scan_args,
                );
            },
        };

        if let Some(out) = v {
            if verbose {
                eprintln!("FILE_INFO CACHE HIT")
            }
            Ok(out.clone())
        } else {
            let v = self.infer_or_parse(
                scan_type.clone(),
                sources,
                sources_before_expansion,
                unified_scan_args,
            )?;
            self.inner.insert(k, v.clone());
            Ok(v)
        }
    }
}
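
// Editorial sketch (not part of the upstream source): `get_or_insert` lets repeated scans over
// the same path list reuse one `(FileInfo, FileScanIR)` pair. Roughly, with a hypothetical
// `cache: SourcesToFileInfo` and matching arguments:
//
//     let first = cache.get_or_insert(&scan_type, &sources, &sources, &mut args, true)?; // infers and caches
//     let again = cache.get_or_insert(&scan_type, &sources, &sources, &mut args, true)?; // prints "FILE_INFO CACHE HIT"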