Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/schema.rs
6940 views
1
use std::borrow::Cow;
2
use std::ops::Deref;
3
use std::sync::Mutex;
4
5
use arrow::datatypes::ArrowSchemaRef;
6
use either::Either;
7
use polars_core::prelude::*;
8
use polars_utils::idx_vec::UnitVec;
9
use polars_utils::{format_pl_smallstr, unitvec};
10
#[cfg(feature = "serde")]
11
use serde::{Deserialize, Serialize};
12
13
use crate::prelude::*;
14
15
impl DslPlan {
16
// Warning! This should not be used on the DSL internally.
17
// All schema resolving should be done during conversion to [`IR`].
18
19
/// Compute the schema. This requires conversion to [`IR`] and type-resolving.
20
pub fn compute_schema(&self) -> PolarsResult<SchemaRef> {
21
let mut lp_arena = Default::default();
22
let mut expr_arena = Default::default();
23
let node = to_alp(
24
self.clone(),
25
&mut expr_arena,
26
&mut lp_arena,
27
&mut OptFlags::schema_only(),
28
)?;
29
30
Ok(lp_arena.get(node).schema(&lp_arena).into_owned())
31
}
32
}
33
34
#[derive(Clone, Debug)]
35
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
36
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
37
pub struct FileInfo {
38
/// Schema of the physical file.
39
///
40
/// Notes:
41
/// - Does not include logical columns like `include_file_path` and row index.
42
/// - Always includes all hive columns.
43
pub schema: SchemaRef,
44
/// Stores the schema used for the reader, as the main schema can contain
45
/// extra hive columns.
46
pub reader_schema: Option<Either<ArrowSchemaRef, SchemaRef>>,
47
/// - known size
48
/// - estimated size (set to usize::max if unknown).
49
pub row_estimation: (Option<usize>, usize),
50
}
51
52
// Manual default because `row_estimation.1` needs to be `usize::MAX`.
53
impl Default for FileInfo {
54
fn default() -> Self {
55
FileInfo {
56
schema: Default::default(),
57
reader_schema: None,
58
row_estimation: (None, usize::MAX),
59
}
60
}
61
}
62
63
impl FileInfo {
64
/// Constructs a new [`FileInfo`].
65
pub fn new(
66
schema: SchemaRef,
67
reader_schema: Option<Either<ArrowSchemaRef, SchemaRef>>,
68
row_estimation: (Option<usize>, usize),
69
) -> Self {
70
Self {
71
schema,
72
reader_schema,
73
row_estimation,
74
}
75
}
76
77
/// Merge the [`Schema`] of a [`HivePartitions`] with the schema of this [`FileInfo`].
78
pub fn update_schema_with_hive_schema(&mut self, hive_schema: SchemaRef) {
79
let schema = Arc::make_mut(&mut self.schema);
80
81
for field in hive_schema.iter_fields() {
82
if let Some(existing) = schema.get_mut(&field.name) {
83
*existing = field.dtype().clone();
84
} else {
85
schema
86
.insert_at_index(schema.len(), field.name, field.dtype.clone())
87
.unwrap();
88
}
89
}
90
}
91
}
92
93
pub(crate) fn det_join_schema(
94
schema_left: &SchemaRef,
95
schema_right: &SchemaRef,
96
left_on: &[ExprIR],
97
right_on: &[ExprIR],
98
options: &JoinOptionsIR,
99
expr_arena: &Arena<AExpr>,
100
) -> PolarsResult<SchemaRef> {
101
match &options.args.how {
102
// semi and anti joins are just filtering operations
103
// the schema will never change.
104
#[cfg(feature = "semi_anti_join")]
105
JoinType::Semi | JoinType::Anti => Ok(schema_left.clone()),
106
// Right-join with coalesce enabled will coalesce LHS columns into RHS columns (i.e. LHS columns
107
// are removed). This is the opposite of what a left join does so it has its own codepath.
108
//
109
// E.g. df(cols=[A, B]).right_join(df(cols=[A, B]), on=A, coalesce=True)
110
//
111
// will result in
112
//
113
// df(cols=[B, A, B_right])
114
JoinType::Right if options.args.should_coalesce() => {
115
// Get join names.
116
let mut join_on_left: PlHashSet<_> = PlHashSet::with_capacity(left_on.len());
117
for e in left_on {
118
let field = e.field(schema_left, expr_arena)?;
119
join_on_left.insert(field.name);
120
}
121
122
let mut join_on_right: PlHashSet<_> = PlHashSet::with_capacity(right_on.len());
123
for e in right_on {
124
let field = e.field(schema_right, expr_arena)?;
125
join_on_right.insert(field.name);
126
}
127
128
// For the error message
129
let mut suffixed = None;
130
131
let new_schema = Schema::with_capacity(schema_left.len() + schema_right.len())
132
// Columns from left, excluding those used as join keys
133
.hstack(schema_left.iter().filter_map(|(name, dtype)| {
134
if join_on_left.contains(name) {
135
return None;
136
}
137
138
Some((name.clone(), dtype.clone()))
139
}))?
140
// Columns from right
141
.hstack(schema_right.iter().map(|(name, dtype)| {
142
suffixed = None;
143
144
let in_left_schema = schema_left.contains(name.as_str());
145
let is_coalesced = join_on_left.contains(name.as_str());
146
147
if in_left_schema && !is_coalesced {
148
suffixed = Some(format_pl_smallstr!("{}{}", name, options.args.suffix()));
149
(suffixed.clone().unwrap(), dtype.clone())
150
} else {
151
(name.clone(), dtype.clone())
152
}
153
}))
154
.map_err(|e| {
155
if let Some(column) = suffixed {
156
join_suffix_duplicate_help_msg(&column)
157
} else {
158
e
159
}
160
})?;
161
162
Ok(Arc::new(new_schema))
163
},
164
how => {
165
let mut new_schema = Schema::with_capacity(schema_left.len() + schema_right.len())
166
.hstack(schema_left.iter_fields())?;
167
168
let is_coalesced = options.args.should_coalesce();
169
170
let mut join_on_right: PlIndexSet<_> = PlIndexSet::with_capacity(right_on.len());
171
for e in right_on {
172
let field = e.field(schema_right, expr_arena)?;
173
join_on_right.insert(field.name);
174
}
175
176
let mut right_by: PlHashSet<&PlSmallStr> = PlHashSet::default();
177
#[cfg(feature = "asof_join")]
178
if let JoinType::AsOf(asof_options) = &options.args.how {
179
if let Some(v) = &asof_options.right_by {
180
right_by.extend(v.iter());
181
}
182
}
183
184
for (name, dtype) in schema_right.iter() {
185
// Asof join by columns are coalesced
186
if right_by.contains(name) {
187
// Do not add suffix. The column of the left table will be used
188
continue;
189
}
190
191
if is_coalesced
192
&& let Some(idx) = join_on_right.get_index_of(name)
193
&& {
194
let mut need_to_include_column = false;
195
196
// Handles coalescing of asof-joins.
197
// Asof joins are not equi-joins
198
// so the columns that are joined on, may have different
199
// values so if the right has a different name, it is added to the schema
200
#[cfg(feature = "asof_join")]
201
if matches!(how, JoinType::AsOf(_)) {
202
let field_left = left_on[idx].field(schema_left, expr_arena)?;
203
need_to_include_column = field_left.name != name;
204
}
205
206
!need_to_include_column
207
}
208
{
209
// Column will be coalesced into an already added LHS column.
210
continue;
211
}
212
213
// For the error message.
214
let mut suffixed = None;
215
let (name, dtype) = if schema_left.contains(name) {
216
suffixed = Some(format_pl_smallstr!("{}{}", name, options.args.suffix()));
217
(suffixed.clone().unwrap(), dtype.clone())
218
} else {
219
(name.clone(), dtype.clone())
220
};
221
222
new_schema.try_insert(name, dtype).map_err(|e| {
223
if let Some(column) = suffixed {
224
join_suffix_duplicate_help_msg(&column)
225
} else {
226
e
227
}
228
})?;
229
}
230
231
Ok(Arc::new(new_schema))
232
},
233
}
234
}
235
236
fn join_suffix_duplicate_help_msg(column_name: &str) -> PolarsError {
237
polars_err!(
238
Duplicate:
239
"\
240
column with name '{}' already exists
241
242
You may want to try:
243
- renaming the column prior to joining
244
- using the `suffix` parameter to specify a suffix different to the default one ('_right')",
245
column_name
246
)
247
}
248
249
// We don't use an `Arc<Mutex>` because caches should live in different query plans.
250
// For that reason we have a specialized deep clone.
251
#[derive(Default)]
252
pub struct CachedSchema(Mutex<Option<SchemaRef>>);
253
254
impl AsRef<Mutex<Option<SchemaRef>>> for CachedSchema {
255
fn as_ref(&self) -> &Mutex<Option<SchemaRef>> {
256
&self.0
257
}
258
}
259
260
impl Deref for CachedSchema {
261
type Target = Mutex<Option<SchemaRef>>;
262
263
fn deref(&self) -> &Self::Target {
264
&self.0
265
}
266
}
267
268
impl Clone for CachedSchema {
269
fn clone(&self) -> Self {
270
let inner = self.0.lock().unwrap();
271
Self(Mutex::new(inner.clone()))
272
}
273
}
274
275
impl CachedSchema {
276
pub fn get(&self) -> Option<SchemaRef> {
277
self.0.lock().unwrap().clone()
278
}
279
}
280
281
/// Collect the nodes whose schema acts as the input of `lp_node`.
///
/// For a scan (as decided by `is_scan`) this is `lp_node` itself; for any
/// other plan it is whatever `copy_inputs` reports.
pub fn get_input(lp_arena: &Arena<IR>, lp_node: Node) -> UnitVec<Node> {
    let ir = lp_arena.get(lp_node);
    let mut nodes: UnitVec<Node> = unitvec!();

    if !is_scan(ir) {
        ir.copy_inputs(&mut nodes);
        return nodes;
    }

    // Used to get the schema of the input: a scan stands in as its own input.
    nodes.push(lp_node);
    nodes
}
293
294
/// Retrieves the schema of the first LP input, or that of the `lp_node` if there
295
/// are no inputs.
296
///
297
/// # Panics
298
/// Panics if this `lp_node` does not have inputs and is not a `Scan` or `PythonScan`.
299
pub fn get_input_schema(lp_arena: &Arena<IR>, lp_node: Node) -> Cow<'_, SchemaRef> {
300
let inputs = get_input(lp_arena, lp_node);
301
if inputs.is_empty() {
302
// Files don't have an input, so we must take their schema.
303
Cow::Borrowed(lp_arena.get(lp_node).scan_schema())
304
} else {
305
let input = inputs[0];
306
lp_arena.get(input).schema(lp_arena)
307
}
308
}
309
310