// pola-rs/polars — crates/polars-plan/src/plans/ir/schema.rs
use recursive::recursive;
use super::*;
impl IR {
6
/// Get the schema of the logical plan node but don't take projections into account at the scan
7
/// level. This ensures we can apply the predicate
8
pub(crate) fn scan_schema(&self) -> &SchemaRef {
9
use IR::*;
10
match self {
11
Scan { file_info, .. } => &file_info.schema,
12
#[cfg(feature = "python")]
13
PythonScan { options, .. } => &options.schema,
14
_ => unreachable!(),
15
}
16
}
17
18
pub fn name(&self) -> &'static str {
19
use IR::*;
20
match self {
21
Scan { scan_type, .. } => (&**scan_type).into(),
22
#[cfg(feature = "python")]
23
PythonScan { .. } => "python_scan",
24
Slice { .. } => "slice",
25
Filter { .. } => "filter",
26
DataFrameScan { .. } => "df",
27
Select { .. } => "projection",
28
Sort { .. } => "sort",
29
Cache { .. } => "cache",
30
GroupBy { .. } => "aggregate",
31
Join { .. } => "join",
32
HStack { .. } => "hstack",
33
Distinct { .. } => "distinct",
34
MapFunction { .. } => "map_function",
35
Union { .. } => "union",
36
HConcat { .. } => "hconcat",
37
ExtContext { .. } => "ext_context",
38
Sink { payload, .. } => match payload {
39
SinkTypeIR::Memory => "sink (memory)",
40
SinkTypeIR::Callback(..) => "sink (callback)",
41
SinkTypeIR::File { .. } => "sink (file)",
42
SinkTypeIR::Partitioned { .. } => "sink (partition)",
43
},
44
SinkMultiple { .. } => "sink multiple",
45
SimpleProjection { .. } => "simple_projection",
46
#[cfg(feature = "merge_sorted")]
47
MergeSorted { .. } => "merge_sorted",
48
Invalid => "invalid",
49
}
50
}
51
52
pub fn input_schema<'a>(&'a self, arena: &'a Arena<IR>) -> Option<Cow<'a, SchemaRef>> {
53
use IR::*;
54
let schema = match self {
55
#[cfg(feature = "python")]
56
PythonScan { options } => &options.schema,
57
DataFrameScan { schema, .. } => schema,
58
Scan { file_info, .. } => &file_info.schema,
59
node => {
60
let input = node.get_input()?;
61
return Some(arena.get(input).schema(arena));
62
},
63
};
64
Some(Cow::Borrowed(schema))
65
}
66
67
/// Get the schema of the logical plan node.
68
#[recursive]
69
pub fn schema<'a>(&'a self, arena: &'a Arena<IR>) -> Cow<'a, SchemaRef> {
70
use IR::*;
71
let schema = match self {
72
#[cfg(feature = "python")]
73
PythonScan { options } => options.output_schema.as_ref().unwrap_or(&options.schema),
74
Union { inputs, .. } => return arena.get(inputs[0]).schema(arena),
75
HConcat { schema, .. } => schema,
76
Cache { input, .. } => return arena.get(*input).schema(arena),
77
Sort { input, .. } => return arena.get(*input).schema(arena),
78
Scan {
79
output_schema,
80
file_info,
81
..
82
} => output_schema.as_ref().unwrap_or(&file_info.schema),
83
DataFrameScan {
84
schema,
85
output_schema,
86
..
87
} => output_schema.as_ref().unwrap_or(schema),
88
Filter { input, .. } => return arena.get(*input).schema(arena),
89
Select { schema, .. } => schema,
90
SimpleProjection { columns, .. } => columns,
91
GroupBy { schema, .. } => schema,
92
Join { schema, .. } => schema,
93
HStack { schema, .. } => schema,
94
Distinct { input, .. }
95
| Sink {
96
input,
97
payload: SinkTypeIR::Memory,
98
} => return arena.get(*input).schema(arena),
99
Sink { .. } | SinkMultiple { .. } => return Cow::Owned(Arc::new(Schema::default())),
100
Slice { input, .. } => return arena.get(*input).schema(arena),
101
MapFunction { input, function } => {
102
let input_schema = arena.get(*input).schema(arena);
103
104
return match input_schema {
105
Cow::Owned(schema) => {
106
Cow::Owned(function.schema(&schema).unwrap().into_owned())
107
},
108
Cow::Borrowed(schema) => function.schema(schema).unwrap(),
109
};
110
},
111
ExtContext { schema, .. } => schema,
112
#[cfg(feature = "merge_sorted")]
113
MergeSorted { input_left, .. } => return arena.get(*input_left).schema(arena),
114
Invalid => unreachable!(),
115
};
116
Cow::Borrowed(schema)
117
}
118
119
/// Get the schema of the logical plan node, using caching.
120
#[recursive]
121
pub fn schema_with_cache<'a>(
122
node: Node,
123
arena: &'a Arena<IR>,
124
cache: &mut PlHashMap<Node, Arc<Schema>>,
125
) -> Arc<Schema> {
126
use IR::*;
127
if let Some(schema) = cache.get(&node) {
128
return schema.clone();
129
}
130
131
let schema = match arena.get(node) {
132
#[cfg(feature = "python")]
133
PythonScan { options } => options
134
.output_schema
135
.as_ref()
136
.unwrap_or(&options.schema)
137
.clone(),
138
Union { inputs, .. } => IR::schema_with_cache(inputs[0], arena, cache),
139
HConcat { schema, .. } => schema.clone(),
140
Cache { input, .. }
141
| Sort { input, .. }
142
| Filter { input, .. }
143
| Distinct { input, .. }
144
| Sink {
145
input,
146
payload: SinkTypeIR::Memory,
147
}
148
| Slice { input, .. } => IR::schema_with_cache(*input, arena, cache),
149
Sink { .. } | SinkMultiple { .. } => Arc::new(Schema::default()),
150
Scan {
151
output_schema,
152
file_info,
153
..
154
} => output_schema.as_ref().unwrap_or(&file_info.schema).clone(),
155
DataFrameScan {
156
schema,
157
output_schema,
158
..
159
} => output_schema.as_ref().unwrap_or(schema).clone(),
160
Select { schema, .. }
161
| GroupBy { schema, .. }
162
| Join { schema, .. }
163
| HStack { schema, .. }
164
| ExtContext { schema, .. }
165
| SimpleProjection {
166
columns: schema, ..
167
} => schema.clone(),
168
MapFunction { input, function } => {
169
let input_schema = IR::schema_with_cache(*input, arena, cache);
170
function.schema(&input_schema).unwrap().into_owned()
171
},
172
#[cfg(feature = "merge_sorted")]
173
MergeSorted { input_left, .. } => IR::schema_with_cache(*input_left, arena, cache),
174
Invalid => unreachable!(),
175
};
176
cache.insert(node, schema.clone());
177
schema
178
}
179
}