// Path: blob/main/crates/polars-plan/src/dsl/options/mod.rs
// 6940 views
use std::hash::Hash;1#[cfg(feature = "json")]2use std::num::NonZeroUsize;3use std::str::FromStr;4use std::sync::Arc;56mod sink;78use polars_core::error::PolarsResult;9use polars_core::prelude::*;10#[cfg(feature = "csv")]11use polars_io::csv::write::CsvWriterOptions;12#[cfg(feature = "ipc")]13use polars_io::ipc::IpcWriterOptions;14#[cfg(feature = "json")]15use polars_io::json::JsonWriterOptions;16#[cfg(feature = "parquet")]17use polars_io::parquet::write::ParquetWriteOptions;18#[cfg(feature = "iejoin")]19use polars_ops::frame::IEJoinOptions;20use polars_ops::frame::{CrossJoinFilter, CrossJoinOptions, JoinTypeOptions};21use polars_ops::prelude::{JoinArgs, JoinType};22#[cfg(feature = "dynamic_group_by")]23use polars_time::DynamicGroupOptions;24#[cfg(feature = "dynamic_group_by")]25use polars_time::RollingGroupOptions;26use polars_utils::IdxSize;27use polars_utils::pl_str::PlSmallStr;28#[cfg(feature = "serde")]29use serde::{Deserialize, Serialize};30pub use sink::*;31use strum_macros::IntoStaticStr;3233use super::ExprIR;34use crate::dsl::Selector;3536#[derive(Copy, Clone, PartialEq, Debug, Eq, Hash)]37#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]38#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]39pub struct RollingCovOptions {40pub window_size: IdxSize,41pub min_periods: IdxSize,42pub ddof: u8,43}4445#[derive(Clone, PartialEq, Debug, Eq, Hash)]46#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]47#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]48pub struct StrptimeOptions {49/// Formatting string50pub format: Option<PlSmallStr>,51/// If set then polars will return an error if any date parsing fails52pub strict: bool,53/// If polars may parse matches that not contain the whole string54/// e.g. 
"foo-2021-01-01-bar" could match "2021-01-01"55pub exact: bool,56/// use a cache of unique, converted dates to apply the datetime conversion.57pub cache: bool,58}5960impl Default for StrptimeOptions {61fn default() -> Self {62StrptimeOptions {63format: None,64strict: true,65exact: true,66cache: true,67}68}69}7071#[derive(Clone, PartialEq, Eq, IntoStaticStr, Debug)]72#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]73#[strum(serialize_all = "snake_case")]74pub enum JoinTypeOptionsIR {75#[cfg(feature = "iejoin")]76IEJoin(IEJoinOptions),77// Fused cross join and filter (only used in the in-memory engine)78CrossAndFilter {79predicate: ExprIR, // Must be elementwise.80},81}8283impl Hash for JoinTypeOptionsIR {84fn hash<H: std::hash::Hasher>(&self, state: &mut H) {85use JoinTypeOptionsIR::*;86match self {87#[cfg(feature = "iejoin")]88IEJoin(opt) => opt.hash(state),89CrossAndFilter { predicate } => predicate.node().hash(state),90}91}92}9394impl JoinTypeOptionsIR {95pub fn compile<C: FnOnce(&ExprIR) -> PolarsResult<Arc<dyn CrossJoinFilter>>>(96self,97plan: C,98) -> PolarsResult<JoinTypeOptions> {99use JoinTypeOptionsIR::*;100match self {101CrossAndFilter { predicate } => {102let predicate = plan(&predicate)?;103104Ok(JoinTypeOptions::Cross(CrossJoinOptions { predicate }))105},106#[cfg(feature = "iejoin")]107IEJoin(opt) => Ok(JoinTypeOptions::IEJoin(opt)),108}109}110}111112#[derive(Clone, Debug, PartialEq, Hash)]113#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]114pub struct JoinOptionsIR {115pub allow_parallel: bool,116pub force_parallel: bool,117pub args: JoinArgs,118pub options: Option<JoinTypeOptionsIR>,119/// Proxy of the number of rows in both sides of the joins120/// Holds `(Option<known_size>, estimated_size)`121pub rows_left: (Option<usize>, usize),122pub rows_right: (Option<usize>, usize),123}124125impl From<JoinOptions> for JoinOptionsIR {126fn from(opts: JoinOptions) -> Self {127Self {128allow_parallel: 
opts.allow_parallel,129force_parallel: opts.force_parallel,130args: opts.args,131options: Default::default(),132rows_left: (None, usize::MAX),133rows_right: (None, usize::MAX),134}135}136}137138#[derive(Clone, Debug, PartialEq, Hash)]139#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]140#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]141pub struct JoinOptions {142pub allow_parallel: bool,143pub force_parallel: bool,144pub args: JoinArgs,145}146147impl Default for JoinOptions {148fn default() -> Self {149JoinOptions {150allow_parallel: true,151force_parallel: false,152// Todo!: make default153args: JoinArgs::new(JoinType::Left),154}155}156}157158impl From<JoinOptionsIR> for JoinOptions {159fn from(opts: JoinOptionsIR) -> Self {160Self {161allow_parallel: opts.allow_parallel,162force_parallel: opts.force_parallel,163args: opts.args,164}165}166}167168#[derive(Clone, Debug, PartialEq, Eq, Hash)]169#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]170#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]171pub enum WindowType {172/// Explode the aggregated list and just do a hstack instead of a join173/// this requires the groups to be sorted to make any sense174Over(WindowMapping),175#[cfg(feature = "dynamic_group_by")]176Rolling(RollingGroupOptions),177}178179impl From<WindowMapping> for WindowType {180fn from(value: WindowMapping) -> Self {181Self::Over(value)182}183}184185impl Default for WindowType {186fn default() -> Self {187Self::Over(WindowMapping::default())188}189}190191#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash, IntoStaticStr)]192#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]193#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]194#[strum(serialize_all = "snake_case")]195pub enum WindowMapping {196/// Map the group values to the position197#[default]198GroupsToRows,199/// Explode the aggregated list and just do a hstack instead of a join200/// this requires the 
groups to be sorted to make any sense201Explode,202/// Join the groups as 'List<group_dtype>' to the row positions.203/// warning: this can be memory intensive204Join,205}206207#[derive(Clone, Debug, PartialEq, Eq, Hash)]208#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]209pub enum NestedType {210#[cfg(feature = "dtype-array")]211Array,212// List,213}214215#[derive(Clone, Debug, PartialEq, Eq, Hash)]216#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]217#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]218pub struct UnpivotArgsDSL {219pub on: Selector,220pub index: Selector,221pub variable_name: Option<PlSmallStr>,222pub value_name: Option<PlSmallStr>,223}224225#[derive(Clone, Debug, Copy, Eq, PartialEq, Hash)]226#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]227pub enum Engine {228Auto,229Streaming,230InMemory,231Gpu,232}233234impl FromStr for Engine {235type Err = String;236237fn from_str(s: &str) -> Result<Self, Self::Err> {238match s {239// "cpu" for backwards compatibility240"auto" => Ok(Engine::Auto),241"cpu" | "in-memory" => Ok(Engine::InMemory),242"streaming" => Ok(Engine::Streaming),243"gpu" => Ok(Engine::Gpu),244"old-streaming" => Err("the 'old-streaming' engine has been removed".to_owned()),245v => Err(format!(246"`engine` must be one of {{'auto', 'in-memory', 'streaming', 'gpu'}}, got {v}",247)),248}249}250}251252impl Engine {253pub fn into_static_str(self) -> &'static str {254match self {255Self::Auto => "auto",256Self::Streaming => "streaming",257Self::InMemory => "in-memory",258Self::Gpu => "gpu",259}260}261}262263#[derive(Clone, Debug, Copy, Eq, PartialEq, Hash)]264#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]265pub struct UnionOptions {266pub slice: Option<(i64, usize)>,267// known row_output, estimated row output268pub rows: (Option<usize>, usize),269pub parallel: bool,270pub from_partitioned_ds: bool,271pub flattened_by_opt: bool,272pub rechunk: bool,273pub maintain_order: 
bool,274}275276impl Default for UnionOptions {277fn default() -> Self {278Self {279slice: None,280rows: (None, 0),281parallel: true,282from_partitioned_ds: false,283flattened_by_opt: false,284rechunk: false,285maintain_order: true,286}287}288}289290#[derive(Clone, Debug, Copy, Default, Eq, PartialEq, Hash)]291#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]292#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]293pub struct HConcatOptions {294pub parallel: bool,295}296297#[derive(Clone, Debug, PartialEq, Eq, Default, Hash)]298#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]299#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]300pub struct GroupbyOptions {301#[cfg(feature = "dynamic_group_by")]302pub dynamic: Option<DynamicGroupOptions>,303#[cfg(feature = "dynamic_group_by")]304pub rolling: Option<RollingGroupOptions>,305/// Take only a slice of the result306pub slice: Option<(i64, usize)>,307}308309impl GroupbyOptions {310pub(crate) fn is_rolling(&self) -> bool {311#[cfg(feature = "dynamic_group_by")]312{313self.rolling.is_some()314}315#[cfg(not(feature = "dynamic_group_by"))]316{317false318}319}320321pub(crate) fn is_dynamic(&self) -> bool {322#[cfg(feature = "dynamic_group_by")]323{324self.dynamic.is_some()325}326#[cfg(not(feature = "dynamic_group_by"))]327{328false329}330}331}332333#[derive(Clone, Debug, Eq, PartialEq, Default, Hash)]334#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]335#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]336pub struct DistinctOptionsDSL {337/// Subset of columns that will be taken into account.338pub subset: Option<Selector>,339/// This will maintain the order of the input.340/// Note that this is more expensive.341/// `maintain_order` is not supported in the streaming342/// engine.343pub maintain_order: bool,344/// Which rows to keep.345pub keep_strategy: UniqueKeepStrategy,346}347348#[derive(Clone, Copy, PartialEq, Eq, Debug)]349pub struct 
LogicalPlanUdfOptions {350/// allow predicate pushdown optimizations351pub predicate_pd: bool,352/// allow projection pushdown optimizations353pub projection_pd: bool,354// used for formatting355pub fmt_str: &'static str,356}357358#[derive(Clone, PartialEq, Eq, Debug, Default, Hash)]359#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]360pub struct AnonymousScanOptions {361pub skip_rows: Option<usize>,362pub fmt_str: &'static str,363}364365#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]366#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]367#[derive(Clone, Debug, PartialEq, Eq, Hash)]368pub enum FileType {369#[cfg(feature = "parquet")]370Parquet(ParquetWriteOptions),371#[cfg(feature = "ipc")]372Ipc(IpcWriterOptions),373#[cfg(feature = "csv")]374Csv(CsvWriterOptions),375#[cfg(feature = "json")]376Json(JsonWriterOptions),377}378379impl FileType {380pub fn extension(&self) -> &'static str {381match self {382#[cfg(feature = "parquet")]383Self::Parquet(_) => "parquet",384#[cfg(feature = "ipc")]385Self::Ipc(_) => "ipc",386#[cfg(feature = "csv")]387Self::Csv(_) => "csv",388#[cfg(feature = "json")]389Self::Json(_) => "jsonl",390391#[allow(unreachable_patterns)]392_ => unreachable!("enable file type features"),393}394}395}396397//398// Arguments given to `concat`. 
Differs from `UnionOptions` as the latter is IR state.399#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]400#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]401#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]402pub struct UnionArgs {403pub parallel: bool,404pub rechunk: bool,405pub to_supertypes: bool,406pub diagonal: bool,407// If it is a union from a scan over multiple files.408pub from_partitioned_ds: bool,409pub maintain_order: bool,410}411412impl Default for UnionArgs {413fn default() -> Self {414Self {415parallel: true,416rechunk: false,417to_supertypes: false,418diagonal: false,419from_partitioned_ds: false,420maintain_order: true,421}422}423}424425impl From<UnionArgs> for UnionOptions {426fn from(args: UnionArgs) -> Self {427UnionOptions {428slice: None,429parallel: args.parallel,430rows: (None, 0),431from_partitioned_ds: args.from_partitioned_ds,432flattened_by_opt: false,433rechunk: args.rechunk,434maintain_order: args.maintain_order,435}436}437}438439#[derive(Clone, Debug, PartialEq, Eq, Hash)]440#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]441#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]442#[cfg(feature = "json")]443pub struct NDJsonReadOptions {444pub n_threads: Option<usize>,445pub infer_schema_length: Option<NonZeroUsize>,446pub chunk_size: NonZeroUsize,447pub low_memory: bool,448pub ignore_errors: bool,449pub schema: Option<SchemaRef>,450pub schema_overwrite: Option<SchemaRef>,451}452453454