Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/visitor/hash.rs
8479 views
1
use std::hash::{Hash, Hasher};
2
use std::sync::Arc;
3
4
use polars_utils::arena::Arena;
5
6
use super::*;
7
#[cfg(feature = "python")]
8
use crate::plans::PythonOptions;
9
use crate::plans::{AExpr, IR};
10
use crate::prelude::aexpr::traverse_and_hash_aexpr;
11
use crate::prelude::{ExprIR, PlanCallback};
12
13
impl IRNode {
14
pub(crate) fn hashable_and_cmp<'a>(
15
&'a self,
16
lp_arena: &'a Arena<IR>,
17
expr_arena: &'a Arena<AExpr>,
18
) -> IRHashWrap<'a> {
19
IRHashWrap {
20
node: *self,
21
lp_arena,
22
expr_arena,
23
hash_as_equality: false,
24
}
25
}
26
}
27
28
pub(crate) struct IRHashWrap<'a> {
29
node: IRNode,
30
lp_arena: &'a Arena<IR>,
31
expr_arena: &'a Arena<AExpr>,
32
hash_as_equality: bool,
33
}
34
35
impl IRHashWrap<'_> {
36
pub fn hash_as_equality(mut self) -> Self {
37
self.hash_as_equality = true;
38
self
39
}
40
}
41
42
fn hash_option_expr<H: Hasher>(expr: &Option<ExprIR>, expr_arena: &Arena<AExpr>, state: &mut H) {
43
if let Some(e) = expr {
44
e.traverse_and_hash(expr_arena, state)
45
}
46
}
47
48
fn hash_exprs<H: Hasher>(exprs: &[ExprIR], expr_arena: &Arena<AExpr>, state: &mut H) {
49
for e in exprs {
50
e.traverse_and_hash(expr_arena, state);
51
}
52
}
53
54
/// Hash a `PythonPredicate`.
///
/// The variant tag is hashed first. The `PyArrow` payload is hashed directly, while a
/// `Polars` expression is routed through `ExprIR::traverse_and_hash`, so its expression
/// tree is hashed rather than just its arena `Node`.
#[cfg(feature = "python")]
fn hash_python_predicate<H: Hasher>(
    pred: &crate::prelude::PythonPredicate,
    expr_arena: &Arena<AExpr>,
    state: &mut H,
) {
    use crate::prelude::PythonPredicate as Pred;

    let tag = std::mem::discriminant(pred);
    tag.hash(state);

    match pred {
        Pred::Polars(expr) => expr.traverse_and_hash(expr_arena, state),
        Pred::PyArrow(source) => source.hash(state),
        Pred::None => (),
    }
}
70
71
impl Hash for IRHashWrap<'_> {
72
// This hashes the variant, not the whole plan
73
fn hash<H: Hasher>(&self, state: &mut H) {
74
let alp = self.node.to_alp(self.lp_arena);
75
std::mem::discriminant(alp).hash(state);
76
match alp {
77
#[cfg(feature = "python")]
78
IR::PythonScan {
79
options:
80
PythonOptions {
81
scan_fn,
82
schema,
83
output_schema,
84
with_columns,
85
python_source,
86
n_rows,
87
predicate,
88
validate_schema,
89
is_pure,
90
},
91
} => {
92
// Hash the Python function object using the pointer to the object
93
// This should be the same as calling id() in python, but we don't need the GIL
94
95
use std::sync::atomic::AtomicU64;
96
static UNIQUE_COUNT: AtomicU64 = AtomicU64::new(0);
97
if let Some(scan_fn) = scan_fn {
98
let ptr_addr = scan_fn.0.as_ptr() as usize;
99
ptr_addr.hash(state);
100
}
101
// Hash the stable fields
102
// We include the schema since it can be set by the user
103
schema.hash(state);
104
output_schema.hash(state);
105
with_columns.hash(state);
106
python_source.hash(state);
107
n_rows.hash(state);
108
hash_python_predicate(predicate, self.expr_arena, state);
109
validate_schema.hash(state);
110
111
if self.hash_as_equality && !*is_pure {
112
let val = UNIQUE_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
113
val.hash(state)
114
} else {
115
is_pure.hash(state)
116
}
117
},
118
IR::Slice {
119
offset,
120
len,
121
input: _,
122
} => {
123
len.hash(state);
124
offset.hash(state);
125
},
126
IR::Filter {
127
input: _,
128
predicate,
129
} => {
130
predicate.traverse_and_hash(self.expr_arena, state);
131
},
132
IR::Scan {
133
sources,
134
file_info: _,
135
hive_parts: _,
136
predicate,
137
predicate_file_skip_applied: _,
138
output_schema: _,
139
scan_type,
140
unified_scan_args,
141
} => {
142
// We don't have to traverse the schema, hive partitions etc. as they are derivative from the paths.
143
scan_type.hash(state);
144
sources.hash(state);
145
hash_option_expr(predicate, self.expr_arena, state);
146
unified_scan_args.hash(state);
147
},
148
IR::DataFrameScan {
149
df,
150
schema: _,
151
output_schema,
152
..
153
} => {
154
(Arc::as_ptr(df) as usize).hash(state);
155
output_schema.hash(state);
156
},
157
IR::SimpleProjection { columns, input: _ } => {
158
columns.hash(state);
159
},
160
IR::Select {
161
input: _,
162
expr,
163
schema: _,
164
options,
165
} => {
166
hash_exprs(expr, self.expr_arena, state);
167
options.hash(state);
168
},
169
IR::Sort {
170
input: _,
171
by_column,
172
slice,
173
sort_options,
174
} => {
175
hash_exprs(by_column, self.expr_arena, state);
176
slice.hash(state);
177
sort_options.hash(state);
178
},
179
IR::GroupBy {
180
input: _,
181
keys,
182
aggs,
183
schema: _,
184
apply,
185
maintain_order,
186
options,
187
} => {
188
hash_exprs(keys, self.expr_arena, state);
189
hash_exprs(aggs, self.expr_arena, state);
190
191
if let Some(function) = apply {
192
true.hash(state);
193
match function {
194
PlanCallback::Rust(f) => {
195
f.hash(state);
196
},
197
#[cfg(feature = "python")]
198
PlanCallback::Python(f) => {
199
f.hash(state);
200
},
201
}
202
}
203
204
apply.is_none().hash(state);
205
maintain_order.hash(state);
206
options.hash(state);
207
},
208
IR::Join {
209
input_left: _,
210
input_right: _,
211
schema: _,
212
left_on,
213
right_on,
214
options,
215
} => {
216
hash_exprs(left_on, self.expr_arena, state);
217
hash_exprs(right_on, self.expr_arena, state);
218
options.hash(state);
219
},
220
IR::HStack {
221
input: _,
222
exprs,
223
schema: _,
224
options,
225
} => {
226
hash_exprs(exprs, self.expr_arena, state);
227
options.hash(state);
228
},
229
IR::Distinct { input: _, options } => {
230
options.hash(state);
231
},
232
IR::MapFunction { input: _, function } => {
233
function.hash(state);
234
},
235
IR::Union { inputs: _, options } => options.hash(state),
236
IR::HConcat {
237
inputs: _,
238
schema: _,
239
options,
240
} => {
241
options.hash(state);
242
},
243
IR::ExtContext {
244
input: _,
245
contexts,
246
schema: _,
247
} => {
248
for node in contexts {
249
traverse_and_hash_aexpr(*node, self.expr_arena, state);
250
}
251
},
252
IR::Sink { input: _, payload } => {
253
payload.traverse_and_hash(self.expr_arena, state);
254
},
255
IR::SinkMultiple { inputs: _ } => {},
256
IR::Cache { input: _, id } => {
257
id.hash(state);
258
},
259
#[cfg(feature = "merge_sorted")]
260
IR::MergeSorted {
261
input_left: _,
262
input_right: _,
263
key,
264
} => {
265
key.hash(state);
266
},
267
IR::Invalid => unreachable!(),
268
}
269
}
270
}
271
272