Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/options.rs
8430 views
1
use bitflags::bitflags;
2
use polars_core::prelude::*;
3
use polars_core::utils::SuperTypeOptions;
4
use polars_utils::bool::UnsafeBool;
5
#[cfg(feature = "serde")]
6
use serde::{Deserialize, Serialize};
7
8
use crate::plans::PlSmallStr;
9
10
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
11
#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]
12
pub struct DistinctOptionsIR {
13
/// Subset of columns that will be taken into account.
14
pub subset: Option<Arc<[PlSmallStr]>>,
15
/// This will maintain the order of the input.
16
/// Note that this is more expensive.
17
/// `maintain_order` is not supported in the streaming
18
/// engine.
19
pub maintain_order: bool,
20
/// Which rows to keep.
21
pub keep_strategy: UniqueKeepStrategy,
22
/// Take only a slice of the result
23
pub slice: Option<(i64, usize)>,
24
}
25
26
#[cfg(feature = "dsl-schema")]
impl schemars::JsonSchema for FunctionFlags {
    fn schema_name() -> std::borrow::Cow<'static, str> {
        std::borrow::Cow::Borrowed("FunctionFlags")
    }

    fn schema_id() -> std::borrow::Cow<'static, str> {
        concat!(module_path!(), "::", "FunctionFlags").into()
    }

    /// Describes the flags as a `bitflags`-formatted string.
    ///
    /// The schema embeds every flag name together with its bit pattern, so adding,
    /// removing, or renumbering a flag changes the generated schema.
    fn json_schema(_generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
        use schemars::json_schema;
        use serde_json::{Map, Value};

        let mut name_to_bits = Map::new();
        for (name, flag) in Self::all().iter_names() {
            name_to_bits.insert(name.to_owned(), Value::from(flag.bits()));
        }

        json_schema!({
            "type": "string",
            "format": "bitflags",
            "bitflags": name_to_bits
        })
    }
}
53
54
bitflags!(
    /// Properties of a function expression.
    #[repr(transparent)]
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
    pub struct FunctionFlags: u16 {
        /// The physical expression may rename the output of this function.
        /// If this flag is not set, the physical engine will ensure the left input
        /// expression is the output name.
        const ALLOW_RENAME = 1 << 0;
        /// If set, the `Series` passed to the function in the group_by operation
        /// will have its name set. This costs an extra heap allocation per group.
        const PASS_NAME_TO_APPLY = 1 << 1;
        /// There are two ways of expanding wildcards:
        ///
        /// Say the schema is 'a', 'b' and there is a function `f`. In this case, `f('*')` can
        /// expand to:
        /// 1. `f('a', 'b')`
        /// 2. `f('a'), f('b')`
        ///
        /// Setting this flag leads to behavior 1.
        ///
        /// This also applies to regex expansion.
        const INPUT_WILDCARD_EXPANSION = 1 << 2;
        /// Automatically explode on unit length if it ran as final aggregation.
        ///
        /// This is the case for aggregations like sum, min, covariance etc.
        /// We need to know this because we cannot see the difference between
        /// the following functions based on the output type and number of elements:
        ///
        /// x: {1, 2, 3}
        ///
        /// head_1(x) -> {1}
        /// sum(x) -> {6}
        ///
        /// Mutually exclusive with `LENGTH_PRESERVING`.
        const RETURNS_SCALAR = 1 << 3;
        /// Set for functions that may re-enter the engine, such as UDFs that use Polars
        /// within the UDF. Recursively entering the engine can sometimes deadlock; this
        /// flag must be set to handle that.
        const OPTIONAL_RE_ENTRANT = 1 << 4;
        /// Whether this function allows no inputs.
        const ALLOW_EMPTY_INPUTS = 1 << 5;

        /// Given a function f and a column of values [v1, ..., vn],
        /// f is row-separable if and only if, for every split point m,
        /// f([v1, ..., vn]) = concat(f([v1, ..., vm]), f([vm+1, ..., vn])).
        const ROW_SEPARABLE = 1 << 6;
        /// Given a function f and a column of values [v1, ..., vn],
        /// f is length-preserving if and only if len(f([v1, ..., vn])) = n.
        ///
        /// Mutually exclusive with `RETURNS_SCALAR`.
        const LENGTH_PRESERVING = 1 << 7;
        /// NULLs on the first input are propagated to the output.
        const PRESERVES_NULL_FIRST_INPUT = 1 << 8;
        /// NULLs on any input are propagated to the output.
        const PRESERVES_NULL_ALL_INPUTS = 1 << 9;

        /// Indicates that this expression does not observe the ordering of its input(s).
        const NON_ORDER_OBSERVING = 1 << 10;

        /// Indicates that the ordering of the inputs to this expression is not observable
        /// in its output.
        const TERMINATES_INPUT_ORDER = 1 << 11;

        /// Indicates that this expression does not produce any ordering into its output.
        const NON_ORDER_PRODUCING = 1 << 12;
    }
);
122
123
impl FunctionFlags {
124
pub fn set_elementwise(&mut self) {
125
*self |= Self::ROW_SEPARABLE | Self::LENGTH_PRESERVING;
126
}
127
128
pub fn is_elementwise(self) -> bool {
129
self.contains(Self::ROW_SEPARABLE | Self::LENGTH_PRESERVING)
130
}
131
132
pub fn is_row_separable(self) -> bool {
133
self.contains(Self::ROW_SEPARABLE)
134
}
135
136
pub fn is_length_preserving(self) -> bool {
137
self.contains(Self::LENGTH_PRESERVING)
138
}
139
140
pub fn observes_input_order(self) -> bool {
141
let non_order_observing =
142
self.contains(Self::NON_ORDER_OBSERVING) | self.contains(Self::ROW_SEPARABLE);
143
144
!non_order_observing
145
}
146
147
pub fn terminates_input_order(self) -> bool {
148
self.contains(Self::TERMINATES_INPUT_ORDER) | self.contains(Self::RETURNS_SCALAR)
149
}
150
151
pub fn non_order_producing(self) -> bool {
152
self.contains(Self::NON_ORDER_PRODUCING)
153
| self.contains(Self::RETURNS_SCALAR)
154
| self.is_elementwise()
155
}
156
157
pub fn returns_scalar(self) -> bool {
158
self.contains(Self::RETURNS_SCALAR)
159
}
160
}
161
162
impl Default for FunctionFlags {
163
fn default() -> Self {
164
Self::from_bits_truncate(0)
165
}
166
}
167
168
#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash)]
169
pub enum CastingRules {
170
/// Whether information may be lost during cast. E.g. a float to int is considered lossy,
171
/// whereas int to int is considered lossless.
172
/// Overflowing is not considered in this flag, that's handled in `strict` casting
173
FirstArgLossless,
174
Supertype(SuperTypeOptions),
175
}
176
177
impl CastingRules {
178
pub fn cast_to_supertypes() -> CastingRules {
179
Self::Supertype(Default::default())
180
}
181
}
182
183
#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, Default)]
184
#[cfg_attr(any(feature = "serde"), derive(Serialize, Deserialize))]
185
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
186
pub struct FunctionOptions {
187
// Validate the output of a `map`.
188
// this should always be true or we could OOB
189
pub check_lengths: UnsafeBool,
190
pub flags: FunctionFlags,
191
192
/// Options used when deciding how to cast the arguments of the function.
193
#[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
194
pub cast_options: Option<CastingRules>,
195
}
196
197
impl FunctionOptions {
198
#[cfg(feature = "fused")]
199
pub(crate) unsafe fn no_check_lengths(&mut self) {
200
unsafe { self.check_lengths = UnsafeBool::new_false() };
201
}
202
pub fn check_lengths(&self) -> bool {
203
*self.check_lengths
204
}
205
206
pub fn set_elementwise(&mut self) {
207
self.flags.set_elementwise();
208
}
209
210
pub fn is_elementwise(&self) -> bool {
211
self.flags.is_elementwise()
212
}
213
214
pub fn is_length_preserving(&self) -> bool {
215
self.flags.contains(FunctionFlags::LENGTH_PRESERVING)
216
}
217
218
pub fn is_row_separable(&self) -> bool {
219
self.flags.is_row_separable()
220
}
221
222
pub fn returns_scalar(&self) -> bool {
223
self.flags.returns_scalar()
224
}
225
226
pub fn elementwise() -> FunctionOptions {
227
FunctionOptions {
228
..Default::default()
229
}
230
.with_flags(|f| f | FunctionFlags::ROW_SEPARABLE | FunctionFlags::LENGTH_PRESERVING)
231
}
232
233
pub fn elementwise_with_infer() -> FunctionOptions {
234
Self::length_preserving()
235
}
236
237
pub fn row_separable() -> FunctionOptions {
238
FunctionOptions {
239
..Default::default()
240
}
241
.with_flags(|f| f | FunctionFlags::ROW_SEPARABLE)
242
}
243
244
pub fn length_preserving() -> FunctionOptions {
245
FunctionOptions {
246
..Default::default()
247
}
248
.with_flags(|f| f | FunctionFlags::LENGTH_PRESERVING)
249
}
250
251
pub fn groupwise() -> FunctionOptions {
252
FunctionOptions {
253
..Default::default()
254
}
255
}
256
257
pub fn aggregation() -> FunctionOptions {
258
let mut options = Self::groupwise();
259
options.flags |= FunctionFlags::RETURNS_SCALAR;
260
options
261
}
262
263
pub fn with_supertyping(self, supertype_options: SuperTypeOptions) -> FunctionOptions {
264
self.with_casting_rules(CastingRules::Supertype(supertype_options))
265
}
266
267
pub fn with_casting_rules(mut self, casting_rules: CastingRules) -> FunctionOptions {
268
self.cast_options = Some(casting_rules);
269
self
270
}
271
272
pub fn flag(mut self, flags: FunctionFlags) -> FunctionOptions {
273
self.flags |= flags;
274
self
275
}
276
277
pub fn with_flags(mut self, f: impl Fn(FunctionFlags) -> FunctionFlags) -> FunctionOptions {
278
self.flags = f(self.flags);
279
self
280
}
281
}
282
283
// Options attached to a projection.
//
// NOTE: plain `//` comments are used here because the `schemars::JsonSchema` derive
// would emit `///` doc comments as schema descriptions.
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ProjectionOptions {
    // NOTE(review): presumably controls whether the projected expressions may be
    // evaluated in parallel — confirm.
    pub run_parallel: bool,
    // NOTE(review): presumably enables checking the output for duplicate column
    // names — confirm.
    pub duplicate_check: bool,
    // Should length-1 Series be broadcast to the length of the DataFrame.
    // Only used by the CSE optimizer.
    pub should_broadcast: bool,
}
293
294
impl Default for ProjectionOptions {
295
fn default() -> Self {
296
Self {
297
run_parallel: true,
298
duplicate_check: true,
299
should_broadcast: true,
300
}
301
}
302
}
303
304
impl ProjectionOptions {
305
/// Conservatively merge the options of two [`ProjectionOptions`]
306
pub fn merge_options(&self, other: &Self) -> Self {
307
Self {
308
run_parallel: self.run_parallel & other.run_parallel,
309
duplicate_check: self.duplicate_check & other.duplicate_check,
310
should_broadcast: self.should_broadcast | other.should_broadcast,
311
}
312
}
313
}
314
315