Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/conversion/dsl_to_ir/scans.rs
8509 views
1
use std::io::{BufReader, Cursor};
2
use std::sync::{LazyLock, RwLock};
3
4
use either::Either;
5
use polars_buffer::Buffer;
6
use polars_io::csv::read::streaming::read_until_start_and_infer_schema;
7
use polars_io::prelude::*;
8
use polars_io::utils::byte_source::{ByteSource, DynByteSourceBuilder};
9
use polars_io::utils::compression::{ByteSourceReader, CompressedReader, SupportedCompression};
10
use polars_io::utils::stream_buf_reader::ReaderSource;
11
use polars_io::{RowIndex, pl_async};
12
13
use super::*;
14
15
/// Converts a DSL file scan into its IR form and stores the result in `cached_ir`.
///
/// The conversion is done at most once: if `cached_ir` already holds an IR, this is
/// a no-op. Otherwise this expands the source paths, resolves the file schema /
/// scan-type IR via `cache_file_info`, applies hive-partition, `include_file_paths`
/// and `row_index` schema adjustments, and builds either an `IR::Scan` or — for an
/// empty expanded file list — an empty `IR::DataFrameScan`.
///
/// # Errors
/// Fails on path expansion, schema resolution, or conflicting options (e.g. a hive
/// schema given while hive partitioning is explicitly disabled).
pub(super) async fn dsl_to_ir(
    sources: ScanSources,
    mut unified_scan_args_box: Box<UnifiedScanArgs>,
    scan_type: Box<FileScanDsl>,
    cached_ir: Arc<Mutex<Option<IR>>>,
    cache_file_info: SourcesToFileInfo,
    verbose: bool,
) -> PolarsResult<()> {
    // Note that the first metadata can still end up being `None` later if the files were
    // filtered from predicate pushdown.
    // Check and drop the lock in its own scope
    // NOTE(review): the lock is released between this check and the final `replace`,
    // so two concurrent callers could both perform the conversion — presumably callers
    // serialize on each node, or the duplicated work is benign; confirm.
    let is_not_cached = {
        let cached_ir_guard = cached_ir.lock().unwrap();
        cached_ir_guard.is_none()
    };

    if is_not_cached {
        let unified_scan_args = unified_scan_args_box.as_mut();

        // Reconcile an explicitly-passed hive schema with the hive_partitioning flag.
        if let Some(hive_schema) = unified_scan_args.hive_options.schema.as_deref() {
            match unified_scan_args.hive_options.enabled {
                // Enable hive_partitioning if it is unspecified but a non-empty hive_schema given
                None if !hive_schema.is_empty() => {
                    unified_scan_args.hive_options.enabled = Some(true)
                },
                // hive_partitioning was explicitly disabled
                Some(false) => polars_bail!(
                    ComputeError:
                    "a hive schema was given but hive_partitioning was disabled"
                ),
                Some(true) | None => {},
            }
        }

        // Kept around for error messages that want to show the pre-glob-expansion input.
        let sources_before_expansion = &sources;

        // Expand globs/directories. Parquet and IPC use the hive-aware expansion,
        // which may also update `hive_start_idx` in `unified_scan_args`.
        let sources = match &*scan_type {
            #[cfg(feature = "parquet")]
            FileScanDsl::Parquet { .. } => {
                sources
                    .expand_paths_with_hive_update(unified_scan_args)
                    .await?
            },
            #[cfg(feature = "ipc")]
            FileScanDsl::Ipc { .. } => {
                sources
                    .expand_paths_with_hive_update(unified_scan_args)
                    .await?
            },
            #[cfg(feature = "csv")]
            FileScanDsl::Csv { .. } => sources.expand_paths(unified_scan_args).await?,
            #[cfg(feature = "json")]
            FileScanDsl::NDJson { .. } => sources.expand_paths(unified_scan_args).await?,
            #[cfg(feature = "python")]
            FileScanDsl::PythonDataset { .. } => {
                // There are a lot of places that short-circuit if the paths is empty,
                // so we just give a dummy path here.
                ScanSources::Paths(Buffer::from_iter([PlRefPath::new("PL_PY_DSET")]))
            },
            #[cfg(feature = "scan_lines")]
            FileScanDsl::Lines { .. } => sources.expand_paths(unified_scan_args).await?,
            FileScanDsl::Anonymous { .. } => sources.clone(),
        };

        // For cloud we must deduplicate files. Serialization/deserialization leads to Arc's
        // losing their sharing.
        let (mut file_info, scan_type_ir) = {
            cache_file_info
                .get_or_insert(
                    &scan_type,
                    &sources,
                    sources_before_expansion,
                    unified_scan_args,
                    verbose,
                )
                .await?
        };

        if unified_scan_args.hive_options.enabled.is_none() {
            // We expect this to be `Some(_)` after this point. If it hasn't been auto-enabled
            // we explicitly set it to disabled.
            unified_scan_args.hive_options.enabled = Some(false);
        }

        // Derive hive partition columns from the expanded paths, if enabled.
        let hive_parts = if unified_scan_args.hive_options.enabled.unwrap()
            && let Some(file_schema) = file_info.reader_schema.as_ref()
        {
            let paths = sources
                .as_paths()
                .ok_or_else(|| polars_err!(nyi = "Hive-partitioning of in-memory buffers"))?;

            // Holds a converted owned Schema when the reader schema is an arrow schema,
            // so we can pass a `&Schema` either way.
            #[allow(unused_assignments)]
            let mut owned = None;

            hive_partitions_from_paths(
                paths,
                unified_scan_args.hive_options.hive_start_idx,
                unified_scan_args.hive_options.schema.clone(),
                match file_schema {
                    Either::Left(v) => {
                        owned = Some(Schema::from_arrow_schema(v.as_ref()));
                        owned.as_ref().unwrap()
                    },
                    Either::Right(v) => v.as_ref(),
                },
                unified_scan_args.hive_options.try_parse_dates,
            )?
        } else {
            None
        };

        if let Some(ref hive_parts) = hive_parts {
            let hive_schema = hive_parts.schema();
            file_info.update_schema_with_hive_schema(hive_schema.clone());
        } else if let Some(hive_schema) = unified_scan_args.hive_options.schema.clone() {
            // We hit here if we are passed the `hive_schema` to `scan_parquet` but end up with an empty file
            // list during path expansion. In this case we still want to return an empty DataFrame with this
            // schema.
            file_info.update_schema_with_hive_schema(hive_schema);
        }

        // Append the file-path column to the output schema when requested.
        if let Some(ref file_path_col) = unified_scan_args.include_file_paths {
            let schema: &mut Schema = Arc::make_mut(&mut file_info.schema);

            if schema.contains(file_path_col) {
                polars_bail!(
                    Duplicate: r#"column name for file paths "{}" conflicts with column name from file"#,
                    file_path_col
                );
            }

            schema.insert_at_index(schema.len(), file_path_col.clone(), DataType::String)?;
        }

        // Initial projection: all reader columns except hive-partition columns.
        unified_scan_args.projection = if let Some(file_schema) = file_info.reader_schema.as_ref() {
            maybe_init_projection_excluding_hive(
                file_schema,
                hive_parts.as_ref().map(|h| h.schema()),
            )
        } else {
            None
        };

        // Prepend the row-index column to the output schema.
        // NOTE(review): some per-format helpers (e.g. `csv_file_info`) already insert this
        // column into the schema they return — presumably `new_inserting_at_index` tolerates
        // or replaces an existing entry; confirm.
        if let Some(row_index) = &unified_scan_args.row_index {
            let schema = Arc::make_mut(&mut file_info.schema);
            *schema = schema
                .new_inserting_at_index(0, row_index.name.clone(), IDX_DTYPE)
                .unwrap();
        }

        // An empty (non-anonymous) file list degenerates to scanning an empty DataFrame
        // with the resolved schema.
        let ir = if sources.is_empty() && !matches!(&(*scan_type), FileScanDsl::Anonymous { .. }) {
            IR::DataFrameScan {
                df: Arc::new(DataFrame::empty_with_schema(&file_info.schema)),
                schema: file_info.schema,
                output_schema: None,
            }
        } else {
            let unified_scan_args = unified_scan_args_box;

            IR::Scan {
                sources,
                file_info,
                hive_parts,
                predicate: None,
                predicate_file_skip_applied: None,
                scan_type: Box::new(scan_type_ir),
                output_schema: None,
                unified_scan_args,
            }
        };

        let mut cached_ir = cached_ir.lock().unwrap();
        cached_ir.replace(ir);
    }

    Ok(())
}
192
193
pub(super) fn insert_row_index_to_schema(
194
schema: &mut Schema,
195
name: PlSmallStr,
196
) -> PolarsResult<()> {
197
if schema.contains(&name) {
198
polars_bail!(
199
Duplicate:
200
"cannot add row_index with name '{}': \
201
column already exists in file.",
202
name,
203
)
204
}
205
206
schema.insert_at_index(0, name, IDX_DTYPE)?;
207
208
Ok(())
209
}
210
211
/// Produces the scan's output schema: `schema`, with the row-index column
/// prepended when one was requested.
#[cfg(any(feature = "parquet", feature = "ipc"))]
fn prepare_output_schema(
    mut schema: Schema,
    row_index: Option<&RowIndex>,
) -> PolarsResult<SchemaRef> {
    match row_index {
        None => {},
        Some(ri) => insert_row_index_to_schema(&mut schema, ri.name.clone())?,
    }
    Ok(Arc::new(schema))
}
221
222
/// Builds the `(reader schema, output schema)` pair for CSV/NDJSON scans:
/// without a row index both elements are the same `Arc`; with one, the second
/// element additionally carries the row-index column at position 0.
///
/// NOTE(review): the tuple order is (schema WITHOUT row index, schema WITH row
/// index), but the only caller in this file (`ndjson_file_info`) destructures it
/// as `(schema, reader_schema)` — i.e. the output schema receives the version
/// WITHOUT the row-index column, which is the opposite of what `csv_file_info`
/// builds by hand. Confirm this ordering is intended.
#[cfg(any(feature = "json", feature = "csv"))]
fn prepare_schemas(
    mut schema: Schema,
    row_index: Option<&RowIndex>,
) -> PolarsResult<(SchemaRef, SchemaRef)> {
    Ok(if let Some(rc) = row_index {
        // Snapshot the schema before inserting the row-index column.
        let reader_schema = schema.clone();
        insert_row_index_to_schema(&mut schema, rc.name.clone())?;
        (Arc::new(reader_schema), Arc::new(schema))
    } else {
        // No row index: both schemas are identical, share one allocation.
        let schema = Arc::new(schema);
        (schema.clone(), schema)
    })
}
236
237
/// Resolves `FileInfo` (schema + row estimation) and the parquet metadata for a
/// scan, by reading the footer of the first source only.
///
/// The row estimation assumes every source has the same row count as the first
/// one; the count is exact (`known_size = Some`) only for a single source.
///
/// # Errors
/// Propagates I/O and parquet metadata-parsing errors from the first source.
#[cfg(feature = "parquet")]
pub(super) async fn parquet_file_info(
    first_scan_source: ScanSourceRef<'_>,
    row_index: Option<&RowIndex>,
    #[allow(unused)] cloud_options: Option<&polars_io::cloud::CloudOptions>,
    n_sources: usize,
) -> PolarsResult<(FileInfo, Option<FileMetadataRef>)> {
    use polars_core::error::feature_gated;

    let (reader_schema, num_rows, metadata) = {
        if first_scan_source.is_cloud_url() {
            let first_path = first_scan_source.as_path().unwrap();
            feature_gated!("cloud", {
                let mut reader =
                    ParquetObjectStore::from_uri(first_path.clone(), cloud_options, None).await?;

                (
                    reader.schema().await?,
                    reader.num_rows().await?,
                    reader.get_metadata().await?.clone(),
                )
            })
        } else {
            // Local / in-memory source: read the footer from a memory slice.
            let memslice = first_scan_source.to_memslice()?;
            let mut reader = ParquetReader::new(std::io::Cursor::new(memslice));
            (
                reader.schema()?,
                reader.num_rows()?,
                reader.get_metadata()?.clone(),
            )
        }
    };

    let schema =
        prepare_output_schema(Schema::from_arrow_schema(reader_schema.as_ref()), row_index)?;

    // Exact only when there is a single source; otherwise just an upper-bound guess.
    let known_size = if n_sources == 1 { Some(num_rows) } else { None };

    let file_info = FileInfo::new(
        schema,
        Some(Either::Left(reader_schema)),
        // Saturate instead of `*`: a huge per-file row count times many sources must
        // not overflow (panic in debug builds) — this is only an estimate anyway.
        (known_size, num_rows.saturating_mul(n_sources)),
    );

    Ok((file_info, Some(metadata)))
}
283
284
/// Maximum number of scans whose metadata is kept in the file-info cache.
///
/// Read once from the `POLARS_MAX_CACHED_METADATA_SCANS` environment variable
/// (default 8); the value `0` means "no limit".
///
/// # Panics
/// Panics if the environment variable is set but not a valid `usize`.
pub fn max_metadata_scan_cached() -> usize {
    static MAX_SCANS_METADATA_CACHED: LazyLock<usize> = LazyLock::new(|| {
        let configured = std::env::var("POLARS_MAX_CACHED_METADATA_SCANS").map_or(8, |v| {
            v.parse::<usize>()
                .expect("invalid `POLARS_MAX_CACHED_METADATA_SCANS` value")
        });
        // 0 is a sentinel for "unlimited".
        match configured {
            0 => usize::MAX,
            n => n,
        }
    });
    *MAX_SCANS_METADATA_CACHED
}
297
298
// TODO! return metadata arced
/// Resolves `FileInfo` and IPC file metadata from the first scan source.
///
/// Cloud paths go through the async IPC reader (feature-gated on "cloud");
/// local paths, open files and in-memory buffers read the footer directly.
#[cfg(feature = "ipc")]
pub(super) async fn ipc_file_info(
    first_scan_source: ScanSourceRef<'_>,
    row_index: Option<&RowIndex>,
    cloud_options: Option<&polars_io::cloud::CloudOptions>,
) -> PolarsResult<(FileInfo, arrow::io::ipc::read::FileMetadata)> {
    use arrow::io::ipc::read::read_file_metadata;
    use polars_core::error::feature_gated;

    let metadata = match first_scan_source {
        // Remote object: fetch the metadata through the async reader.
        ScanSourceRef::Path(path) if path.has_scheme() => feature_gated!("cloud", {
            polars_io::ipc::IpcReaderAsync::from_uri(path.clone(), cloud_options)
                .await?
                .metadata()
                .await?
        }),
        // Local file on disk.
        ScanSourceRef::Path(path) => read_file_metadata(&mut std::io::BufReader::new(
            polars_utils::open_file(path.as_std_path())?,
        ))?,
        // Already-opened file handle.
        ScanSourceRef::File(file) => {
            read_file_metadata(&mut std::io::BufReader::new(file))?
        },
        // In-memory buffer.
        ScanSourceRef::Buffer(buff) => {
            read_file_metadata(&mut std::io::Cursor::new(buff))?
        },
    };

    // Output schema = arrow schema converted to polars, plus optional row index.
    let output_schema = prepare_output_schema(
        Schema::from_arrow_schema(metadata.schema.as_ref()),
        row_index,
    )?;
    let file_info = FileInfo::new(
        output_schema,
        Some(Either::Left(Arc::clone(&metadata.schema))),
        // Row count is unknown up-front for IPC.
        (None, usize::MAX),
    );

    Ok((file_info, metadata))
}
341
342
/// Infers the schema and an estimated row count for a CSV scan.
///
/// Every source is sampled (in parallel via rayon) and the per-file schemas are
/// merged into a supertype. For async/cloud sources, only a progressively
/// growing prefix of each object is downloaded — just enough to cover
/// `infer_schema_length` rows — rather than the whole file.
///
/// # Errors
/// Propagates download/decompression/parse errors; also fails if the per-file
/// schemas cannot be merged into a supertype.
#[cfg(feature = "csv")]
pub async fn csv_file_info(
    sources: &ScanSources,
    _first_scan_source: ScanSourceRef<'_>,
    row_index: Option<&RowIndex>,
    csv_options: &mut CsvReadOptions,
    cloud_options: Option<&polars_io::cloud::CloudOptions>,
) -> PolarsResult<FileInfo> {
    use polars_core::POOL;
    use polars_core::error::feature_gated;
    use rayon::iter::{IntoParallelIterator, ParallelIterator};

    // Holding _first_scan_source should guarantee sources is not empty.
    debug_assert!(!sources.is_empty());

    // TODO:
    // * See if we can do better than scanning all files if there is a row limit

    // Go through the async runtime for cloud URLs, or when async I/O is forced via config.
    let run_async =
        sources.is_cloud_url() || (sources.is_paths() && polars_config::config().force_async());

    // File-cache entries for async path-based sources (one per path), used by the
    // full-download fallback below.
    let cache_entries = {
        if run_async {
            let sources = sources.clone();
            assert!(sources.as_paths().is_some());

            feature_gated!("cloud", {
                Some(
                    polars_io::file_cache::init_entries_from_uri_list(
                        (0..sources.len())
                            .map(move |i| sources.as_paths().unwrap().get(i).unwrap().clone()),
                        cloud_options,
                    )
                    .await?,
                )
            })
        } else {
            None
        }
    };

    let infer_schema_length = csv_options.infer_schema_length;
    // Infers (schema, estimated row count) for source `i`.
    let infer_schema_func = |i| {
        const ASSUMED_COMPRESSION_RATIO: usize = 4;
        let source = sources.at(i);

        let (mem_slice_raw, file_size, decompressed_slice_size_hint) = if run_async
            && let Some(infer_schema_length) = infer_schema_length
        {
            // Only download what we need for schema inference.
            // To do so, we use an iterative two-way progressive trial-and-error download strategy
            // until we either have enough rows, or reached EOF. In every iteration, we either
            // increase fetch_size (download progressively more), or try_read_size (try and
            // decompress more of what we have, in the case of compressed).
            const INITIAL_FETCH: usize = 64 * 1024;

            // Collect metadata.
            let byte_source = pl_async::get_runtime().block_on(async move {
                source
                    .to_dyn_byte_source(&DynByteSourceBuilder::ObjectStore, cloud_options, None)
                    .await
            })?;
            let byte_source = Arc::new(byte_source);

            let file_size = {
                let byte_source = byte_source.clone();
                pl_async::get_runtime().block_on(async move { byte_source.get_size().await })?
            };

            // Sniff compression from the first 4 magic bytes (if the file is that large).
            let compression = if file_size >= 4 {
                let byte_source = byte_source.clone();
                let magic_range = 0..4;
                let magic_bytes = pl_async::get_runtime()
                    .block_on(async move { byte_source.get_range(magic_range).await })?;
                SupportedCompression::check(&magic_bytes)
            } else {
                None
            };

            let mut offset = 0;
            let mut fetch_size = INITIAL_FETCH;
            let mut try_read_size = INITIAL_FETCH * ASSUMED_COMPRESSION_RATIO;
            let mut truncated_bytes: Vec<u8> = Vec::with_capacity(INITIAL_FETCH);
            let mut reached_eof = false;

            // Collect enough rows to satisfy infer_schema_length.
            let (mem_slice_raw, decompressed_slice_size_hint) = loop {
                let range = offset..std::cmp::min(file_size, offset + fetch_size);

                if range.is_empty() {
                    reached_eof = true
                } else {
                    // Download the next chunk and append it to the buffered prefix.
                    let byte_source = byte_source.clone();
                    let fetch_bytes = pl_async::get_runtime()
                        .block_on(async move { byte_source.get_range(range).await })?;
                    offset += fetch_bytes.len();
                    truncated_bytes.extend_from_slice(fetch_bytes.as_ref());
                }

                let decompressed_size_hint =
                    Some(offset * compression.map_or(1, |_| ASSUMED_COMPRESSION_RATIO));
                let mut reader = ByteSourceReader::<ReaderSource>::from_memory(
                    Buffer::from_owner(truncated_bytes.clone()),
                )?;

                // How many (decompressed) bytes to attempt to read this iteration.
                let read_size = if compression.is_none() {
                    offset
                } else if reached_eof {
                    usize::MAX
                } else {
                    try_read_size
                };

                // Note: if `count_rows_from_reader_par` and therefore also `read_next_slice` were to
                // handle truncated compressed bytes gracefully, we could avoid the following EoF check
                // and remove `try_read_size` from the loop.
                let (slice, bytes_read) =
                    match reader.read_next_slice(&Buffer::new(), read_size, decompressed_size_hint)
                    {
                        Ok(v) => v,
                        // We assume that unexpected EOF indicates that we lack sufficient data.
                        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                            fetch_size *= 2;
                            continue;
                        },
                        Err(e) => Err(e)?,
                    };

                // Count complete rows in the decompressed prefix.
                let row_count = polars_io::csv::read::count_rows_from_slice_par(
                    slice.clone(),
                    csv_options.parse_options.quote_char,
                    csv_options.parse_options.comment_prefix.as_ref(),
                    csv_options.parse_options.eol_char,
                    csv_options.has_header,
                    csv_options.skip_lines,
                    csv_options.skip_rows,
                    csv_options.skip_rows_after_header,
                    csv_options.raise_if_empty,
                )?;

                if row_count < infer_schema_length && !reached_eof {
                    if compression.is_some() && bytes_read == read_size {
                        // Decompressor had more to give — read_size too small
                        try_read_size *= 2;
                    } else {
                        // Decompressor exhausted input — need more compressed bytes
                        // Or, no compression
                        fetch_size *= 2;
                    }
                    continue;
                }

                break (Buffer::from_owner(truncated_bytes), Some(bytes_read));
            };
            (mem_slice_raw, file_size, decompressed_slice_size_hint)
        } else {
            // Non-async (or unbounded inference): load the whole source into memory.
            let mem_slice_raw =
                source.to_buffer_possibly_async(run_async, cache_entries.as_ref(), i)?;
            let file_size = mem_slice_raw.len();
            let compression = SupportedCompression::check(&mem_slice_raw);
            let decompressed_slice_size_hint = Some(match compression {
                None => file_size,
                Some(_) => file_size * ASSUMED_COMPRESSION_RATIO,
            });
            (mem_slice_raw, file_size, decompressed_slice_size_hint)
        };

        let mut reader = ByteSourceReader::from_memory(mem_slice_raw)?;
        let compression = reader.compression();

        // Length of the first data row (+1 for the newline), captured by the callback;
        // used below as the divisor for the row-count estimate.
        let mut first_row_len = 0;
        let (schema, _) = read_until_start_and_infer_schema(
            csv_options,
            None,
            decompressed_slice_size_hint,
            Some(Box::new(|line| {
                first_row_len = line.len() + 1;
            })),
            &mut reader,
        )?;

        let decompressed_file_size_hint = match compression {
            None => file_size,
            Some(_) => file_size * ASSUMED_COMPRESSION_RATIO,
        };

        // TODO. We can do (much) better by collect statistics as part of row count and/or schema
        // inference, including observed average row_length and compression ratio.
        let estimated_rows =
            (decompressed_file_size_hint as f64 / first_row_len as f64).round() as usize;

        Ok((schema, estimated_rows))
    };

    // Merge two per-file inference results: empty schemas defer to the other side,
    // otherwise take the supertype and sum the row estimates.
    let merge_func =
        |a: PolarsResult<(Schema, usize)>, b: PolarsResult<(Schema, usize)>| match (a, b) {
            (Err(e), _) | (_, Err(e)) => Err(e),
            (Ok((mut schema_a, row_estimate_a)), Ok((schema_b, row_estimate_b))) => {
                match (schema_a.is_empty(), schema_b.is_empty()) {
                    (true, _) => Ok((schema_b, row_estimate_b)),
                    (_, true) => Ok((schema_a, row_estimate_a)),
                    _ => {
                        schema_a.to_supertype(&schema_b)?;
                        Ok((schema_a, row_estimate_a.saturating_add(row_estimate_b)))
                    },
                }
            },
        };

    assert!(
        csv_options.schema.is_none(),
        "DSL to IR schema inference should not run if user provides a schema."
    );
    // Run inference in parallel with a specific merge order.
    // TODO: flatten to single level once Schema::to_supertype is commutative.
    let si_results = POOL.join(
        || infer_schema_func(0),
        || {
            (1..sources.len())
                .into_par_iter()
                .map(infer_schema_func)
                .reduce(|| Ok(Default::default()), merge_func)
        },
    );

    let (inferred_schema, estimated_n_rows) = merge_func(si_results.0, si_results.1)?;
    let inferred_schema_ref = Arc::new(inferred_schema);

    // Output schema carries the row-index column; the reader schema stays as inferred.
    let (schema, reader_schema) = if let Some(rc) = row_index {
        let mut output_schema = (*inferred_schema_ref).clone();
        insert_row_index_to_schema(&mut output_schema, rc.name.clone())?;

        (Arc::new(output_schema), inferred_schema_ref)
    } else {
        (inferred_schema_ref.clone(), inferred_schema_ref)
    };

    Ok(FileInfo::new(
        schema,
        Some(Either::Right(reader_schema)),
        (None, estimated_n_rows),
    ))
}
586
587
/// Infers the schema for an NDJSON scan (or uses the user-provided one).
///
/// Unlike `csv_file_info`, only the FIRST source is probed. For async/cloud
/// sources with a bounded `infer_schema_length`, a progressively growing prefix
/// of the object is downloaded instead of the whole file.
///
/// # Errors
/// Propagates download/decompression/parse errors and schema-overwrite failures.
#[cfg(feature = "json")]
pub async fn ndjson_file_info(
    sources: &ScanSources,
    first_scan_source: ScanSourceRef<'_>,
    row_index: Option<&RowIndex>,
    ndjson_options: &NDJsonReadOptions,
    cloud_options: Option<&polars_io::cloud::CloudOptions>,
) -> PolarsResult<FileInfo> {
    use polars_core::error::feature_gated;

    // Go through the async runtime for cloud URLs, or when async I/O is forced via config.
    let run_async =
        sources.is_cloud_url() || (sources.is_paths() && polars_config::config().force_async());

    // File-cache entries for async path-based sources, used by the full-download fallback.
    let cache_entries = {
        if run_async {
            let sources = sources.clone();
            assert!(sources.as_paths().is_some());

            feature_gated!("cloud", {
                Some(
                    polars_io::file_cache::init_entries_from_uri_list(
                        (0..sources.len())
                            .map(move |i| sources.as_paths().unwrap().get(i).unwrap().clone()),
                        cloud_options,
                    )
                    .await?,
                )
            })
        } else {
            None
        }
    };

    let infer_schema_length = ndjson_options.infer_schema_length;

    let mut schema = if let Some(schema) = ndjson_options.schema.clone() {
        // User provided the schema: nothing to infer.
        schema
    } else if run_async && let Some(infer_schema_length) = infer_schema_length {
        // Only download what we need for schema inference.
        // To do so, we use an iterative two-way progressive trial-and-error download strategy
        // until we either have enough rows, or reached EOF. In every iteration, we either
        // increase fetch_size (download progressively more), or try_read_size (try and
        // decompress more of what we have, in the case of compressed).
        use polars_io::utils::compression::{ByteSourceReader, SupportedCompression};
        use polars_io::utils::stream_buf_reader::ReaderSource;

        const INITIAL_FETCH: usize = 64 * 1024;
        const ASSUMED_COMPRESSION_RATIO: usize = 4;

        let first_scan_source = first_scan_source.into_owned()?.clone();
        let cloud_options = cloud_options.cloned();
        // TODO. Support IOMetrics collection during planning phase.
        let byte_source = pl_async::get_runtime()
            .spawn(async move {
                first_scan_source
                    .as_scan_source_ref()
                    .to_dyn_byte_source(
                        &DynByteSourceBuilder::ObjectStore,
                        cloud_options.as_ref(),
                        None,
                    )
                    .await
            })
            .await
            .unwrap()?;
        let byte_source = Arc::new(byte_source);

        let file_size = {
            let byte_source = byte_source.clone();
            pl_async::get_runtime()
                .spawn(async move { byte_source.get_size().await })
                .await
                .unwrap()?
        };

        let mut offset = 0;
        let mut fetch_size = INITIAL_FETCH;
        let mut try_read_size = INITIAL_FETCH * ASSUMED_COMPRESSION_RATIO;
        let mut truncated_bytes: Vec<u8> = Vec::with_capacity(INITIAL_FETCH);
        let mut reached_eof = false;

        // Collect enough rows to satisfy infer_schema_length
        let memslice = loop {
            let range = offset..std::cmp::min(file_size, offset + fetch_size);

            if range.is_empty() {
                reached_eof = true
            } else {
                // Download the next chunk and append it to the buffered prefix.
                let byte_source = byte_source.clone();
                let fetch_bytes = pl_async::get_runtime()
                    .spawn(async move { byte_source.get_range(range).await })
                    .await
                    .unwrap()?;
                offset += fetch_bytes.len();
                truncated_bytes.extend_from_slice(fetch_bytes.as_ref());
            }

            let compression = SupportedCompression::check(&truncated_bytes);
            let mut reader = ByteSourceReader::<ReaderSource>::from_memory(Buffer::from_owner(
                truncated_bytes.clone(),
            ))?;
            // How many (decompressed) bytes to attempt to read this iteration.
            let read_size = if compression.is_none() {
                offset
            } else if reached_eof {
                usize::MAX
            } else {
                try_read_size
            };

            let uncompressed_size_hint = Some(
                offset
                    * if compression.is_none() {
                        1
                    } else {
                        ASSUMED_COMPRESSION_RATIO
                    },
            );

            let (slice, bytes_read) =
                match reader.read_next_slice(&Buffer::new(), read_size, uncompressed_size_hint) {
                    Ok(v) => v,
                    // We assume that unexpected EOF indicates that we lack sufficient data.
                    Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                        fetch_size *= 2;
                        continue;
                    },
                    Err(e) => Err(e)?,
                };

            if polars_io::ndjson::count_rows(&slice) < infer_schema_length.into() && !reached_eof {
                if compression.is_some() && bytes_read == read_size {
                    // Decompressor had more to give — read_size too small
                    try_read_size *= 2;
                } else {
                    // Decompressor exhausted input — need more compressed bytes
                    // Or, no compression
                    fetch_size *= 2;
                }
                continue;
            }

            break slice;
        };

        let mut buf_reader = BufReader::new(Cursor::new(memslice));
        Arc::new(polars_io::ndjson::infer_schema(
            &mut buf_reader,
            ndjson_options.infer_schema_length,
        )?)
    } else {
        // Download the entire object.
        // Warning - this is potentially memory-expensive in the case of a cloud source, and goes
        // against the design goal of a streaming reader. This can be optimized.
        let mem_slice =
            first_scan_source.to_buffer_possibly_async(run_async, cache_entries.as_ref(), 0)?;
        let mut reader = BufReader::new(CompressedReader::try_new(mem_slice)?);

        Arc::new(polars_io::ndjson::infer_schema(
            &mut reader,
            ndjson_options.infer_schema_length,
        )?)
    };

    if let Some(overwriting_schema) = &ndjson_options.schema_overwrite {
        overwrite_schema(Arc::make_mut(&mut schema), overwriting_schema)?;
    }

    let mut reader_schema = schema.clone();

    // NOTE(review): `prepare_schemas` returns (schema WITHOUT row index, schema WITH
    // row index), so this destructuring assigns the row-index column to `reader_schema`
    // rather than to the output `schema` — the opposite of what `csv_file_info` does.
    // Confirm this is intended.
    if row_index.is_some() {
        (schema, reader_schema) = prepare_schemas(Arc::unwrap_or_clone(schema), row_index)?
    }

    Ok(FileInfo::new(
        schema,
        Some(Either::Right(reader_schema)),
        (None, usize::MAX),
    ))
}
766
767
// Add flags that influence metadata/schema here
/// Cache key for `SourcesToFileInfo`.
#[derive(Eq, Hash, PartialEq)]
enum CachedSourceKey {
    /// Parquet/IPC: schema and metadata are sourced from the first file only,
    /// so the first path (plus any user schema) identifies the entry.
    ParquetIpc {
        first_path: PlRefPath,
        schema_overwrite: Option<SchemaRef>,
    },
    /// CSV/NDJSON: schema inference may involve every file, so the full path
    /// list participates in the key, along with the user schema/overwrite.
    CsvJson {
        paths: Buffer<PlRefPath>,
        schema: Option<SchemaRef>,
        schema_overwrite: Option<SchemaRef>,
    },
}
780
781
/// Shared cache mapping scan sources to their resolved `FileInfo` and
/// `FileScanIR`, so repeated scans of the same files skip schema/metadata work.
/// Clones share the same underlying map via `Arc`.
#[derive(Default, Clone)]
pub(super) struct SourcesToFileInfo {
    // RwLock: concurrent readers on cache hits, exclusive writer on insert.
    inner: Arc<RwLock<PlHashMap<CachedSourceKey, (FileInfo, FileScanIR)>>>,
}
785
786
impl SourcesToFileInfo {
    /// Resolves `(FileInfo, FileScanIR)` for a scan without consulting the cache:
    /// either trusts a user-provided schema or probes the file(s) via the
    /// per-format `*_file_info` helpers. Called by `get_or_insert` on cache miss
    /// and for uncacheable sources.
    ///
    /// # Errors
    /// Fails when the (expanded) source list is empty where a first file is
    /// required, or when the per-format schema/metadata resolution fails; errors
    /// are annotated with the scan kind via `failed_here!`.
    async fn infer_or_parse(
        &self,
        scan_type: FileScanDsl,
        sources: &ScanSources,
        sources_before_expansion: &ScanSources,
        unified_scan_args: &mut UnifiedScanArgs,
    ) -> PolarsResult<(FileInfo, FileScanIR)> {
        // Returns the first source, or a descriptive error mentioning the
        // pre-expansion sources when the expanded list is empty.
        let require_first_source = |failed_operation_name: &'static str, hint: &'static str| {
            sources.first_or_empty_expand_err(
                failed_operation_name,
                sources_before_expansion,
                unified_scan_args.glob,
                hint,
            )
        };

        let n_sources = sources.len();
        let cloud_options = unified_scan_args.cloud_options.as_ref();

        Ok(match scan_type {
            #[cfg(feature = "parquet")]
            FileScanDsl::Parquet { options } => {
                if let Some(schema) = &options.schema {
                    // We were passed a schema, we don't have to call `parquet_file_info`,
                    // but this does mean we don't have `row_estimation` and `first_metadata`.

                    (
                        FileInfo {
                            schema: schema.clone(),
                            reader_schema: Some(either::Either::Left(Arc::new(
                                schema.to_arrow(CompatLevel::newest()),
                            ))),
                            row_estimation: (None, usize::MAX),
                        },
                        FileScanIR::Parquet {
                            options,
                            metadata: None,
                        },
                    )
                } else {
                    {
                        let first_scan_source = require_first_source(
                            "failed to retrieve first file schema (parquet)",
                            "\
                            passing a schema can allow \
                            this scan to succeed with an empty DataFrame.",
                        )?;

                        if verbose() {
                            eprintln!(
                                "sourcing parquet scan file schema from: '{}'",
                                first_scan_source.to_include_path_name()
                            )
                        }

                        let (mut file_info, mut metadata) = scans::parquet_file_info(
                            first_scan_source,
                            unified_scan_args.row_index.as_ref(),
                            cloud_options,
                            n_sources,
                        )
                        .await?;

                        // An externally-provided (total, deleted) row count overrides the
                        // estimate from the file footer.
                        if let Some((total, deleted)) = unified_scan_args.row_count {
                            let size = (total - deleted) as usize;
                            file_info.row_estimation = (Some(size), size);
                        }

                        // Bound cache memory: drop the parquet metadata once the cache
                        // holds more entries than `max_metadata_scan_cached()` allows.
                        if self.inner.read().unwrap().len() > max_metadata_scan_cached() {
                            _ = metadata.take();
                        }

                        PolarsResult::Ok((file_info, FileScanIR::Parquet { options, metadata }))
                    }
                    .map_err(|e| e.context(failed_here!(parquet scan)))?
                }
            },
            #[cfg(feature = "ipc")]
            FileScanDsl::Ipc { options } => {
                let first_scan_source =
                    require_first_source("failed to retrieve first file schema (ipc)", "")?;

                if verbose() {
                    eprintln!(
                        "sourcing ipc scan file schema from: '{}'",
                        first_scan_source.to_include_path_name()
                    )
                }

                let (file_info, md) = scans::ipc_file_info(
                    first_scan_source,
                    unified_scan_args.row_index.as_ref(),
                    cloud_options,
                )
                .await?;

                PolarsResult::Ok((
                    file_info,
                    FileScanIR::Ipc {
                        options,
                        metadata: Some(Arc::new(md)),
                    },
                ))
            }
            .map_err(|e| e.context(failed_here!(ipc scan)))?,
            #[cfg(feature = "csv")]
            FileScanDsl::Csv { mut options } => {
                {
                    // TODO: This is a hack. We conditionally set `allow_missing_columns` to
                    // mimic existing behavior, but this should be taken from a user provided
                    // parameter instead.
                    if options.schema.is_some() && options.has_header {
                        unified_scan_args.missing_columns_policy = MissingColumnsPolicy::Insert;
                    }

                    let file_info = if let Some(schema) = options.schema.clone() {
                        // User-provided schema: skip inference entirely.
                        FileInfo {
                            schema: schema.clone(),
                            reader_schema: Some(either::Either::Right(schema)),
                            row_estimation: (None, usize::MAX),
                        }
                    } else {
                        let first_scan_source =
                            require_first_source("failed to retrieve file schemas (csv)", "")?;

                        if verbose() {
                            eprintln!(
                                "sourcing csv scan file schema from: '{}'",
                                first_scan_source.to_include_path_name()
                            )
                        }

                        scans::csv_file_info(
                            sources,
                            first_scan_source,
                            unified_scan_args.row_index.as_ref(),
                            Arc::make_mut(&mut options),
                            cloud_options,
                        )
                        .await?
                    };

                    PolarsResult::Ok((file_info, FileScanIR::Csv { options }))
                }
                .map_err(|e| e.context(failed_here!(csv scan)))?
            },
            #[cfg(feature = "json")]
            FileScanDsl::NDJson { options } => {
                let file_info = if let Some(schema) = options.schema.clone() {
                    // User-provided schema: skip inference entirely.
                    FileInfo {
                        schema: schema.clone(),
                        reader_schema: Some(either::Either::Right(schema)),
                        row_estimation: (None, usize::MAX),
                    }
                } else {
                    let first_scan_source =
                        require_first_source("failed to retrieve first file schema (ndjson)", "")?;

                    if verbose() {
                        eprintln!(
                            "sourcing ndjson scan file schema from: '{}'",
                            first_scan_source.to_include_path_name()
                        )
                    }

                    scans::ndjson_file_info(
                        sources,
                        first_scan_source,
                        unified_scan_args.row_index.as_ref(),
                        &options,
                        cloud_options,
                    )
                    .await?
                };

                PolarsResult::Ok((file_info, FileScanIR::NDJson { options }))
            }
            .map_err(|e| e.context(failed_here!(ndjson scan)))?,
            #[cfg(feature = "python")]
            FileScanDsl::PythonDataset { dataset_object } => (|| {
                if crate::dsl::DATASET_PROVIDER_VTABLE.get().is_none() {
                    polars_bail!(ComputeError: "DATASET_PROVIDER_VTABLE (python) not initialized")
                }

                // Schema comes from the Python dataset object itself.
                let mut schema = dataset_object.schema()?;
                let reader_schema = schema.clone();

                if let Some(row_index) = &unified_scan_args.row_index {
                    insert_row_index_to_schema(Arc::make_mut(&mut schema), row_index.name.clone())?;
                }

                PolarsResult::Ok((
                    FileInfo {
                        schema,
                        reader_schema: Some(either::Either::Right(reader_schema)),
                        row_estimation: (None, usize::MAX),
                    },
                    FileScanIR::PythonDataset {
                        dataset_object,
                        cached_ir: Default::default(),
                    },
                ))
            })()
            .map_err(|e| e.context(failed_here!(python dataset scan)))?,
            #[cfg(feature = "scan_lines")]
            FileScanDsl::Lines { name } => {
                // Lines scans always have a fixed single-column String schema,
                // so there is nothing to probe from the files.
                let schema = Arc::new(Schema::from_iter([(name.clone(), DataType::String)]));

                (
                    FileInfo {
                        schema: schema.clone(),
                        reader_schema: Some(either::Either::Right(schema.clone())),
                        row_estimation: (None, usize::MAX),
                    },
                    FileScanIR::Lines { name },
                )
            },
            FileScanDsl::Anonymous {
                file_info,
                options,
                function,
            } => (file_info, FileScanIR::Anonymous { options, function }),
        })
    }

