Path: blob/main/crates/polars-plan/src/dsl/builder_dsl.rs
6939 views
use std::sync::Arc;12use polars_core::prelude::*;3#[cfg(feature = "csv")]4use polars_io::csv::read::CsvReadOptions;5#[cfg(feature = "ipc")]6use polars_io::ipc::IpcScanOptions;7#[cfg(feature = "parquet")]8use polars_io::parquet::read::ParquetOptions;9use polars_utils::unique_id::UniqueId;1011#[cfg(feature = "python")]12use crate::dsl::python_dsl::PythonFunction;13use crate::prelude::*;1415pub struct DslBuilder(pub DslPlan);1617impl From<DslPlan> for DslBuilder {18fn from(lp: DslPlan) -> Self {19DslBuilder(lp)20}21}2223impl DslBuilder {24pub fn anonymous_scan(25function: Arc<dyn AnonymousScan>,26options: AnonymousScanOptions,27unified_scan_args: UnifiedScanArgs,28) -> PolarsResult<Self> {29let schema = unified_scan_args.schema.clone().ok_or_else(|| {30polars_err!(31ComputeError:32"anonymous scan requires schema to be specified in unified_scan_args"33)34})?;3536Ok(DslPlan::Scan {37sources: ScanSources::default(),38unified_scan_args: Box::new(unified_scan_args),39scan_type: Box::new(FileScanDsl::Anonymous {40function,41options: Arc::new(options),42file_info: FileInfo {43schema: schema.clone(),44reader_schema: Some(either::Either::Right(schema)),45..Default::default()46},47}),48cached_ir: Default::default(),49}50.into())51}5253#[cfg(feature = "parquet")]54#[allow(clippy::too_many_arguments)]55pub fn scan_parquet(56sources: ScanSources,57options: ParquetOptions,58unified_scan_args: UnifiedScanArgs,59) -> PolarsResult<Self> {60Ok(DslPlan::Scan {61sources,62unified_scan_args: Box::new(unified_scan_args),63scan_type: Box::new(FileScanDsl::Parquet { options }),64cached_ir: Default::default(),65}66.into())67}6869#[cfg(feature = "ipc")]70#[allow(clippy::too_many_arguments)]71pub fn scan_ipc(72sources: ScanSources,73options: IpcScanOptions,74unified_scan_args: UnifiedScanArgs,75) -> PolarsResult<Self> {76Ok(DslPlan::Scan {77sources,78unified_scan_args: Box::new(unified_scan_args),79scan_type: Box::new(FileScanDsl::Ipc { options }),80cached_ir: Default::default(),81}82.into())83}8485#[allow(clippy::too_many_arguments)]86#[cfg(feature = "csv")]87pub fn scan_csv(88sources: ScanSources,89options: CsvReadOptions,90unified_scan_args: UnifiedScanArgs,91) -> PolarsResult<Self> {92Ok(DslPlan::Scan {93sources,94unified_scan_args: Box::new(unified_scan_args),95scan_type: Box::new(FileScanDsl::Csv { options }),96cached_ir: Default::default(),97}98.into())99}100101#[cfg(feature = "python")]102pub fn scan_python_dataset(103dataset_object: polars_utils::python_function::PythonObject,104) -> DslBuilder {105use super::python_dataset::PythonDatasetProvider;106107DslPlan::Scan {108sources: ScanSources::default(),109unified_scan_args: Default::default(),110scan_type: Box::new(FileScanDsl::PythonDataset {111dataset_object: Arc::new(PythonDatasetProvider::new(dataset_object)),112}),113cached_ir: Default::default(),114}115.into()116}117118pub fn cache(self) -> Self {119let input = Arc::new(self.0);120DslPlan::Cache {121input,122id: UniqueId::new(),123}124.into()125}126127pub fn drop(self, columns: Selector) -> Self {128self.project(vec![Expr::Selector(!columns)], ProjectionOptions::default())129}130131pub fn project(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {132DslPlan::Select {133expr: exprs,134input: Arc::new(self.0),135options,136}137.into()138}139140pub fn fill_null(self, fill_value: Expr) -> Self {141self.project(142vec![all().as_expr().fill_null(fill_value)],143ProjectionOptions {144duplicate_check: false,145..Default::default()146},147)148}149150pub fn drop_nans(self, subset: Option<Selector>) -> Self {151let is_nan = subset152.unwrap_or(DataTypeSelector::Float.as_selector())153.as_expr()154.is_nan();155self.remove(any_horizontal([is_nan]).unwrap())156}157158pub fn drop_nulls(self, subset: Option<Selector>) -> Self {159let is_not_null = subset.unwrap_or(Selector::Wildcard).as_expr().is_not_null();160self.filter(all_horizontal([is_not_null]).unwrap())161}162163pub fn fill_nan(self, fill_value: Expr) -> Self {164self.map_private(DslFunction::FillNan(fill_value))165}166167pub fn with_columns(self, exprs: Vec<Expr>, options: ProjectionOptions) -> Self {168if exprs.is_empty() {169return self;170}171172DslPlan::HStack {173input: Arc::new(self.0),174exprs,175options,176}177.into()178}179180pub fn match_to_schema(181self,182match_schema: SchemaRef,183per_column: Arc<[MatchToSchemaPerColumn]>,184extra_columns: ExtraColumnsPolicy,185) -> Self {186DslPlan::MatchToSchema {187input: Arc::new(self.0),188match_schema,189per_column,190extra_columns,191}192.into()193}194195pub fn pipe_with_schema(self, callback: PlanCallback<(DslPlan, Schema), DslPlan>) -> Self {196DslPlan::PipeWithSchema {197input: Arc::new(self.0),198callback,199}200.into()201}202203pub fn with_context(self, contexts: Vec<DslPlan>) -> Self {204DslPlan::ExtContext {205input: Arc::new(self.0),206contexts,207}208.into()209}210211/// Apply a filter predicate, keeping the rows that match it.212pub fn filter(self, predicate: Expr) -> Self {213DslPlan::Filter {214predicate,215input: Arc::new(self.0),216}217.into()218}219220/// Remove rows matching a filter predicate (note that rows221/// where the predicate resolves to `null` are *not* removed).222pub fn remove(self, predicate: Expr) -> Self {223DslPlan::Filter {224predicate: predicate.neq_missing(lit(true)),225input: Arc::new(self.0),226}227.into()228}229230pub fn group_by<E: AsRef<[Expr]>>(231self,232keys: Vec<Expr>,233aggs: E,234apply: Option<(PlanCallback<DataFrame, DataFrame>, SchemaRef)>,235maintain_order: bool,236#[cfg(feature = "dynamic_group_by")] dynamic_options: Option<DynamicGroupOptions>,237#[cfg(feature = "dynamic_group_by")] rolling_options: Option<RollingGroupOptions>,238) -> Self {239let aggs = aggs.as_ref().to_vec();240let options = GroupbyOptions {241#[cfg(feature = "dynamic_group_by")]242dynamic: dynamic_options,243#[cfg(feature = "dynamic_group_by")]244rolling: rolling_options,245slice: None,246};247248DslPlan::GroupBy {249input: Arc::new(self.0),250keys,251aggs,252apply,253maintain_order,254options: Arc::new(options),255}256.into()257}258259pub fn build(self) -> DslPlan {260self.0261}262263pub fn from_existing_df(df: DataFrame) -> Self {264let schema = df.schema().clone();265DslPlan::DataFrameScan {266df: Arc::new(df),267schema,268}269.into()270}271272pub fn sort(self, by_column: Vec<Expr>, sort_options: SortMultipleOptions) -> Self {273DslPlan::Sort {274input: Arc::new(self.0),275by_column,276slice: None,277sort_options,278}279.into()280}281282pub fn explode(self, columns: Selector, allow_empty: bool) -> Self {283DslPlan::MapFunction {284input: Arc::new(self.0),285function: DslFunction::Explode {286columns,287allow_empty,288},289}290.into()291}292293#[cfg(feature = "pivot")]294pub fn unpivot(self, args: UnpivotArgsDSL) -> Self {295DslPlan::MapFunction {296input: Arc::new(self.0),297function: DslFunction::Unpivot { args },298}299.into()300}301302pub fn row_index(self, name: PlSmallStr, offset: Option<IdxSize>) -> Self {303DslPlan::MapFunction {304input: Arc::new(self.0),305function: DslFunction::RowIndex { name, offset },306}307.into()308}309310pub fn distinct(self, options: DistinctOptionsDSL) -> Self {311DslPlan::Distinct {312input: Arc::new(self.0),313options,314}315.into()316}317318pub fn slice(self, offset: i64, len: IdxSize) -> Self {319DslPlan::Slice {320input: Arc::new(self.0),321offset,322len,323}324.into()325}326327pub fn join(328self,329other: DslPlan,330left_on: Vec<Expr>,331right_on: Vec<Expr>,332options: Arc<JoinOptions>,333) -> Self {334DslPlan::Join {335input_left: Arc::new(self.0),336input_right: Arc::new(other),337left_on,338right_on,339predicates: Default::default(),340options,341}342.into()343}344pub fn map_private(self, function: DslFunction) -> Self {345DslPlan::MapFunction {346input: Arc::new(self.0),347function,348}349.into()350}351352#[cfg(feature = "python")]353pub fn map_python(354self,355function: PythonFunction,356optimizations: AllowedOptimizations,357schema: Option<SchemaRef>,358validate_output: bool,359) -> Self {360DslPlan::MapFunction {361input: Arc::new(self.0),362function: DslFunction::OpaquePython(OpaquePythonUdf {363function,364schema,365predicate_pd: optimizations.contains(OptFlags::PREDICATE_PUSHDOWN),366projection_pd: optimizations.contains(OptFlags::PROJECTION_PUSHDOWN),367streamable: optimizations.contains(OptFlags::NEW_STREAMING),368validate_output,369}),370}371.into()372}373374pub fn map<F>(375self,376function: F,377optimizations: AllowedOptimizations,378schema: Option<Arc<dyn UdfSchema>>,379name: PlSmallStr,380) -> Self381where382F: DataFrameUdf + 'static,383{384let function = Arc::new(function);385386DslPlan::MapFunction {387input: Arc::new(self.0),388function: DslFunction::FunctionIR(FunctionIR::Opaque {389function,390schema,391predicate_pd: optimizations.contains(OptFlags::PREDICATE_PUSHDOWN),392projection_pd: optimizations.contains(OptFlags::PROJECTION_PUSHDOWN),393streamable: optimizations.contains(OptFlags::NEW_STREAMING),394fmt_str: name,395}),396}397.into()398}399}400401402