// Path: blob/main/crates/polars-plan/src/plans/ir/schema.rs (8458 views)
use recursive::recursive;12use super::*;34impl IR {5/// Get the schema of the logical plan node but don't take projections into account at the scan6/// level. This ensures we can apply the predicate7pub(crate) fn scan_schema(&self) -> &SchemaRef {8use IR::*;9match self {10Scan { file_info, .. } => &file_info.schema,11#[cfg(feature = "python")]12PythonScan { options, .. } => &options.schema,13_ => unreachable!(),14}15}1617pub fn name(&self) -> &'static str {18use IR::*;19match self {20Scan { scan_type, .. } => (&**scan_type).into(),21#[cfg(feature = "python")]22PythonScan { .. } => "python_scan",23Slice { .. } => "slice",24Filter { .. } => "filter",25DataFrameScan { .. } => "df",26Select { .. } => "projection",27Sort { .. } => "sort",28Cache { .. } => "cache",29GroupBy { .. } => "aggregate",30Join { .. } => "join",31HStack { .. } => "hstack",32Distinct { .. } => "distinct",33MapFunction { .. } => "map_function",34Union { .. } => "union",35HConcat { .. } => "hconcat",36ExtContext { .. } => "ext_context",37Sink { payload, .. } => match payload {38SinkTypeIR::Memory => "sink (memory)",39SinkTypeIR::Callback(..) => "sink (callback)",40SinkTypeIR::File { .. } => "sink (file)",41SinkTypeIR::Partitioned { .. } => "sink (partition)",42},43SinkMultiple { .. } => "sink multiple",44SimpleProjection { .. } => "simple_projection",45#[cfg(feature = "merge_sorted")]46MergeSorted { .. } => "merge_sorted",47Invalid => "invalid",48}49}5051pub fn input_schema<'a>(&'a self, arena: &'a Arena<IR>) -> Option<Cow<'a, SchemaRef>> {52use IR::*;53let schema = match self {54#[cfg(feature = "python")]55PythonScan { options } => &options.schema,56DataFrameScan { schema, .. } => schema,57Scan { file_info, .. 
} => &file_info.schema,58node => {59let input = node.get_input()?;60return Some(arena.get(input).schema(arena));61},62};63Some(Cow::Borrowed(schema))64}6566/// Get the schema of the logical plan node.67#[recursive]68pub fn schema<'a>(&'a self, arena: &'a Arena<IR>) -> Cow<'a, SchemaRef> {69use IR::*;70let schema = match self {71#[cfg(feature = "python")]72PythonScan { options } => options.output_schema.as_ref().unwrap_or(&options.schema),73Union { inputs, .. } => return arena.get(inputs[0]).schema(arena),74HConcat { schema, .. } => schema,75Cache { input, .. } => return arena.get(*input).schema(arena),76Sort { input, .. } => return arena.get(*input).schema(arena),77Scan {78output_schema,79file_info,80..81} => output_schema.as_ref().unwrap_or(&file_info.schema),82DataFrameScan {83schema,84output_schema,85..86} => output_schema.as_ref().unwrap_or(schema),87Filter { input, .. } => return arena.get(*input).schema(arena),88Select { schema, .. } => schema,89SimpleProjection { columns, .. } => columns,90GroupBy { schema, .. } => schema,91Join { schema, .. } => schema,92HStack { schema, .. } => schema,93Distinct { input, .. }94| Sink {95input,96payload: SinkTypeIR::Memory,97} => return arena.get(*input).schema(arena),98Sink { .. } | SinkMultiple { .. } => return Cow::Owned(Arc::new(Schema::default())),99Slice { input, .. } => return arena.get(*input).schema(arena),100MapFunction { input, function } => {101let input_schema = arena.get(*input).schema(arena);102103return match input_schema {104Cow::Owned(schema) => {105Cow::Owned(function.schema(&schema).unwrap().into_owned())106},107Cow::Borrowed(schema) => function.schema(schema).unwrap(),108};109},110ExtContext { schema, .. } => schema,111#[cfg(feature = "merge_sorted")]112MergeSorted { input_left, .. 
} => return arena.get(*input_left).schema(arena),113Invalid => unreachable!(),114};115Cow::Borrowed(schema)116}117118/// Get the schema of the logical plan node, using caching.119#[recursive]120pub fn schema_with_cache<'a>(121node: Node,122arena: &'a Arena<IR>,123cache: &mut PlHashMap<Node, Arc<Schema>>,124) -> Arc<Schema> {125use IR::*;126if let Some(schema) = cache.get(&node) {127return schema.clone();128}129130let schema = match arena.get(node) {131#[cfg(feature = "python")]132PythonScan { options } => options133.output_schema134.as_ref()135.unwrap_or(&options.schema)136.clone(),137Union { inputs, .. } => IR::schema_with_cache(inputs[0], arena, cache),138HConcat { schema, .. } => schema.clone(),139Cache { input, .. }140| Sort { input, .. }141| Filter { input, .. }142| Distinct { input, .. }143| Sink {144input,145payload: SinkTypeIR::Memory,146}147| Slice { input, .. } => IR::schema_with_cache(*input, arena, cache),148Sink { .. } | SinkMultiple { .. } => Arc::new(Schema::default()),149Scan {150output_schema,151file_info,152..153} => output_schema.as_ref().unwrap_or(&file_info.schema).clone(),154DataFrameScan {155schema,156output_schema,157..158} => output_schema.as_ref().unwrap_or(schema).clone(),159Select { schema, .. }160| GroupBy { schema, .. }161| Join { schema, .. }162| HStack { schema, .. }163| ExtContext { schema, .. }164| SimpleProjection {165columns: schema, ..166} => schema.clone(),167MapFunction { input, function } => {168let input_schema = IR::schema_with_cache(*input, arena, cache);169function.schema(&input_schema).unwrap().into_owned()170},171#[cfg(feature = "merge_sorted")]172MergeSorted { input_left, .. } => IR::schema_with_cache(*input_left, arena, cache),173Invalid => unreachable!(),174};175cache.insert(node, schema.clone());176schema177}178}179180181