Path: crates/polars-plan/src/dsl/builder_dsl.rs
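//! Builder helpers for constructing [`DslPlan`] nodes.
//!
//! [`DslBuilder`] wraps a [`DslPlan`]; each method below wraps the current
//! plan in a new DSL node (scan, projection, filter, join, group-by, ...) and
//! returns the updated builder.
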
use std::sync::Arc;

use polars_core::prelude::*;
#[cfg(feature = "csv")]
use polars_io::csv::read::CsvReadOptions;
#[cfg(feature = "ipc")]
use polars_io::ipc::IpcScanOptions;
#[cfg(feature = "parquet")]
use polars_io::parquet::read::ParquetOptions;
use polars_utils::unique_id::UniqueId;

use crate::dsl::functions::lit;
#[cfg(feature = "python")]
use crate::dsl::python_dsl::PythonFunction;
use crate::prelude::*;

pub struct DslBuilder(pub DslPlan);

impl From<DslPlan> for DslBuilder {
    fn from(lp: DslPlan) -> Self {
        DslBuilder(lp)
    }
}

impl DslBuilder {
    pub fn anonymous_scan(
        function: Arc<dyn AnonymousScan>,
        options: AnonymousScanOptions,
        unified_scan_args: UnifiedScanArgs,
    ) -> PolarsResult<Self> {
        let schema = unified_scan_args.schema.clone().ok_or_else(|| {
            polars_err!(
                ComputeError:
                "anonymous scan requires schema to be specified in unified_scan_args"
            )
        })?;

        Ok(DslPlan::Scan {
            sources: ScanSources::default(),
            unified_scan_args: Box::new(unified_scan_args),
            scan_type: Box::new(FileScanDsl::Anonymous {
                function,
                options: Arc::new(options),
                file_info: FileInfo {
                    schema: schema.clone(),
                    reader_schema: Some(either::Either::Right(schema)),
                    ..Default::default()
                },
            }),
            cached_ir: Default::default(),
        }
        .into())
    }

    #[cfg(feature = "parquet")]
    pub fn scan_parquet(
        sources: ScanSources,
        options: ParquetOptions,
        unified_scan_args: UnifiedScanArgs,
    ) -> PolarsResult<Self> {
        Ok(DslPlan::Scan {
            sources,
            unified_scan_args: Box::new(unified_scan_args),
            scan_type: Box::new(FileScanDsl::Parquet { options }),
            cached_ir: Default::default(),
        }
        .into())
    }

    #[cfg(feature = "ipc")]
    pub fn scan_ipc(
        sources: ScanSources,
        options: IpcScanOptions,
        unified_scan_args: UnifiedScanArgs,
    ) -> PolarsResult<Self> {
        Ok(DslPlan::Scan {
            sources,
            unified_scan_args: Box::new(unified_scan_args),
            scan_type: Box::new(FileScanDsl::Ipc { options }),
            cached_ir: Default::default(),
        }
        .into())
    }

    #[cfg(feature = "scan_lines")]
    pub fn scan_lines(
        sources: ScanSources,
        unified_scan_args: UnifiedScanArgs,
        name: PlSmallStr,
    ) -> PolarsResult<Self> {
        Ok(DslPlan::Scan {
            sources,
            unified_scan_args: Box::new(unified_scan_args),
            scan_type: Box::new(FileScanDsl::Lines { name }),
            cached_ir: Default::default(),
        }
        .into())
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "csv")]
    pub fn scan_csv(
        sources: ScanSources,
        options: impl Into<Arc<CsvReadOptions>>,
        unified_scan_args: UnifiedScanArgs,
    ) -> PolarsResult<Self> {
        Ok(DslPlan::Scan {
            sources,
            unified_scan_args: Box::new(unified_scan_args),
            scan_type: Box::new(FileScanDsl::Csv {
                options: options.into(),
            }),
            cached_ir: Default::default(),
        }
        .into())
    }

    #[cfg(feature = "python")]
    pub fn scan_python_dataset(
        dataset_object: polars_utils::python_function::PythonObject,
    ) -> DslBuilder {
        use super::python_dataset::PythonDatasetProvider;

        DslPlan::Scan {
            sources: ScanSources::default(),
            unified_scan_args: Default::default(),
            scan_type: Box::new(FileScanDsl::PythonDataset {
                dataset_object: Arc::new(PythonDatasetProvider::new(dataset_object)),
            }),
            cached_ir: Default::default(),
        }
        .into()
    }

    pub fn cache(self) -> Self {
        let input = Arc::new(self.0);
        DslPlan::Cache {
            input,
            id: UniqueId::new(),
        }
        .into()
    }

    pub fn drop(self, columns: Selector) -> Self {
        self.project(vec![Expr::Selector(!columns)], ProjectionOptions::default())
    }

    pub fn project(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
        DslPlan::Select {
            expr: exprs,
            input: Arc::new(self.0),
            options,
        }
        .into()
    }

    pub fn fill_null(self, fill_value: Expr) -> Self {
        self.project(
            vec![functions::all().as_expr().fill_null(fill_value)],
            ProjectionOptions {
                duplicate_check: false,
                ..Default::default()
            },
        )
    }

    pub fn drop_nans(self, subset: Option<Selector>) -> Self {
        let is_nan = subset
            .unwrap_or(DataTypeSelector::Float.as_selector())
            .as_expr()
            .is_nan();
        self.remove(functions::any_horizontal([is_nan]).unwrap())
    }

    pub fn drop_nulls(self, subset: Option<Selector>) -> Self {
        let is_not_null = subset.unwrap_or(Selector::Wildcard).as_expr().is_not_null();
        self.filter(functions::all_horizontal([is_not_null]).unwrap())
    }

    pub fn fill_nan(self, fill_value: Expr) -> Self {
        self.map_private(DslFunction::FillNan(fill_value))
    }

    pub fn with_columns(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {
        if exprs.is_empty() {
            return self;
        }

        DslPlan::HStack {
            input: Arc::new(self.0),
            exprs,
            options,
        }
        .into()
    }

    pub fn match_to_schema(
        self,
        match_schema: SchemaRef,
        per_column: Arc<[MatchToSchemaPerColumn]>,
        extra_columns: ExtraColumnsPolicy,
    ) -> Self {
        DslPlan::MatchToSchema {
            input: Arc::new(self.0),
            match_schema,
            per_column,
            extra_columns,
        }
        .into()
    }

    pub fn pipe_with_schema(
        self,
        others: Vec<DslPlan>,
        callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
    ) -> Self {
        let mut input = vec![self.0];
        input.extend(others);
        DslPlan::PipeWithSchema {
            input: Arc::from(input),
            callback,
        }
        .into()
    }

    pub fn with_context(self, contexts: Vec<DslPlan>) -> Self {
        DslPlan::ExtContext {
            input: Arc::new(self.0),
            contexts,
        }
        .into()
    }

    /// Apply a filter predicate, keeping the rows that match it.
    pub fn filter(self, predicate: Expr) -> Self {
        DslPlan::Filter {
            predicate,
            input: Arc::new(self.0),
        }
        .into()
    }

    /// Remove rows matching a filter predicate (note that rows
    /// where the predicate resolves to `null` are *not* removed).
    pub fn remove(self, predicate: Expr) -> Self {
        DslPlan::Filter {
            predicate: predicate.neq_missing(lit(true)),
            input: Arc::new(self.0),
        }
        .into()
    }

    #[allow(clippy::too_many_arguments)]
    pub fn group_by<E: AsRef<[Expr]>>(
        self,
        keys: Vec<Expr>,
        predicates: Vec<Expr>,
        aggs: E,
        apply: Option<(PlanCallback<DataFrame, DataFrame>, SchemaRef)>,
        maintain_order: bool,
        #[cfg(feature = "dynamic_group_by")] dynamic_options: Option<DynamicGroupOptions>,
        #[cfg(feature = "dynamic_group_by")] rolling_options: Option<RollingGroupOptions>,
    ) -> Self {
        let aggs = aggs.as_ref().to_vec();
        let options = GroupbyOptions {
            #[cfg(feature = "dynamic_group_by")]
            dynamic: dynamic_options,
            #[cfg(feature = "dynamic_group_by")]
            rolling: rolling_options,
            slice: None,
        };

        DslPlan::GroupBy {
            input: Arc::new(self.0),
            keys,
            predicates,
            aggs,
            apply,
            maintain_order,
            options: Arc::new(options),
        }
        .into()
    }

    pub fn build(self) -> DslPlan {
        self.0
    }

    pub fn from_existing_df(df: DataFrame) -> Self {
        let schema = df.schema().clone();
        DslPlan::DataFrameScan {
            df: Arc::new(df),
            schema,
        }
        .into()
    }

    pub fn sort(self, by_column: Vec<Expr>, sort_options: SortMultipleOptions) -> Self {
        DslPlan::Sort {
            input: Arc::new(self.0),
            by_column,
            slice: None,
            sort_options,
        }
        .into()
    }

    pub fn explode(self, columns: Selector, options: ExplodeOptions, allow_empty: bool) -> Self {
        DslPlan::MapFunction {
            input: Arc::new(self.0),
            function: DslFunction::Explode {
                columns,
                options,
                allow_empty,
            },
        }
        .into()
    }

    #[cfg(feature = "pivot")]
    #[expect(clippy::too_many_arguments)]
    pub fn pivot(
        self,
        on: Selector,
        on_columns: Arc<DataFrame>,
        index: Selector,
        values: Selector,
        agg: Expr,
        maintain_order: bool,
        separator: PlSmallStr,
    ) -> Self {
        DslPlan::Pivot {
            input: Arc::new(self.0),
            on,
            on_columns,
            index,
            values,
            agg,
            maintain_order,
            separator,
        }
        .into()
    }

    #[cfg(feature = "pivot")]
    pub fn unpivot(self, args: UnpivotArgsDSL) -> Self {
        DslPlan::MapFunction {
            input: Arc::new(self.0),
            function: DslFunction::Unpivot { args },
        }
        .into()
    }

    pub fn row_index(self, name: PlSmallStr, offset: Option<IdxSize>) -> Self {
        DslPlan::MapFunction {
            input: Arc::new(self.0),
            function: DslFunction::RowIndex { name, offset },
        }
        .into()
    }

    pub fn distinct(self, options: DistinctOptionsDSL) -> Self {
        DslPlan::Distinct {
            input: Arc::new(self.0),
            options,
        }
        .into()
    }

    pub fn slice(self, offset: i64, len: IdxSize) -> Self {
        DslPlan::Slice {
            input: Arc::new(self.0),
            offset,
            len,
        }
        .into()
    }

    pub fn join(
        self,
        other: DslPlan,
        left_on: Vec<Expr>,
        right_on: Vec<Expr>,
        options: Arc<JoinOptions>,
    ) -> Self {
        DslPlan::Join {
            input_left: Arc::new(self.0),
            input_right: Arc::new(other),
            left_on,
            right_on,
            predicates: Default::default(),
            options,
        }
        .into()
    }

    pub fn map_private(self, function: DslFunction) -> Self {
        DslPlan::MapFunction {
            input: Arc::new(self.0),
            function,
        }
        .into()
    }

    #[cfg(feature = "python")]
    pub fn map_python(
        self,
        function: PythonFunction,
        optimizations: AllowedOptimizations,
        schema: Option<SchemaRef>,
        validate_output: bool,
    ) -> Self {
        DslPlan::MapFunction {
            input: Arc::new(self.0),
            function: DslFunction::OpaquePython(OpaquePythonUdf {
                function,
                schema,
                predicate_pd: optimizations.contains(OptFlags::PREDICATE_PUSHDOWN),
                projection_pd: optimizations.contains(OptFlags::PROJECTION_PUSHDOWN),
                streamable: optimizations.contains(OptFlags::NEW_STREAMING),
                validate_output,
            }),
        }
        .into()
    }

    pub fn map<F>(
        self,
        function: F,
        optimizations: AllowedOptimizations,
        schema: Option<Arc<dyn UdfSchema>>,
        name: PlSmallStr,
    ) -> Self
    where
        F: DataFrameUdf + 'static,
    {
        let function = Arc::new(function);

        DslPlan::MapFunction {
            input: Arc::new(self.0),
            function: DslFunction::FunctionIR(FunctionIR::Opaque {
                function,
                schema,
                predicate_pd: optimizations.contains(OptFlags::PREDICATE_PUSHDOWN),
                projection_pd: optimizations.contains(OptFlags::PROJECTION_PUSHDOWN),
                streamable: optimizations.contains(OptFlags::NEW_STREAMING),
                fmt_str: name,
            }),
        }
        .into()
    }
}
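
// NOTE: minimal illustrative sketch, not part of the upstream file. It only
// exercises items defined above plus `DataFrame::empty()` from polars-core,
// and shows how each builder call wraps the current plan in a new DSL node.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn filter_wraps_plan_in_filter_node() {
        // Start from an in-memory DataFrameScan, then apply a trivial filter.
        let plan = DslBuilder::from_existing_df(DataFrame::empty())
            .filter(lit(true))
            .build();
        // `filter` should yield a `DslPlan::Filter` node around the scan.
        assert!(matches!(plan, DslPlan::Filter { .. }));
    }
}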