Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/expressions/literal.rs
6940 views
1
use std::borrow::Cow;
2
use std::ops::Deref;
3
4
use arrow::temporal_conversions::NANOSECONDS_IN_DAY;
5
use polars_core::prelude::*;
6
use polars_core::utils::NoNull;
7
use polars_plan::constants::get_literal_name;
8
9
use super::*;
10
use crate::expressions::{AggregationContext, PartitionedAggregation, PhysicalExpr};
11
12
pub struct LiteralExpr(pub LiteralValue, Expr);
13
14
impl LiteralExpr {
15
pub fn new(value: LiteralValue, expr: Expr) -> Self {
16
Self(value, expr)
17
}
18
19
fn as_column(&self) -> PolarsResult<Column> {
20
use LiteralValue as L;
21
let column = match &self.0 {
22
L::Scalar(sc) => {
23
#[cfg(feature = "dtype-time")]
24
if let AnyValue::Time(v) = sc.value() {
25
if !(0..NANOSECONDS_IN_DAY).contains(v) {
26
polars_bail!(
27
InvalidOperation: "value `{v}` is out-of-range for `time` which can be 0 - {}",
28
NANOSECONDS_IN_DAY - 1
29
);
30
}
31
}
32
33
sc.clone().into_column(get_literal_name().clone())
34
},
35
L::Series(s) => s.deref().clone().into_column(),
36
lv @ L::Dyn(_) => polars_core::prelude::Series::from_any_values(
37
get_literal_name().clone(),
38
&[lv.to_any_value().unwrap()],
39
false,
40
)
41
.unwrap()
42
.into_column(),
43
L::Range(RangeLiteralValue { low, high, dtype }) => {
44
let low = *low;
45
let high = *high;
46
match dtype {
47
DataType::Int32 => {
48
polars_ensure!(
49
low >= i32::MIN as i128 && high <= i32::MAX as i128,
50
ComputeError: "range not within bounds of `Int32`: [{}, {}]", low, high
51
);
52
let low = low as i32;
53
let high = high as i32;
54
let ca: NoNull<Int32Chunked> = (low..high).collect();
55
ca.into_inner().into_column()
56
},
57
DataType::Int64 => {
58
polars_ensure!(
59
low >= i64::MIN as i128 && high <= i64::MAX as i128,
60
ComputeError: "range not within bounds of `Int32`: [{}, {}]", low, high
61
);
62
let low = low as i64;
63
let high = high as i64;
64
let ca: NoNull<Int64Chunked> = (low..high).collect();
65
ca.into_inner().into_column()
66
},
67
DataType::UInt32 => {
68
polars_ensure!(
69
low >= u32::MIN as i128 && high <= u32::MAX as i128,
70
ComputeError: "range not within bounds of `UInt32`: [{}, {}]", low, high
71
);
72
let low = low as u32;
73
let high = high as u32;
74
let ca: NoNull<UInt32Chunked> = (low..high).collect();
75
ca.into_inner().into_column()
76
},
77
dt => polars_bail!(
78
InvalidOperation: "datatype `{}` is not supported as range", dt
79
),
80
}
81
},
82
};
83
Ok(column)
84
}
85
}
86
87
impl PhysicalExpr for LiteralExpr {
88
fn as_expression(&self) -> Option<&Expr> {
89
Some(&self.1)
90
}
91
92
fn evaluate(&self, _df: &DataFrame, _state: &ExecutionState) -> PolarsResult<Column> {
93
self.as_column()
94
}
95
96
#[allow(clippy::ptr_arg)]
97
fn evaluate_on_groups<'a>(
98
&self,
99
df: &DataFrame,
100
groups: &'a GroupPositions,
101
state: &ExecutionState,
102
) -> PolarsResult<AggregationContext<'a>> {
103
let s = self.evaluate(df, state)?;
104
105
if self.0.is_scalar() {
106
Ok(AggregationContext::from_agg_state(
107
AggState::LiteralScalar(s),
108
Cow::Borrowed(groups),
109
))
110
} else {
111
// A non-scalar literal value expands to those values for every group.
112
113
let lit_length = s.len() as IdxSize;
114
polars_ensure!(
115
(groups.len() as IdxSize).checked_mul(lit_length).is_some(),
116
bigidx,
117
ctx = "group_by",
118
size = groups.len() as u64 * lit_length as u64
119
);
120
let groups = GroupsType::Slice {
121
groups: (0..groups.len() as IdxSize)
122
.map(|i| [i * lit_length, lit_length])
123
.collect(),
124
rolling: false,
125
};
126
let agg_state = AggState::AggregatedList(Column::new_scalar(
127
s.name().clone(),
128
Scalar::new_list(s.take_materialized_series()),
129
groups.len(),
130
));
131
132
let groups = groups.into_sliceable();
133
Ok(AggregationContext::from_agg_state(
134
agg_state,
135
Cow::Owned(groups),
136
))
137
}
138
}
139
140
fn as_partitioned_aggregator(&self) -> Option<&dyn PartitionedAggregation> {
141
Some(self)
142
}
143
144
fn to_field(&self, _input_schema: &Schema) -> PolarsResult<Field> {
145
let dtype = self.0.get_datatype();
146
Ok(Field::new(PlSmallStr::from_static("literal"), dtype))
147
}
148
fn is_literal(&self) -> bool {
149
true
150
}
151
152
fn is_scalar(&self) -> bool {
153
self.0.is_scalar()
154
}
155
}
156
157
impl PartitionedAggregation for LiteralExpr {
158
fn evaluate_partitioned(
159
&self,
160
df: &DataFrame,
161
_groups: &GroupPositions,
162
state: &ExecutionState,
163
) -> PolarsResult<Column> {
164
self.evaluate(df, state)
165
}
166
167
fn finalize(
168
&self,
169
partitioned: Column,
170
_groups: &GroupPositions,
171
_state: &ExecutionState,
172
) -> PolarsResult<Column> {
173
Ok(partitioned)
174
}
175
}
176
177