Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/ir/schema.rs
6940 views
1
use recursive::recursive;
2
3
use super::*;
4
5
impl IR {
6
/// Get the schema of the logical plan node but don't take projections into account at the scan
7
/// level. This ensures we can apply the predicate
8
pub(crate) fn scan_schema(&self) -> &SchemaRef {
9
use IR::*;
10
match self {
11
Scan { file_info, .. } => &file_info.schema,
12
#[cfg(feature = "python")]
13
PythonScan { options, .. } => &options.schema,
14
_ => unreachable!(),
15
}
16
}
17
18
pub fn name(&self) -> &'static str {
19
use IR::*;
20
match self {
21
Scan { scan_type, .. } => (&**scan_type).into(),
22
#[cfg(feature = "python")]
23
PythonScan { .. } => "python_scan",
24
Slice { .. } => "slice",
25
Filter { .. } => "filter",
26
DataFrameScan { .. } => "df",
27
Select { .. } => "projection",
28
Sort { .. } => "sort",
29
Cache { .. } => "cache",
30
GroupBy { .. } => "aggregate",
31
Join { .. } => "join",
32
HStack { .. } => "hstack",
33
Distinct { .. } => "distinct",
34
MapFunction { .. } => "map_function",
35
Union { .. } => "union",
36
HConcat { .. } => "hconcat",
37
ExtContext { .. } => "ext_context",
38
Sink { payload, .. } => match payload {
39
SinkTypeIR::Memory => "sink (memory)",
40
SinkTypeIR::File { .. } => "sink (file)",
41
SinkTypeIR::Partition { .. } => "sink (partition)",
42
},
43
SinkMultiple { .. } => "sink multiple",
44
SimpleProjection { .. } => "simple_projection",
45
#[cfg(feature = "merge_sorted")]
46
MergeSorted { .. } => "merge_sorted",
47
Invalid => "invalid",
48
}
49
}
50
51
pub fn input_schema<'a>(&'a self, arena: &'a Arena<IR>) -> Option<Cow<'a, SchemaRef>> {
52
use IR::*;
53
let schema = match self {
54
#[cfg(feature = "python")]
55
PythonScan { options } => &options.schema,
56
DataFrameScan { schema, .. } => schema,
57
Scan { file_info, .. } => &file_info.schema,
58
node => {
59
let input = node.get_input()?;
60
return Some(arena.get(input).schema(arena));
61
},
62
};
63
Some(Cow::Borrowed(schema))
64
}
65
66
/// Get the schema of the logical plan node.
67
#[recursive]
68
pub fn schema<'a>(&'a self, arena: &'a Arena<IR>) -> Cow<'a, SchemaRef> {
69
use IR::*;
70
let schema = match self {
71
#[cfg(feature = "python")]
72
PythonScan { options } => options.output_schema.as_ref().unwrap_or(&options.schema),
73
Union { inputs, .. } => return arena.get(inputs[0]).schema(arena),
74
HConcat { schema, .. } => schema,
75
Cache { input, .. } => return arena.get(*input).schema(arena),
76
Sort { input, .. } => return arena.get(*input).schema(arena),
77
Scan {
78
output_schema,
79
file_info,
80
..
81
} => output_schema.as_ref().unwrap_or(&file_info.schema),
82
DataFrameScan {
83
schema,
84
output_schema,
85
..
86
} => output_schema.as_ref().unwrap_or(schema),
87
Filter { input, .. } => return arena.get(*input).schema(arena),
88
Select { schema, .. } => schema,
89
SimpleProjection { columns, .. } => columns,
90
GroupBy { schema, .. } => schema,
91
Join { schema, .. } => schema,
92
HStack { schema, .. } => schema,
93
Distinct { input, .. }
94
| Sink {
95
input,
96
payload: SinkTypeIR::Memory,
97
} => return arena.get(*input).schema(arena),
98
Sink { .. } | SinkMultiple { .. } => return Cow::Owned(Arc::new(Schema::default())),
99
Slice { input, .. } => return arena.get(*input).schema(arena),
100
MapFunction { input, function } => {
101
let input_schema = arena.get(*input).schema(arena);
102
103
return match input_schema {
104
Cow::Owned(schema) => {
105
Cow::Owned(function.schema(&schema).unwrap().into_owned())
106
},
107
Cow::Borrowed(schema) => function.schema(schema).unwrap(),
108
};
109
},
110
ExtContext { schema, .. } => schema,
111
#[cfg(feature = "merge_sorted")]
112
MergeSorted { input_left, .. } => return arena.get(*input_left).schema(arena),
113
Invalid => unreachable!(),
114
};
115
Cow::Borrowed(schema)
116
}
117
118
/// Get the schema of the logical plan node, using caching.
119
#[recursive]
120
pub fn schema_with_cache<'a>(
121
node: Node,
122
arena: &'a Arena<IR>,
123
cache: &mut PlHashMap<Node, Arc<Schema>>,
124
) -> Arc<Schema> {
125
use IR::*;
126
if let Some(schema) = cache.get(&node) {
127
return schema.clone();
128
}
129
130
let schema = match arena.get(node) {
131
#[cfg(feature = "python")]
132
PythonScan { options } => options
133
.output_schema
134
.as_ref()
135
.unwrap_or(&options.schema)
136
.clone(),
137
Union { inputs, .. } => IR::schema_with_cache(inputs[0], arena, cache),
138
HConcat { schema, .. } => schema.clone(),
139
Cache { input, .. }
140
| Sort { input, .. }
141
| Filter { input, .. }
142
| Distinct { input, .. }
143
| Sink {
144
input,
145
payload: SinkTypeIR::Memory,
146
}
147
| Slice { input, .. } => IR::schema_with_cache(*input, arena, cache),
148
Sink { .. } | SinkMultiple { .. } => Arc::new(Schema::default()),
149
Scan {
150
output_schema,
151
file_info,
152
..
153
} => output_schema.as_ref().unwrap_or(&file_info.schema).clone(),
154
DataFrameScan {
155
schema,
156
output_schema,
157
..
158
} => output_schema.as_ref().unwrap_or(schema).clone(),
159
Select { schema, .. }
160
| GroupBy { schema, .. }
161
| Join { schema, .. }
162
| HStack { schema, .. }
163
| ExtContext { schema, .. }
164
| SimpleProjection {
165
columns: schema, ..
166
} => schema.clone(),
167
MapFunction { input, function } => {
168
let input_schema = IR::schema_with_cache(*input, arena, cache);
169
function.schema(&input_schema).unwrap().into_owned()
170
},
171
#[cfg(feature = "merge_sorted")]
172
MergeSorted { input_left, .. } => IR::schema_with_cache(*input_left, arena, cache),
173
Invalid => unreachable!(),
174
};
175
cache.insert(node, schema.clone());
176
schema
177
}
178
}
179
180