Path: blob/main/crates/polars-plan/src/dsl/scan_sources.rs
8446 views
use std::fmt::{Debug, Formatter};1use std::fs::File;2use std::sync::Arc;34use polars_buffer::Buffer;5use polars_core::error::{PolarsResult, feature_gated};6use polars_error::polars_err;7use polars_io::cloud::CloudOptions;8#[cfg(feature = "cloud")]9use polars_io::file_cache::FileCacheEntry;10use polars_io::metrics::IOMetrics;11use polars_io::utils::byte_source::{DynByteSource, DynByteSourceBuilder};12use polars_io::{expand_paths, expand_paths_hive, expanded_from_single_directory};13use polars_utils::mmap::MMapSemaphore;14use polars_utils::pl_path::PlRefPath;15use polars_utils::pl_str::PlSmallStr;16#[cfg(feature = "serde")]17use serde::{Deserialize, Deserializer, Serialize, Serializer};1819use super::UnifiedScanArgs;2021#[cfg(feature = "serde")]22fn serialize_paths<S: Serializer>(paths: &Buffer<PlRefPath>, s: S) -> Result<S::Ok, S::Error> {23paths.as_slice().serialize(s)24}2526#[cfg(feature = "serde")]27fn deserialize_paths<'de, D: Deserializer<'de>>(d: D) -> Result<Buffer<PlRefPath>, D::Error> {28let v: Vec<PlRefPath> = Deserialize::deserialize(d)?;29Ok(Buffer::from(v))30}3132/// Set of sources to scan from33///34/// This can either be a list of paths to files, opened files or in-memory buffers. Mixing of35/// buffers is not currently possible.36#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]37#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]38#[derive(Clone)]39pub enum ScanSources {40#[cfg_attr(41feature = "serde",42serde(43serialize_with = "serialize_paths",44deserialize_with = "deserialize_paths"45)46)]47#[cfg_attr(feature = "dsl-schema", schemars(with = "Vec<PlRefPath>"))]48Paths(Buffer<PlRefPath>),49#[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]50Files(Arc<[File]>),51#[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]52Buffers(Arc<[Buffer<u8>]>),53}5455impl Debug for ScanSources {56fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {57match self {58Self::Paths(p) => write!(f, "paths: {:?}", p.as_ref()),59Self::Files(p) => write!(f, "files: {} files", p.len()),60Self::Buffers(b) => write!(f, "buffers: {} in-memory-buffers", b.len()),61}62}63}6465/// A reference to a single item in [`ScanSources`]66#[derive(Debug, Clone, Copy)]67pub enum ScanSourceRef<'a> {68Path(&'a PlRefPath),69File(&'a File),70Buffer(&'a Buffer<u8>),71}7273/// A single source to scan from74#[derive(Debug, Clone)]75pub enum ScanSource {76Path(PlRefPath),77File(Arc<File>),78Buffer(Buffer<u8>),79}8081impl ScanSource {82pub fn from_sources(sources: ScanSources) -> Result<Self, ScanSources> {83if sources.len() == 1 {84match sources {85ScanSources::Paths(ps) => Ok(Self::Path(ps.as_ref()[0].clone())),86ScanSources::Files(fs) => {87assert_eq!(fs.len(), 1);88let ptr: *const File = Arc::into_raw(fs) as *const File;89// SAFETY: A [T] with length 1 can be interpreted as T90let f: Arc<File> = unsafe { Arc::from_raw(ptr) };9192Ok(Self::File(f))93},94ScanSources::Buffers(bs) => Ok(Self::Buffer(bs.as_ref()[0].clone())),95}96} else {97Err(sources)98}99}100101pub fn into_sources(self) -> ScanSources {102match self {103ScanSource::Path(p) => ScanSources::Paths(Buffer::from_iter([p])),104ScanSource::File(f) => {105let ptr: *const [File] = std::ptr::slice_from_raw_parts(Arc::into_raw(f), 1);106// SAFETY: A T can be interpreted as [T] with length 1.107let fs: Arc<[File]> = unsafe { Arc::from_raw(ptr) };108ScanSources::Files(fs)109},110ScanSource::Buffer(m) => ScanSources::Buffers([m].into()),111}112}113114pub fn as_scan_source_ref(&self) -> ScanSourceRef<'_> {115match self {116ScanSource::Path(path) => ScanSourceRef::Path(path),117ScanSource::File(file) => ScanSourceRef::File(file.as_ref()),118ScanSource::Buffer(mem_slice) => ScanSourceRef::Buffer(mem_slice),119}120}121122pub fn run_async(&self) -> bool {123self.as_scan_source_ref().run_async()124}125126pub fn is_cloud_url(&self) -> bool {127if let ScanSource::Path(path) = self {128path.has_scheme()129} else {130false131}132}133}134135/// An iterator for [`ScanSources`]136pub struct ScanSourceIter<'a> {137sources: &'a ScanSources,138offset: usize,139}140141impl Default for ScanSources {142fn default() -> Self {143// We need to use `Paths` here to avoid erroring when doing hive-partitioned scans of empty144// file lists.145Self::Paths(Buffer::new())146}147}148149impl std::hash::Hash for ScanSources {150fn hash<H: std::hash::Hasher>(&self, state: &mut H) {151std::mem::discriminant(self).hash(state);152153// @NOTE: This is a bit crazy154//155// We don't really want to hash the file descriptors or the whole buffers so for now we156// just settle with the fact that the memory behind Arc's does not really move. Therefore,157// we can just hash the pointer.158match self {159Self::Paths(paths) => paths.hash(state),160Self::Files(files) => files.as_ptr().hash(state),161Self::Buffers(buffers) => buffers.as_ptr().hash(state),162}163}164}165166impl PartialEq for ScanSources {167fn eq(&self, other: &Self) -> bool {168match (self, other) {169(ScanSources::Paths(l), ScanSources::Paths(r)) => l == r,170(ScanSources::Files(l), ScanSources::Files(r)) => std::ptr::eq(l.as_ptr(), r.as_ptr()),171(ScanSources::Buffers(l), ScanSources::Buffers(r)) => {172std::ptr::eq(l.as_ptr(), r.as_ptr())173},174_ => false,175}176}177}178179impl Eq for ScanSources {}180181impl ScanSources {182pub async fn expand_paths(&self, scan_args: &mut UnifiedScanArgs) -> PolarsResult<Self> {183match self {184Self::Paths(paths) => Ok(Self::Paths(185expand_paths(186paths,187scan_args.glob,188scan_args.hidden_file_prefix.as_deref().unwrap_or_default(),189&mut scan_args.cloud_options,190)191.await?,192)),193v => Ok(v.clone()),194}195}196197/// This will update `scan_args.hive_options.enabled` to `true` if the existing value is `None`198/// and the paths are expanded from a single directory. Otherwise the existing value is maintained.199#[cfg(any(feature = "ipc", feature = "parquet"))]200pub async fn expand_paths_with_hive_update(201&self,202scan_args: &mut UnifiedScanArgs,203) -> PolarsResult<Self> {204match self {205Self::Paths(paths) => {206let (expanded_paths, hive_start_idx) = expand_paths_hive(207paths,208scan_args.glob,209scan_args.hidden_file_prefix.as_deref().unwrap_or_default(),210&mut scan_args.cloud_options,211scan_args.hive_options.enabled.unwrap_or(false),212)213.await?;214215if scan_args.hive_options.enabled.is_none()216&& expanded_from_single_directory(paths, expanded_paths.as_ref())217{218scan_args.hive_options.enabled = Some(true);219}220scan_args.hive_options.hive_start_idx = hive_start_idx;221222Ok(Self::Paths(expanded_paths))223},224v => Ok(v.clone()),225}226}227228pub fn iter(&self) -> ScanSourceIter<'_> {229ScanSourceIter {230sources: self,231offset: 0,232}233}234235/// Are the sources all paths?236pub fn is_paths(&self) -> bool {237matches!(self, Self::Paths(_))238}239240/// Try cast the scan sources to [`ScanSources::Paths`]241pub fn as_paths(&self) -> Option<&[PlRefPath]> {242match self {243Self::Paths(paths) => Some(paths.as_ref()),244Self::Files(_) | Self::Buffers(_) => None,245}246}247248/// Try cast the scan sources to [`ScanSources::Paths`] with a clone249pub fn into_paths(&self) -> Option<Buffer<PlRefPath>> {250match self {251Self::Paths(paths) => Some(paths.clone()),252Self::Files(_) | Self::Buffers(_) => None,253}254}255256/// Try get the first path in the scan sources257pub fn first_path(&self) -> Option<&PlRefPath> {258match self {259Self::Paths(paths) => paths.first(),260Self::Files(_) | Self::Buffers(_) => None,261}262}263264/// Is the first path a cloud URL?265pub fn is_cloud_url(&self) -> bool {266self.first_path().is_some_and(|path| path.has_scheme())267}268269pub fn len(&self) -> usize {270match self {271Self::Paths(s) => s.len(),272Self::Files(s) => s.len(),273Self::Buffers(s) => s.len(),274}275}276277pub fn is_empty(&self) -> bool {278self.len() == 0279}280281pub fn first(&self) -> Option<ScanSourceRef<'_>> {282self.get(0)283}284285pub fn first_or_empty_expand_err(286&self,287failed_message: &'static str,288sources_before_expansion: &ScanSources,289glob: bool,290hint: &'static str,291) -> PolarsResult<ScanSourceRef<'_>> {292let hint_padding = if hint.is_empty() { "" } else { " Hint: " };293294self.first().ok_or_else(|| match self {295Self::Paths(_) if !sources_before_expansion.is_empty() => polars_err!(296ComputeError:297"{}: expanded paths were empty \298(path expansion input: '{:?}', glob: {}).{}{}",299failed_message, sources_before_expansion, glob, hint_padding, hint300),301_ => polars_err!(302ComputeError:303"{}: empty input: {:?}.{}{}",304failed_message, self, hint_padding, hint305),306})307}308309/// Turn the [`ScanSources`] into some kind of identifier310pub fn id(&self) -> PlSmallStr {311if self.is_empty() {312return PlSmallStr::from_static("EMPTY");313}314315match self {316Self::Paths(paths) => PlSmallStr::from_str(paths.first().unwrap().as_str()),317Self::Files(_) => PlSmallStr::from_static("OPEN_FILES"),318Self::Buffers(_) => PlSmallStr::from_static("IN_MEMORY"),319}320}321322/// Get the scan source at specific address323pub fn get(&self, idx: usize) -> Option<ScanSourceRef<'_>> {324match self {325Self::Paths(paths) => paths.get(idx).map(ScanSourceRef::Path),326Self::Files(files) => files.get(idx).map(ScanSourceRef::File),327Self::Buffers(buffers) => buffers.get(idx).map(ScanSourceRef::Buffer),328}329}330331/// Get the scan source at specific address332///333/// # Panics334///335/// If the `idx` is out of range.336#[track_caller]337pub fn at(&self, idx: usize) -> ScanSourceRef<'_> {338self.get(idx).unwrap()339}340341/// Returns `None` if `self` is a `::File` variant.342pub fn gather(&self, indices: impl Iterator<Item = usize>) -> Option<Self> {343Some(match self {344Self::Paths(paths) => Self::Paths(indices.map(|i| paths[i].clone()).collect()),345Self::Buffers(buffers) => Self::Buffers(indices.map(|i| buffers[i].clone()).collect()),346Self::Files(_) => return None,347})348}349}350351impl ScanSourceRef<'_> {352/// Get the name for `include_paths`353pub fn to_include_path_name(&self) -> &str {354match self {355Self::Path(path) => path.as_str(),356Self::File(_) => "open-file",357Self::Buffer(_) => "in-mem",358}359}360361// @TODO: I would like to remove this function eventually.362pub fn into_owned(&self) -> PolarsResult<ScanSource> {363Ok(match self {364ScanSourceRef::Path(path) => ScanSource::Path((*path).clone()),365ScanSourceRef::File(file) => {366if let Ok(file) = file.try_clone() {367ScanSource::File(Arc::new(file))368} else {369ScanSource::Buffer(self.to_memslice()?)370}371},372ScanSourceRef::Buffer(buffer) => ScanSource::Buffer((*buffer).clone()),373})374}375376pub fn as_path(&self) -> Option<&PlRefPath> {377match self {378Self::Path(path) => Some(path),379Self::File(_) | Self::Buffer(_) => None,380}381}382383pub fn is_cloud_url(&self) -> bool {384self.as_path().is_some_and(|x| x.has_scheme())385}386387/// Turn the scan source into a memory slice388pub fn to_memslice(&self) -> PolarsResult<Buffer<u8>> {389self.to_buffer_possibly_async(false, None, 0)390}391392#[allow(clippy::wrong_self_convention)]393#[cfg(feature = "cloud")]394fn to_buffer_async<F: Fn(Arc<FileCacheEntry>) -> PolarsResult<std::fs::File>>(395&self,396open_cache_entry: F,397run_async: bool,398) -> PolarsResult<Buffer<u8>> {399match self {400ScanSourceRef::Path(path) => {401let file = if run_async {402open_cache_entry(403polars_io::file_cache::FILE_CACHE404.get_entry((*path).clone())405.unwrap(),406)?407} else {408polars_utils::open_file(path.as_std_path())?409};410411Ok(Buffer::from_owner(MMapSemaphore::new_from_file(&file)?))412},413ScanSourceRef::File(file) => {414Ok(Buffer::from_owner(MMapSemaphore::new_from_file(file)?))415},416ScanSourceRef::Buffer(buff) => Ok((*buff).clone()),417}418}419420#[cfg(feature = "cloud")]421pub fn to_buffer_async_assume_latest(&self, run_async: bool) -> PolarsResult<Buffer<u8>> {422self.to_buffer_async(|entry| entry.try_open_assume_latest(), run_async)423}424425#[cfg(feature = "cloud")]426pub fn to_buffer_async_check_latest(&self, run_async: bool) -> PolarsResult<Buffer<u8>> {427self.to_buffer_async(|entry| entry.try_open_check_latest(), run_async)428}429430#[cfg(not(feature = "cloud"))]431#[allow(clippy::wrong_self_convention)]432fn to_buffer_async(&self, run_async: bool) -> PolarsResult<Buffer<u8>> {433match self {434ScanSourceRef::Path(path) => {435let file = polars_utils::open_file(path.as_std_path())?;436Ok(Buffer::from_owner(MMapSemaphore::new_from_file(&file)?))437},438ScanSourceRef::File(file) => {439Ok(Buffer::from_owner(MMapSemaphore::new_from_file(file)?))440},441ScanSourceRef::Buffer(buff) => Ok((*buff).clone()),442}443}444445#[cfg(not(feature = "cloud"))]446pub fn to_buffer_async_assume_latest(&self, run_async: bool) -> PolarsResult<Buffer<u8>> {447self.to_buffer_async(run_async)448}449450#[cfg(not(feature = "cloud"))]451pub fn to_buffer_async_check_latest(&self, run_async: bool) -> PolarsResult<Buffer<u8>> {452self.to_buffer_async(run_async)453}454455pub fn to_buffer_possibly_async(456&self,457run_async: bool,458#[cfg(feature = "cloud")] cache_entries: Option<459&Vec<Arc<polars_io::file_cache::FileCacheEntry>>,460>,461#[cfg(not(feature = "cloud"))] cache_entries: Option<&()>,462index: usize,463) -> PolarsResult<Buffer<u8>> {464match self {465Self::Path(path) => {466let file = if run_async {467feature_gated!("cloud", {468cache_entries.unwrap()[index].try_open_check_latest()?469})470} else {471polars_utils::open_file(path.as_std_path())?472};473474Ok(Buffer::from_owner(MMapSemaphore::new_from_file(&file)?))475},476Self::File(file) => Ok(Buffer::from_owner(MMapSemaphore::new_from_file(file)?)),477Self::Buffer(buff) => Ok((*buff).clone()),478}479}480481pub async fn to_dyn_byte_source(482&self,483builder: &DynByteSourceBuilder,484cloud_options: Option<&CloudOptions>,485io_metrics: Option<Arc<IOMetrics>>,486) -> PolarsResult<DynByteSource> {487match self {488Self::Path(path) => {489builder490.try_build_from_path((*path).clone(), cloud_options, io_metrics)491.await492},493Self::File(file) => Ok(DynByteSource::from(Buffer::from_owner(494MMapSemaphore::new_from_file(file)?,495))),496Self::Buffer(buff) => Ok(DynByteSource::from((*buff).clone())),497}498}499500pub fn run_async(&self) -> bool {501matches!(self, Self::Path(p) if p.has_scheme() || polars_config::config().force_async())502}503}504505impl<'a> Iterator for ScanSourceIter<'a> {506type Item = ScanSourceRef<'a>;507508fn next(&mut self) -> Option<Self::Item> {509let item = match self.sources {510ScanSources::Paths(paths) => ScanSourceRef::Path(paths.get(self.offset)?),511ScanSources::Files(files) => ScanSourceRef::File(files.get(self.offset)?),512ScanSources::Buffers(buffers) => ScanSourceRef::Buffer(buffers.get(self.offset)?),513};514515self.offset += 1;516Some(item)517}518519fn size_hint(&self) -> (usize, Option<usize>) {520let len = self.sources.len() - self.offset;521(len, Some(len))522}523}524525impl ExactSizeIterator for ScanSourceIter<'_> {}526527528