Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/schema.rs
8430 views
1
use std::borrow::Cow;
2
use std::ops::Deref;
3
use std::sync::Mutex;
4
5
use arrow::datatypes::ArrowSchemaRef;
6
use either::Either;
7
use polars_core::prelude::*;
8
use polars_utils::idx_vec::UnitVec;
9
use polars_utils::{format_pl_smallstr, unitvec};
10
#[cfg(feature = "serde")]
11
use serde::{Deserialize, Serialize};
12
13
use crate::prelude::*;
14
15
impl DslPlan {
16
// Warning! This should not be used on the DSL internally.
17
// All schema resolving should be done during conversion to [`IR`].
18
19
/// Compute the schema. This requires conversion to [`IR`] and type-resolving.
20
pub fn compute_schema(&self) -> PolarsResult<SchemaRef> {
21
let mut lp_arena = Default::default();
22
let mut expr_arena = Default::default();
23
let node = to_alp(
24
self.clone(),
25
&mut expr_arena,
26
&mut lp_arena,
27
&mut OptFlags::schema_only(),
28
)?;
29
30
Ok(lp_arena.get(node).schema(&lp_arena).into_owned())
31
}
32
}
33
34
#[derive(Clone, Debug)]
35
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
36
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
37
pub struct FileInfo {
38
/// Schema of the physical file.
39
///
40
/// Notes:
41
/// - Does not include logical columns like `include_file_path` and row index.
42
/// - Always includes all hive columns.
43
pub schema: SchemaRef,
44
/// Stores the schema used for the reader, as the main schema can contain
45
/// extra hive columns.
46
pub reader_schema: Option<Either<ArrowSchemaRef, SchemaRef>>,
47
/// - known size
48
/// - estimated size (set to usize::max if unknown).
49
pub row_estimation: (Option<usize>, usize),
50
}
51
52
// Manual default because `row_estimation.1` needs to be `usize::MAX`.
53
impl Default for FileInfo {
54
fn default() -> Self {
55
FileInfo {
56
schema: Default::default(),
57
reader_schema: None,
58
row_estimation: (None, usize::MAX),
59
}
60
}
61
}
62
63
impl FileInfo {
64
/// Constructs a new [`FileInfo`].
65
pub fn new(
66
schema: SchemaRef,
67
reader_schema: Option<Either<ArrowSchemaRef, SchemaRef>>,
68
row_estimation: (Option<usize>, usize),
69
) -> Self {
70
Self {
71
schema,
72
reader_schema,
73
row_estimation,
74
}
75
}
76
77
/// Merge the [`Schema`] of a [`HivePartitions`] with the schema of this [`FileInfo`].
78
pub fn update_schema_with_hive_schema(&mut self, hive_schema: SchemaRef) {
79
let schema = Arc::make_mut(&mut self.schema);
80
81
for field in hive_schema.iter_fields() {
82
if let Some(existing) = schema.get_mut(&field.name) {
83
*existing = field.dtype().clone();
84
} else {
85
schema
86
.insert_at_index(schema.len(), field.name, field.dtype.clone())
87
.unwrap();
88
}
89
}
90
}
91
92
pub fn iter_reader_schema_names(
93
&self,
94
) -> Option<impl '_ + ExactSizeIterator<Item = &PlSmallStr>> {
95
let reader_schema = self.reader_schema.as_ref()?;
96
97
let len = match reader_schema {
98
Either::Left(v) => v.len(),
99
Either::Right(v) => v.len(),
100
};
101
102
Some((0..len).map(move |i| match reader_schema {
103
Either::Left(v) => v.get_at_index(i).unwrap().0,
104
Either::Right(v) => v.get_at_index(i).unwrap().0,
105
}))
106
}
107
}
108
109
pub(crate) fn det_join_schema(
110
schema_left: &SchemaRef,
111
schema_right: &SchemaRef,
112
left_on: &[ExprIR],
113
right_on: &[ExprIR],
114
options: &JoinOptionsIR,
115
expr_arena: &Arena<AExpr>,
116
) -> PolarsResult<SchemaRef> {
117
match &options.args.how {
118
// semi and anti joins are just filtering operations
119
// the schema will never change.
120
#[cfg(feature = "semi_anti_join")]
121
JoinType::Semi | JoinType::Anti => Ok(schema_left.clone()),
122
// Right-join with coalesce enabled will coalesce LHS columns into RHS columns (i.e. LHS columns
123
// are removed). This is the opposite of what a left join does so it has its own codepath.
124
//
125
// E.g. df(cols=[A, B]).right_join(df(cols=[A, B]), on=A, coalesce=True)
126
//
127
// will result in
128
//
129
// df(cols=[B, A, B_right])
130
JoinType::Right if options.args.should_coalesce() => {
131
// Get join names.
132
let mut join_on_left: PlHashSet<_> = PlHashSet::with_capacity(left_on.len());
133
for e in left_on {
134
let field = e.field(schema_left, expr_arena)?;
135
join_on_left.insert(field.name);
136
}
137
138
let mut join_on_right: PlHashSet<_> = PlHashSet::with_capacity(right_on.len());
139
for e in right_on {
140
let field = e.field(schema_right, expr_arena)?;
141
join_on_right.insert(field.name);
142
}
143
144
// For the error message
145
let mut suffixed = None;
146
147
let new_schema = Schema::with_capacity(schema_left.len() + schema_right.len())
148
// Columns from left, excluding those used as join keys
149
.hstack(schema_left.iter().filter_map(|(name, dtype)| {
150
if join_on_left.contains(name) {
151
return None;
152
}
153
154
Some((name.clone(), dtype.clone()))
155
}))?
156
// Columns from right
157
.hstack(schema_right.iter().map(|(name, dtype)| {
158
suffixed = None;
159
160
let in_left_schema = schema_left.contains(name.as_str());
161
let is_coalesced = join_on_left.contains(name.as_str());
162
163
if in_left_schema && !is_coalesced {
164
suffixed = Some(format_pl_smallstr!("{}{}", name, options.args.suffix()));
165
(suffixed.clone().unwrap(), dtype.clone())
166
} else {
167
(name.clone(), dtype.clone())
168
}
169
}))
170
.map_err(|e| {
171
if let Some(column) = suffixed {
172
join_suffix_duplicate_help_msg(&column)
173
} else {
174
e
175
}
176
})?;
177
178
Ok(Arc::new(new_schema))
179
},
180
how => {
181
let mut new_schema = Schema::with_capacity(schema_left.len() + schema_right.len())
182
.hstack(schema_left.iter_fields())?;
183
184
let is_coalesced = options.args.should_coalesce();
185
186
let mut join_on_right: PlIndexSet<_> = PlIndexSet::with_capacity(right_on.len());
187
for e in right_on {
188
let field = e.field(schema_right, expr_arena)?;
189
join_on_right.insert(field.name);
190
}
191
192
let mut right_by: PlHashSet<&PlSmallStr> = PlHashSet::default();
193
#[cfg(feature = "asof_join")]
194
if let JoinType::AsOf(asof_options) = &options.args.how {
195
if let Some(v) = &asof_options.right_by {
196
right_by.extend(v.iter());
197
}
198
}
199
200
for (name, dtype) in schema_right.iter() {
201
// Asof join by columns are coalesced
202
if right_by.contains(name) {
203
// Do not add suffix. The column of the left table will be used
204
continue;
205
}
206
207
if is_coalesced
208
&& let Some(idx) = join_on_right.get_index_of(name)
209
&& {
210
let mut need_to_include_column = false;
211
212
// Handles coalescing of asof-joins.
213
// Asof joins are not equi-joins
214
// so the columns that are joined on, may have different
215
// values so if the right has a different name, it is added to the schema
216
#[cfg(feature = "asof_join")]
217
if matches!(how, JoinType::AsOf(_)) {
218
let field_left = left_on[idx].field(schema_left, expr_arena)?;
219
need_to_include_column = field_left.name != name;
220
}
221
222
!need_to_include_column
223
}
224
{
225
// Column will be coalesced into an already added LHS column.
226
continue;
227
}
228
229
// For the error message.
230
let mut suffixed = None;
231
let (name, dtype) = if schema_left.contains(name) {
232
suffixed = Some(format_pl_smallstr!("{}{}", name, options.args.suffix()));
233
(suffixed.clone().unwrap(), dtype.clone())
234
} else {
235
(name.clone(), dtype.clone())
236
};
237
238
new_schema.try_insert(name, dtype).map_err(|e| {
239
if let Some(column) = suffixed {
240
join_suffix_duplicate_help_msg(&column)
241
} else {
242
e
243
}
244
})?;
245
}
246
247
Ok(Arc::new(new_schema))
248
},
249
}
250
}
251
252
pub(crate) fn validate_arrow_schema_conversion(
253
input_schema: &Schema,
254
expected_arrow_schema: &ArrowSchema,
255
compat_level: CompatLevel,
256
) -> PolarsResult<()> {
257
polars_ensure!(
258
input_schema.len() == expected_arrow_schema.len()
259
&& input_schema
260
.iter_names()
261
.zip(expected_arrow_schema.iter_names())
262
.all(|(l, r)| l == r),
263
SchemaMismatch:
264
"schema names in arrow_schema differ: {:?} != arrow schema names: {:?}",
265
input_schema.iter_names().collect::<Vec<_>>().as_slice(),
266
expected_arrow_schema.iter_names().collect::<Vec<_>>().as_slice(),
267
);
268
269
for (input_pl_dtype, output_arrow_field) in input_schema
270
.iter_values()
271
.zip(expected_arrow_schema.iter_values())
272
{
273
Series::new_empty(PlSmallStr::EMPTY, input_pl_dtype).to_arrow_with_field(
274
0,
275
compat_level,
276
Some(output_arrow_field),
277
)?;
278
}
279
280
Ok(())
281
}
282
283
fn join_suffix_duplicate_help_msg(column_name: &str) -> PolarsError {
284
polars_err!(
285
Duplicate:
286
"\
287
column with name '{}' already exists
288
289
You may want to try:
290
- renaming the column prior to joining
291
- using the `suffix` parameter to specify a suffix different to the default one ('_right')",
292
column_name
293
)
294
}
295
296
// We don't use an `Arc<Mutex>` because caches should live in different query plans.
297
// For that reason we have a specialized deep clone.
298
#[derive(Default)]
299
pub struct CachedSchema(Mutex<Option<SchemaRef>>);
300
301
impl AsRef<Mutex<Option<SchemaRef>>> for CachedSchema {
302
fn as_ref(&self) -> &Mutex<Option<SchemaRef>> {
303
&self.0
304
}
305
}
306
307
impl Deref for CachedSchema {
308
type Target = Mutex<Option<SchemaRef>>;
309
310
fn deref(&self) -> &Self::Target {
311
&self.0
312
}
313
}
314
315
impl Clone for CachedSchema {
316
fn clone(&self) -> Self {
317
let inner = self.0.lock().unwrap();
318
Self(Mutex::new(inner.clone()))
319
}
320
}
321
322
impl CachedSchema {
323
pub fn get(&self) -> Option<SchemaRef> {
324
self.0.lock().unwrap().clone()
325
}
326
}
327
328
/// Returns the nodes whose schema acts as the input schema of `lp_node`.
///
/// When `is_scan` holds for the node, the result is `lp_node` itself. Otherwise it is the node's
/// inputs as reported by `copy_inputs`.
pub fn get_input(lp_arena: &Arena<IR>, lp_node: Node) -> UnitVec<Node> {
    let plan = lp_arena.get(lp_node);
    let mut inputs: UnitVec<Node> = unitvec!();

    if !is_scan(plan) {
        plan.copy_inputs(&mut inputs);
        return inputs;
    }

    // Used to get the schema of the input: a scan serves as its own input.
    inputs.push(lp_node);
    inputs
}
340
341
/// Retrieves the schema of the first LP input, or that of the `lp_node` if there
342
/// are no inputs.
343
///
344
/// # Panics
345
/// Panics if this `lp_node` does not have inputs and is not a `Scan` or `PythonScan`.
346
pub fn get_input_schema(lp_arena: &Arena<IR>, lp_node: Node) -> Cow<'_, SchemaRef> {
347
let inputs = get_input(lp_arena, lp_node);
348
if inputs.is_empty() {
349
// Files don't have an input, so we must take their schema.
350
Cow::Borrowed(lp_arena.get(lp_node).scan_schema())
351
} else {
352
let input = inputs[0];
353
lp_arena.get(input).schema(lp_arena)
354
}
355
}
356
357