    /// Cached wrapper around [`Self::infer_or_parse`].
    ///
    /// Only non-empty, path-based sources of cacheable scan kinds (parquet, ipc,
    /// csv, ndjson) are cached, keyed by `CachedSourceKey`; everything else is
    /// resolved directly. On miss, the computed value is inserted under a write
    /// lock and returned.
    pub(super) async fn get_or_insert(
        &self,
        scan_type: &FileScanDsl,
        sources: &ScanSources,
        sources_before_expansion: &ScanSources,
        unified_scan_args: &mut UnifiedScanArgs,
        verbose: bool,
    ) -> PolarsResult<(FileInfo, FileScanIR)> {
        // Only cache non-empty paths. Others are directly parsed.
        let paths = match sources {
            ScanSources::Paths(paths) if !paths.is_empty() => paths.clone(),

            _ => {
                return self
                    .infer_or_parse(
                        scan_type.clone(),
                        sources,
                        sources_before_expansion,
                        unified_scan_args,
                    )
                    .await;
            },
        };

        // Build the cache key and look up any existing entry under a read lock.
        let (k, v): (CachedSourceKey, Option<(FileInfo, FileScanIR)>) = match scan_type {
            #[cfg(feature = "parquet")]
            FileScanDsl::Parquet { options } => {
                let key = CachedSourceKey::ParquetIpc {
                    first_path: paths[0].clone(),
                    schema_overwrite: options.schema.clone(),
                };

                let guard = self.inner.read().unwrap();
                let v = guard.get(&key);
                (key, v.cloned())
            },
            #[cfg(feature = "ipc")]
            FileScanDsl::Ipc { options: _ } => {
                let key = CachedSourceKey::ParquetIpc {
                    first_path: paths[0].clone(),
                    schema_overwrite: None,
                };

                let guard = self.inner.read().unwrap();
                let v = guard.get(&key);
                (key, v.cloned())
            },
            #[cfg(feature = "csv")]
            FileScanDsl::Csv { options } => {
                let key = CachedSourceKey::CsvJson {
                    paths: paths.clone(),
                    schema: options.schema.clone(),
                    schema_overwrite: options.schema_overwrite.clone(),
                };
                let guard = self.inner.read().unwrap();
                let v = guard.get(&key);
                (key, v.cloned())
            },
            #[cfg(feature = "json")]
            FileScanDsl::NDJson { options } => {
                let key = CachedSourceKey::CsvJson {
                    paths: paths.clone(),
                    schema: options.schema.clone(),
                    schema_overwrite: options.schema_overwrite.clone(),
                };
                let guard = self.inner.read().unwrap();
                let v = guard.get(&key);
                (key, v.cloned())
            },
            // Uncacheable scan kinds (python dataset, lines, anonymous): resolve directly.
            _ => {
                return self
                    .infer_or_parse(
                        scan_type.clone(),
                        sources,
                        sources_before_expansion,
                        unified_scan_args,
                    )
                    .await;
            },
        };

        if let Some(out) = v {
            if verbose {
                eprintln!("FILE_INFO CACHE HIT")
            }
            Ok(out)
        } else {
            // Cache miss: compute without holding any lock, then insert.
            let v = self
                .infer_or_parse(
                    scan_type.clone(),
                    sources,
                    sources_before_expansion,
                    unified_scan_args,
                )
                .await?;
            self.inner.write().unwrap().insert(k, v.clone());
            Ok(v)
        }
    }
}
1114
1115