// Path: crates/polars-plan/src/dsl/scan_sources.rs (blob/main)
// 6939 views
use std::fmt::{Debug, Formatter};1use std::fs::File;2use std::sync::Arc;34use polars_core::error::{PolarsResult, feature_gated};5use polars_error::polars_err;6use polars_io::cloud::CloudOptions;7#[cfg(feature = "cloud")]8use polars_io::file_cache::FileCacheEntry;9#[cfg(feature = "cloud")]10use polars_io::utils::byte_source::{DynByteSource, DynByteSourceBuilder};11use polars_io::{expand_paths, expand_paths_hive, expanded_from_single_directory};12use polars_utils::mmap::MemSlice;13use polars_utils::pl_str::PlSmallStr;14use polars_utils::plpath::{PlPath, PlPathRef};1516use super::UnifiedScanArgs;1718/// Set of sources to scan from19///20/// This can either be a list of paths to files, opened files or in-memory buffers. Mixing of21/// buffers is not currently possible.22#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]23#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]24#[derive(Clone)]25pub enum ScanSources {26Paths(Arc<[PlPath]>),2728#[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]29Files(Arc<[File]>),30#[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]31Buffers(Arc<[MemSlice]>),32}3334impl Debug for ScanSources {35fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {36match self {37Self::Paths(p) => write!(f, "paths: {:?}", p.as_ref()),38Self::Files(p) => write!(f, "files: {} files", p.len()),39Self::Buffers(b) => write!(f, "buffers: {} in-memory-buffers", b.len()),40}41}42}4344/// A reference to a single item in [`ScanSources`]45#[derive(Debug, Clone, Copy)]46pub enum ScanSourceRef<'a> {47Path(PlPathRef<'a>),48File(&'a File),49Buffer(&'a MemSlice),50}5152/// A single source to scan from53#[derive(Debug, Clone)]54pub enum ScanSource {55Path(PlPath),56File(Arc<File>),57Buffer(MemSlice),58}5960impl ScanSource {61pub fn from_sources(sources: ScanSources) -> Result<Self, ScanSources> {62if sources.len() == 1 {63match sources {64ScanSources::Paths(ps) => 
Ok(Self::Path(ps.as_ref()[0].clone())),65ScanSources::Files(fs) => {66assert_eq!(fs.len(), 1);67let ptr: *const File = Arc::into_raw(fs) as *const File;68// SAFETY: A [T] with length 1 can be interpreted as T69let f: Arc<File> = unsafe { Arc::from_raw(ptr) };7071Ok(Self::File(f))72},73ScanSources::Buffers(bs) => Ok(Self::Buffer(bs.as_ref()[0].clone())),74}75} else {76Err(sources)77}78}7980pub fn into_sources(self) -> ScanSources {81match self {82ScanSource::Path(p) => ScanSources::Paths([p].into()),83ScanSource::File(f) => {84let ptr: *const [File] = std::ptr::slice_from_raw_parts(Arc::into_raw(f), 1);85// SAFETY: A T can be interpreted as [T] with length 1.86let fs: Arc<[File]> = unsafe { Arc::from_raw(ptr) };87ScanSources::Files(fs)88},89ScanSource::Buffer(m) => ScanSources::Buffers([m].into()),90}91}9293pub fn as_scan_source_ref(&self) -> ScanSourceRef<'_> {94match self {95ScanSource::Path(path) => ScanSourceRef::Path(path.as_ref()),96ScanSource::File(file) => ScanSourceRef::File(file.as_ref()),97ScanSource::Buffer(mem_slice) => ScanSourceRef::Buffer(mem_slice),98}99}100101pub fn run_async(&self) -> bool {102self.as_scan_source_ref().run_async()103}104105pub fn is_cloud_url(&self) -> bool {106if let ScanSource::Path(path) = self {107path.is_cloud_url()108} else {109false110}111}112}113114/// An iterator for [`ScanSources`]115pub struct ScanSourceIter<'a> {116sources: &'a ScanSources,117offset: usize,118}119120impl Default for ScanSources {121fn default() -> Self {122// We need to use `Paths` here to avoid erroring when doing hive-partitioned scans of empty123// file lists.124Self::Paths(Arc::default())125}126}127128impl std::hash::Hash for ScanSources {129fn hash<H: std::hash::Hasher>(&self, state: &mut H) {130std::mem::discriminant(self).hash(state);131132// @NOTE: This is a bit crazy133//134// We don't really want to hash the file descriptors or the whole buffers so for now we135// just settle with the fact that the memory behind Arc's does not really move. 
Therefore,136// we can just hash the pointer.137match self {138Self::Paths(paths) => paths.hash(state),139Self::Files(files) => files.as_ptr().hash(state),140Self::Buffers(buffers) => buffers.as_ptr().hash(state),141}142}143}144145impl PartialEq for ScanSources {146fn eq(&self, other: &Self) -> bool {147match (self, other) {148(ScanSources::Paths(l), ScanSources::Paths(r)) => l == r,149(ScanSources::Files(l), ScanSources::Files(r)) => std::ptr::eq(l.as_ptr(), r.as_ptr()),150(ScanSources::Buffers(l), ScanSources::Buffers(r)) => {151std::ptr::eq(l.as_ptr(), r.as_ptr())152},153_ => false,154}155}156}157158impl Eq for ScanSources {}159160impl ScanSources {161pub fn expand_paths(162&self,163scan_args: &UnifiedScanArgs,164#[allow(unused_variables)] cloud_options: Option<&CloudOptions>,165) -> PolarsResult<Self> {166match self {167Self::Paths(paths) => Ok(Self::Paths(expand_paths(168paths,169scan_args.glob,170cloud_options,171)?)),172v => Ok(v.clone()),173}174}175176/// This will update `scan_args.hive_options.enabled` to `true` if the existing value is `None`177/// and the paths are expanded from a single directory. 
Otherwise the existing value is maintained.178#[cfg(any(feature = "ipc", feature = "parquet"))]179pub fn expand_paths_with_hive_update(180&self,181scan_args: &mut UnifiedScanArgs,182#[allow(unused_variables)] cloud_options: Option<&CloudOptions>,183) -> PolarsResult<Self> {184match self {185Self::Paths(paths) => {186let (expanded_paths, hive_start_idx) = expand_paths_hive(187paths,188scan_args.glob,189cloud_options,190scan_args.hive_options.enabled.unwrap_or(false),191)?;192193if scan_args.hive_options.enabled.is_none()194&& expanded_from_single_directory(paths, expanded_paths.as_ref())195{196scan_args.hive_options.enabled = Some(true);197}198scan_args.hive_options.hive_start_idx = hive_start_idx;199200Ok(Self::Paths(expanded_paths))201},202v => Ok(v.clone()),203}204}205206pub fn iter(&self) -> ScanSourceIter<'_> {207ScanSourceIter {208sources: self,209offset: 0,210}211}212213/// Are the sources all paths?214pub fn is_paths(&self) -> bool {215matches!(self, Self::Paths(_))216}217218/// Try cast the scan sources to [`ScanSources::Paths`]219pub fn as_paths(&self) -> Option<&[PlPath]> {220match self {221Self::Paths(paths) => Some(paths.as_ref()),222Self::Files(_) | Self::Buffers(_) => None,223}224}225226/// Try cast the scan sources to [`ScanSources::Paths`] with a clone227pub fn into_paths(&self) -> Option<Arc<[PlPath]>> {228match self {229Self::Paths(paths) => Some(paths.clone()),230Self::Files(_) | Self::Buffers(_) => None,231}232}233234/// Try get the first path in the scan sources235pub fn first_path(&self) -> Option<PlPathRef<'_>> {236match self {237Self::Paths(paths) => paths.first().map(|p| p.as_ref()),238Self::Files(_) | Self::Buffers(_) => None,239}240}241242/// Is the first path a cloud URL?243pub fn is_cloud_url(&self) -> bool {244self.first_path().is_some_and(|path| path.is_cloud_url())245}246247pub fn len(&self) -> usize {248match self {249Self::Paths(s) => s.len(),250Self::Files(s) => s.len(),251Self::Buffers(s) => s.len(),252}253}254255pub fn 
is_empty(&self) -> bool {256self.len() == 0257}258259pub fn first(&self) -> Option<ScanSourceRef<'_>> {260self.get(0)261}262263pub fn first_or_empty_expand_err(264&self,265failed_message: &'static str,266sources_before_expansion: &ScanSources,267glob: bool,268hint: &'static str,269) -> PolarsResult<ScanSourceRef<'_>> {270let hint_padding = if hint.is_empty() { "" } else { " Hint: " };271272self.first().ok_or_else(|| match self {273Self::Paths(_) if !sources_before_expansion.is_empty() => polars_err!(274ComputeError:275"{}: expanded paths were empty \276(path expansion input: '{:?}', glob: {}).{}{}",277failed_message, sources_before_expansion, glob, hint_padding, hint278),279_ => polars_err!(280ComputeError:281"{}: empty input: {:?}.{}{}",282failed_message, self, hint_padding, hint283),284})285}286287/// Turn the [`ScanSources`] into some kind of identifier288pub fn id(&self) -> PlSmallStr {289if self.is_empty() {290return PlSmallStr::from_static("EMPTY");291}292293match self {294Self::Paths(paths) => PlSmallStr::from_str(paths.first().unwrap().to_str()),295Self::Files(_) => PlSmallStr::from_static("OPEN_FILES"),296Self::Buffers(_) => PlSmallStr::from_static("IN_MEMORY"),297}298}299300/// Get the scan source at specific address301pub fn get(&self, idx: usize) -> Option<ScanSourceRef<'_>> {302match self {303Self::Paths(paths) => paths.get(idx).map(|p| ScanSourceRef::Path(p.as_ref())),304Self::Files(files) => files.get(idx).map(ScanSourceRef::File),305Self::Buffers(buffers) => buffers.get(idx).map(ScanSourceRef::Buffer),306}307}308309/// Get the scan source at specific address310///311/// # Panics312///313/// If the `idx` is out of range.314#[track_caller]315pub fn at(&self, idx: usize) -> ScanSourceRef<'_> {316self.get(idx).unwrap()317}318}319320impl ScanSourceRef<'_> {321/// Get the name for `include_paths`322pub fn to_include_path_name(&self) -> &str {323match self {324Self::Path(path) => path.to_str(),325Self::File(_) => "open-file",326Self::Buffer(_) => 
"in-mem",327}328}329330// @TODO: I would like to remove this function eventually.331pub fn into_owned(&self) -> PolarsResult<ScanSource> {332Ok(match self {333ScanSourceRef::Path(path) => ScanSource::Path((*path).into_owned()),334ScanSourceRef::File(file) => {335if let Ok(file) = file.try_clone() {336ScanSource::File(Arc::new(file))337} else {338ScanSource::Buffer(self.to_memslice()?)339}340},341ScanSourceRef::Buffer(buffer) => ScanSource::Buffer((*buffer).clone()),342})343}344345pub fn as_path(&self) -> Option<PlPathRef<'_>> {346match self {347Self::Path(path) => Some(*path),348Self::File(_) | Self::Buffer(_) => None,349}350}351352pub fn is_cloud_url(&self) -> bool {353self.as_path().is_some_and(|x| x.is_cloud_url())354}355356/// Turn the scan source into a memory slice357pub fn to_memslice(&self) -> PolarsResult<MemSlice> {358self.to_memslice_possibly_async(false, None, 0)359}360361#[allow(clippy::wrong_self_convention)]362#[cfg(feature = "cloud")]363fn to_memslice_async<F: Fn(Arc<FileCacheEntry>) -> PolarsResult<std::fs::File>>(364&self,365open_cache_entry: F,366run_async: bool,367) -> PolarsResult<MemSlice> {368match self {369ScanSourceRef::Path(path) => {370let file = if run_async {371open_cache_entry(polars_io::file_cache::FILE_CACHE.get_entry(*path).unwrap())?372} else {373polars_utils::open_file(path.as_local_path().unwrap())?374};375376MemSlice::from_file(&file)377},378ScanSourceRef::File(file) => MemSlice::from_file(file),379ScanSourceRef::Buffer(buff) => Ok((*buff).clone()),380}381}382383#[cfg(feature = "cloud")]384pub fn to_memslice_async_assume_latest(&self, run_async: bool) -> PolarsResult<MemSlice> {385self.to_memslice_async(|entry| entry.try_open_assume_latest(), run_async)386}387388#[cfg(feature = "cloud")]389pub fn to_memslice_async_check_latest(&self, run_async: bool) -> PolarsResult<MemSlice> {390self.to_memslice_async(|entry| entry.try_open_check_latest(), run_async)391}392393#[cfg(not(feature = 
"cloud"))]394#[allow(clippy::wrong_self_convention)]395fn to_memslice_async(&self, run_async: bool) -> PolarsResult<MemSlice> {396match self {397ScanSourceRef::Path(path) => {398let file = polars_utils::open_file(path.as_local_path().unwrap())?;399MemSlice::from_file(&file)400},401ScanSourceRef::File(file) => MemSlice::from_file(file),402ScanSourceRef::Buffer(buff) => Ok((*buff).clone()),403}404}405406#[cfg(not(feature = "cloud"))]407pub fn to_memslice_async_assume_latest(&self, run_async: bool) -> PolarsResult<MemSlice> {408self.to_memslice_async(run_async)409}410411#[cfg(not(feature = "cloud"))]412pub fn to_memslice_async_check_latest(&self, run_async: bool) -> PolarsResult<MemSlice> {413self.to_memslice_async(run_async)414}415416pub fn to_memslice_possibly_async(417&self,418run_async: bool,419#[cfg(feature = "cloud")] cache_entries: Option<420&Vec<Arc<polars_io::file_cache::FileCacheEntry>>,421>,422#[cfg(not(feature = "cloud"))] cache_entries: Option<&()>,423index: usize,424) -> PolarsResult<MemSlice> {425match self {426Self::Path(path) => {427let file = if run_async {428feature_gated!("cloud", {429cache_entries.unwrap()[index].try_open_check_latest()?430})431} else {432polars_utils::open_file(path.as_local_path().unwrap())?433};434435MemSlice::from_file(&file)436},437Self::File(file) => MemSlice::from_file(file),438Self::Buffer(buff) => Ok((*buff).clone()),439}440}441442#[cfg(feature = "cloud")]443pub async fn to_dyn_byte_source(444&self,445builder: &DynByteSourceBuilder,446cloud_options: Option<&CloudOptions>,447) -> PolarsResult<DynByteSource> {448match self {449Self::Path(path) => {450builder451.try_build_from_path(path.to_str(), cloud_options)452.await453},454Self::File(file) => Ok(DynByteSource::from(MemSlice::from_file(file)?)),455Self::Buffer(buff) => Ok(DynByteSource::from((*buff).clone())),456}457}458459pub fn run_async(&self) -> bool {460matches!(self, Self::Path(p) if p.is_cloud_url() || polars_core::config::force_async())461}462}463464impl<'a> 
Iterator for ScanSourceIter<'a> {465type Item = ScanSourceRef<'a>;466467fn next(&mut self) -> Option<Self::Item> {468let item = match self.sources {469ScanSources::Paths(paths) => ScanSourceRef::Path(paths.get(self.offset)?.as_ref()),470ScanSources::Files(files) => ScanSourceRef::File(files.get(self.offset)?),471ScanSources::Buffers(buffers) => ScanSourceRef::Buffer(buffers.get(self.offset)?),472};473474self.offset += 1;475Some(item)476}477478fn size_hint(&self) -> (usize, Option<usize>) {479let len = self.sources.len() - self.offset;480(len, Some(len))481}482}483484impl ExactSizeIterator for ScanSourceIter<'_> {}485486487