Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/functions/schema.rs
6940 views
1
#[cfg(feature = "pivot")]
2
use polars_core::utils::try_get_supertype;
3
4
use super::*;
5
use crate::constants::get_len_name;
6
7
impl FunctionIR {
8
pub(crate) fn clear_cached_schema(&self) {
9
use FunctionIR::*;
10
// We will likely add more branches later
11
#[allow(clippy::single_match)]
12
match self {
13
#[cfg(feature = "pivot")]
14
Unpivot { schema, .. } => {
15
let mut guard = schema.lock().unwrap();
16
*guard = None;
17
},
18
RowIndex { schema, .. } | Explode { schema, .. } => {
19
let mut guard = schema.lock().unwrap();
20
*guard = None;
21
},
22
_ => {},
23
}
24
}
25
26
pub(crate) fn schema<'a>(
27
&self,
28
input_schema: &'a SchemaRef,
29
) -> PolarsResult<Cow<'a, SchemaRef>> {
30
use FunctionIR::*;
31
match self {
32
Opaque { schema, .. } => match schema {
33
None => Ok(Cow::Borrowed(input_schema)),
34
Some(schema_fn) => {
35
let output_schema = schema_fn.get_schema(input_schema)?;
36
Ok(Cow::Owned(output_schema))
37
},
38
},
39
#[cfg(feature = "python")]
40
OpaquePython(OpaquePythonUdf { schema, .. }) => Ok(schema
41
.as_ref()
42
.map(|schema| Cow::Owned(schema.clone()))
43
.unwrap_or_else(|| Cow::Borrowed(input_schema))),
44
FastCount { alias, .. } => {
45
let mut schema: Schema = Schema::with_capacity(1);
46
let name = alias.clone().unwrap_or_else(get_len_name);
47
schema.insert_at_index(0, name, IDX_DTYPE)?;
48
Ok(Cow::Owned(Arc::new(schema)))
49
},
50
Rechunk => Ok(Cow::Borrowed(input_schema)),
51
Unnest { columns: _columns } => {
52
#[cfg(feature = "dtype-struct")]
53
{
54
let mut new_schema = Schema::with_capacity(input_schema.len() * 2);
55
for (name, dtype) in input_schema.iter() {
56
if _columns.iter().any(|item| item == name) {
57
match dtype {
58
DataType::Struct(flds) => {
59
for fld in flds {
60
new_schema
61
.with_column(fld.name().clone(), fld.dtype().clone());
62
}
63
},
64
DataType::Unknown(_) => {
65
// pass through unknown
66
},
67
_ => {
68
polars_bail!(
69
SchemaMismatch: "expected struct dtype, got: `{}`", dtype
70
);
71
},
72
}
73
} else {
74
new_schema.with_column(name.clone(), dtype.clone());
75
}
76
}
77
78
Ok(Cow::Owned(Arc::new(new_schema)))
79
}
80
#[cfg(not(feature = "dtype-struct"))]
81
{
82
panic!("activate feature 'dtype-struct'")
83
}
84
},
85
RowIndex { schema, name, .. } => Ok(Cow::Owned(row_index_schema(
86
schema,
87
input_schema,
88
name.clone(),
89
))),
90
Explode { schema, columns } => explode_schema(schema, input_schema, columns),
91
#[cfg(feature = "pivot")]
92
Unpivot { schema, args } => unpivot_schema(args, schema, input_schema),
93
}
94
}
95
}
96
97
fn row_index_schema(
98
cached_schema: &CachedSchema,
99
input_schema: &SchemaRef,
100
name: PlSmallStr,
101
) -> SchemaRef {
102
let mut guard = cached_schema.lock().unwrap();
103
if let Some(schema) = &*guard {
104
return schema.clone();
105
}
106
let mut schema = (**input_schema).clone();
107
schema.insert_at_index(0, name, IDX_DTYPE).unwrap();
108
let schema_ref = Arc::new(schema);
109
*guard = Some(schema_ref.clone());
110
schema_ref
111
}
112
113
fn explode_schema<'a>(
114
cached_schema: &CachedSchema,
115
schema: &'a Schema,
116
columns: &[PlSmallStr],
117
) -> PolarsResult<Cow<'a, SchemaRef>> {
118
let mut guard = cached_schema.lock().unwrap();
119
if let Some(schema) = &*guard {
120
return Ok(Cow::Owned(schema.clone()));
121
}
122
let mut schema = schema.clone();
123
124
// columns to string
125
columns.iter().try_for_each(|name| {
126
match schema.try_get(name)? {
127
DataType::List(inner) => {
128
schema.with_column(name.clone(), inner.as_ref().clone());
129
},
130
#[cfg(feature = "dtype-array")]
131
DataType::Array(inner, _) => {
132
schema.with_column(name.clone(), inner.as_ref().clone());
133
},
134
_ => {},
135
}
136
137
PolarsResult::Ok(())
138
})?;
139
let schema = Arc::new(schema);
140
*guard = Some(schema.clone());
141
Ok(Cow::Owned(schema))
142
}
143
144
#[cfg(feature = "pivot")]
/// Returns the schema produced by unpivoting `input_schema` according to `args`.
///
/// The output holds the `args.index` columns with their input dtypes, then a
/// `String` column (named by `args.variable_name`, default `"variable"`), then
/// a value column (named by `args.value_name`, default `"value"`). The value
/// dtype is the supertype, starting from `Null`, of the `args.on` columns, or
/// of all non-index columns when `args.on` is empty. The result is memoized in
/// `cached_schema`, and a populated cache is returned without recomputation.
///
/// # Errors
///
/// Returns an error when an index or `on` column cannot be looked up in
/// `input_schema`, or when `try_get_supertype` fails. Nothing is cached then.
///
/// # Panics
///
/// Panics if the cache lock is poisoned.
fn unpivot_schema<'a>(
    args: &UnpivotArgsIR,
    cached_schema: &CachedSchema,
    input_schema: &'a Schema,
) -> PolarsResult<Cow<'a, SchemaRef>> {
    let mut cache = cached_schema.lock().unwrap();
    if let Some(hit) = cache.as_ref() {
        return Ok(Cow::Owned(hit.clone()));
    }

    // Index columns come first and keep their input dtype.
    let mut output = args
        .index
        .iter()
        .map(|id| {
            let dtype = input_schema.try_get(id)?.clone();
            Ok(Field::new(id.clone(), dtype))
        })
        .collect::<PolarsResult<Schema>>()?;

    let variable_name = args
        .variable_name
        .clone()
        .unwrap_or_else(|| "variable".into());
    let value_name = args.value_name.clone().unwrap_or_else(|| "value".into());

    // Fold the dtypes of all value columns into a common supertype.
    let mut value_dtype = DataType::Null;
    if args.on.is_empty() {
        // No explicit value columns: use every column that is not an index column.
        let index_names = PlHashSet::from_iter(&args.index);
        for (name, dtype) in input_schema.iter() {
            if index_names.contains(name) {
                continue;
            }
            value_dtype = try_get_supertype(&value_dtype, dtype)?;
        }
    } else {
        for name in &args.on {
            let dtype = input_schema.try_get(name)?;
            value_dtype = try_get_supertype(&value_dtype, dtype)?;
        }
    }

    output.with_column(variable_name, DataType::String);
    output.with_column(value_name, value_dtype);

    let output = Arc::new(output);
    *cache = Some(output.clone());
    Ok(Cow::Owned(output))
}
195
196