Path: blob/main/crates/polars-plan/src/dsl/options/mod.rs
8458 views
use std::hash::Hash;1#[cfg(feature = "json")]2use std::num::NonZeroUsize;3use std::sync::Arc;45pub mod file_provider;6pub mod sink;7pub use polars_config::Engine;8use polars_core::error::PolarsResult;9use polars_core::prelude::*;10#[cfg(feature = "csv")]11use polars_io::csv::write::CsvWriterOptions;12#[cfg(feature = "ipc")]13use polars_io::ipc::IpcWriterOptions;14#[cfg(feature = "json")]15use polars_io::ndjson::NDJsonWriterOptions;16#[cfg(feature = "parquet")]17use polars_io::parquet::write::ParquetWriteOptions;18#[cfg(feature = "iejoin")]19use polars_ops::frame::IEJoinOptions;20use polars_ops::frame::{CrossJoinFilter, CrossJoinOptions, JoinTypeOptions};21use polars_ops::prelude::{JoinArgs, JoinType};22#[cfg(feature = "dynamic_group_by")]23use polars_time::DynamicGroupOptions;24#[cfg(feature = "dynamic_group_by")]25use polars_time::RollingGroupOptions;26use polars_utils::IdxSize;27use polars_utils::pl_str::PlSmallStr;28#[cfg(feature = "serde")]29use serde::{Deserialize, Serialize};30pub use sink::{31CallbackSinkType, FileSinkOptions, PartitionStrategy, PartitionStrategyIR,32PartitionedSinkOptions, PartitionedSinkOptionsIR, SinkDestination, SinkTarget, SinkType,33SinkTypeIR, UnifiedSinkArgs,34};35use strum_macros::IntoStaticStr;3637use super::Expr;38use crate::dsl::Selector;39use crate::plans::ExprIR;4041#[derive(Copy, Clone, PartialEq, Debug, Eq, Hash)]42#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]43#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]44pub struct RollingCovOptions {45pub window_size: IdxSize,46pub min_periods: IdxSize,47pub ddof: u8,48}4950#[derive(Clone, PartialEq, Debug, Eq, Hash)]51#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]52#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]53pub struct StrptimeOptions {54/// Formatting string55pub format: Option<PlSmallStr>,56/// If set then polars will return an error if any date parsing fails57pub strict: bool,58/// If polars may parse matches that not contain the whole string59/// e.g. "foo-2021-01-01-bar" could match "2021-01-01"60pub exact: bool,61/// use a cache of unique, converted dates to apply the datetime conversion.62pub cache: bool,63}6465impl Default for StrptimeOptions {66fn default() -> Self {67StrptimeOptions {68format: None,69strict: true,70exact: true,71cache: true,72}73}74}7576#[derive(Clone, PartialEq, Eq, IntoStaticStr, Debug)]77#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]78#[strum(serialize_all = "snake_case")]79pub enum JoinTypeOptionsIR {80#[cfg(feature = "iejoin")]81IEJoin(IEJoinOptions),82// Fused cross join and filter (only used in the in-memory engine)83CrossAndFilter {84predicate: ExprIR, // Must be elementwise.85},86}8788impl Hash for JoinTypeOptionsIR {89fn hash<H: std::hash::Hasher>(&self, state: &mut H) {90use JoinTypeOptionsIR::*;91match self {92#[cfg(feature = "iejoin")]93IEJoin(opt) => opt.hash(state),94CrossAndFilter { predicate } => {95predicate.node().hash(state);96},97}98}99}100101impl JoinTypeOptionsIR {102pub fn compile<C: FnOnce(&ExprIR) -> PolarsResult<Arc<dyn CrossJoinFilter>>>(103self,104plan: C,105) -> PolarsResult<JoinTypeOptions> {106use JoinTypeOptionsIR::*;107match self {108CrossAndFilter { predicate } => {109let predicate = plan(&predicate)?;110111Ok(JoinTypeOptions::Cross(CrossJoinOptions { predicate }))112},113#[cfg(feature = "iejoin")]114IEJoin(opt) => Ok(JoinTypeOptions::IEJoin(opt)),115}116}117}118119#[derive(Clone, Debug, PartialEq, Hash)]120#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]121pub struct JoinOptionsIR {122pub allow_parallel: bool,123pub force_parallel: bool,124pub args: JoinArgs,125pub options: Option<JoinTypeOptionsIR>,126}127128impl From<JoinOptions> for JoinOptionsIR {129fn from(opts: JoinOptions) -> Self {130Self {131allow_parallel: opts.allow_parallel,132force_parallel: opts.force_parallel,133args: opts.args,134options: Default::default(),135}136}137}138139#[derive(Clone, Debug, PartialEq, Hash)]140#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]141#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]142pub struct JoinOptions {143pub allow_parallel: bool,144pub force_parallel: bool,145pub args: JoinArgs,146}147148impl Default for JoinOptions {149fn default() -> Self {150Self {151allow_parallel: true,152force_parallel: false,153// Todo!: make default154args: JoinArgs::new(JoinType::Left),155}156}157}158159impl From<JoinOptionsIR> for JoinOptions {160fn from(opts: JoinOptionsIR) -> Self {161Self {162allow_parallel: opts.allow_parallel,163force_parallel: opts.force_parallel,164args: opts.args,165}166}167}168169#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash, IntoStaticStr)]170#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]171#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]172#[strum(serialize_all = "snake_case")]173pub enum WindowMapping {174/// Map the group values to the position175#[default]176GroupsToRows,177/// Explode the aggregated list and just do a hstack instead of a join178/// this requires the groups to be sorted to make any sense179Explode,180/// Join the groups as 'List<group_dtype>' to the row positions.181/// warning: this can be memory intensive182Join,183}184185#[derive(Clone, Debug, PartialEq, Eq, Hash)]186#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]187#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]188pub struct UnpivotArgsDSL {189pub on: Option<Selector>,190pub index: Selector,191pub variable_name: Option<PlSmallStr>,192pub value_name: Option<PlSmallStr>,193}194195#[derive(Clone, Debug, Copy, Eq, PartialEq, Hash)]196#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]197pub struct UnionOptions {198pub slice: Option<(i64, usize)>,199// known row_output, estimated row output200pub rows: (Option<usize>, usize),201pub parallel: bool,202pub from_partitioned_ds: bool,203pub flattened_by_opt: bool,204pub rechunk: bool,205pub maintain_order: bool,206}207208impl Default for UnionOptions {209fn default() -> Self {210Self {211slice: None,212rows: (None, 0),213parallel: true,214from_partitioned_ds: false,215flattened_by_opt: false,216rechunk: false,217maintain_order: true,218}219}220}221222#[derive(Clone, Debug, Copy, Eq, PartialEq, Hash)]223#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]224#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]225pub struct HConcatOptions {226pub parallel: bool,227pub strict: bool,228// Treat unit values as scalar.229// E.g. broadcast them instead of fill nulls.230pub broadcast_unit_length: bool,231}232233impl Default for HConcatOptions {234fn default() -> Self {235Self {236parallel: true,237strict: false,238broadcast_unit_length: false,239}240}241}242243#[derive(Clone, Debug, PartialEq, Eq, Default, Hash)]244#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]245#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]246pub struct GroupbyOptions {247#[cfg(feature = "dynamic_group_by")]248pub dynamic: Option<DynamicGroupOptions>,249#[cfg(feature = "dynamic_group_by")]250pub rolling: Option<RollingGroupOptions>,251/// Take only a slice of the result252pub slice: Option<(i64, usize)>,253}254255impl GroupbyOptions {256pub fn is_rolling(&self) -> bool {257#[cfg(feature = "dynamic_group_by")]258{259self.rolling.is_some()260}261#[cfg(not(feature = "dynamic_group_by"))]262{263false264}265}266267pub fn is_dynamic(&self) -> bool {268#[cfg(feature = "dynamic_group_by")]269{270self.dynamic.is_some()271}272#[cfg(not(feature = "dynamic_group_by"))]273{274false275}276}277}278279#[derive(Clone, Debug, Eq, PartialEq, Default, Hash)]280#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]281#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]282pub struct DistinctOptionsDSL {283/// Subset of columns/expressions that will be taken into account.284pub subset: Option<Vec<Expr>>,285/// This will maintain the order of the input.286/// Note that this is more expensive.287/// `maintain_order` is not supported in the streaming288/// engine.289pub maintain_order: bool,290/// Which rows to keep.291pub keep_strategy: UniqueKeepStrategy,292}293294#[derive(Clone, Copy, PartialEq, Eq, Debug)]295pub struct LogicalPlanUdfOptions {296/// allow predicate pushdown optimizations297pub predicate_pd: bool,298/// allow projection pushdown optimizations299pub projection_pd: bool,300// used for formatting301pub fmt_str: &'static str,302}303304#[derive(Clone, PartialEq, Eq, Debug, Default, Hash)]305#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]306pub struct AnonymousScanOptions {307pub skip_rows: Option<usize>,308pub fmt_str: &'static str,309}310311const _: () = {312assert!(std::mem::size_of::<FileWriteFormat>() <= 50);313};314315#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]316#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]317#[derive(Clone, Debug, PartialEq, Eq, Hash, strum_macros::IntoStaticStr)]318pub enum FileWriteFormat {319#[cfg(feature = "parquet")]320Parquet(Arc<ParquetWriteOptions>),321#[cfg(feature = "ipc")]322Ipc(IpcWriterOptions),323#[cfg(feature = "csv")]324Csv(CsvWriterOptions),325#[cfg(feature = "json")]326NDJson(NDJsonWriterOptions),327}328329impl FileWriteFormat {330pub fn extension(&self) -> &'static str {331match self {332#[cfg(feature = "parquet")]333Self::Parquet(_) => "parquet",334#[cfg(feature = "ipc")]335Self::Ipc(_) => "ipc",336#[cfg(feature = "csv")]337Self::Csv(_) => "csv",338#[cfg(feature = "json")]339Self::NDJson(_) => "jsonl",340341#[allow(unreachable_patterns)]342_ => unreachable!("enable file type features"),343}344}345}346347//348// Arguments given to `concat`. Differs from `UnionOptions` as the latter is IR state.349#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]350#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]351#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]352pub struct UnionArgs {353pub parallel: bool,354pub rechunk: bool,355pub to_supertypes: bool,356pub diagonal: bool,357pub strict: bool,358// If it is a union from a scan over multiple files.359pub from_partitioned_ds: bool,360pub maintain_order: bool,361}362363impl Default for UnionArgs {364fn default() -> Self {365Self {366parallel: true,367rechunk: false,368to_supertypes: false,369diagonal: false,370// By default, strict should be true in v2.0.0371strict: false,372from_partitioned_ds: false,373maintain_order: true,374}375}376}377378impl From<UnionArgs> for UnionOptions {379fn from(args: UnionArgs) -> Self {380UnionOptions {381slice: None,382parallel: args.parallel,383rows: (None, 0),384from_partitioned_ds: args.from_partitioned_ds,385flattened_by_opt: false,386rechunk: args.rechunk,387maintain_order: args.maintain_order,388}389}390}391392#[derive(Clone, Debug, PartialEq, Eq, Hash)]393#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]394#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]395#[cfg(feature = "json")]396pub struct NDJsonReadOptions {397pub n_threads: Option<usize>,398pub infer_schema_length: Option<NonZeroUsize>,399pub chunk_size: NonZeroUsize,400pub low_memory: bool,401pub ignore_errors: bool,402pub schema: Option<SchemaRef>,403pub schema_overwrite: Option<SchemaRef>,404}405406407