Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-core/src/frame/column/partitioned.rs
6940 views
1
use std::borrow::Cow;
2
use std::convert::identity;
3
use std::sync::{Arc, OnceLock};
4
5
use polars_error::{PolarsResult, polars_ensure};
6
use polars_utils::IdxSize;
7
use polars_utils::pl_str::PlSmallStr;
8
9
use super::{AnyValue, Column, DataType, Field, IntoColumn, Series};
10
use crate::chunked_array::cast::CastOptions;
11
use crate::frame::Scalar;
12
use crate::series::IsSorted;
13
14
#[derive(Debug, Clone)]
15
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
16
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
17
pub struct PartitionedColumn {
18
name: PlSmallStr,
19
20
values: Series,
21
ends: Arc<[IdxSize]>,
22
23
#[cfg_attr(feature = "serde", serde(skip, default))]
24
materialized: OnceLock<Series>,
25
}
26
27
impl IntoColumn for PartitionedColumn {
28
fn into_column(self) -> Column {
29
Column::Partitioned(self)
30
}
31
}
32
33
impl From<PartitionedColumn> for Column {
34
fn from(value: PartitionedColumn) -> Self {
35
value.into_column()
36
}
37
}
38
39
fn verify_invariants(values: &Series, ends: &[IdxSize]) -> PolarsResult<()> {
40
polars_ensure!(
41
values.len() == ends.len(),
42
ComputeError: "partitioned column `values` length does not match `ends` length ({} != {})",
43
values.len(),
44
ends.len()
45
);
46
47
for vs in ends.windows(2) {
48
polars_ensure!(
49
vs[0] <= vs[1],
50
ComputeError: "partitioned column `ends` are not monotonely non-decreasing",
51
);
52
}
53
54
Ok(())
55
}
56
57
impl PartitionedColumn {
58
pub fn new(name: PlSmallStr, values: Series, ends: Arc<[IdxSize]>) -> Self {
59
Self::try_new(name, values, ends).unwrap()
60
}
61
62
/// # Safety
63
///
64
/// Safe if:
65
/// - `values.len() == ends.len()`
66
/// - all values can have `dtype`
67
/// - `ends` is monotonely non-decreasing
68
pub unsafe fn new_unchecked(name: PlSmallStr, values: Series, ends: Arc<[IdxSize]>) -> Self {
69
if cfg!(debug_assertions) {
70
verify_invariants(&values, ends.as_ref()).unwrap();
71
}
72
73
let values = values.rechunk();
74
Self {
75
name,
76
values,
77
ends,
78
materialized: OnceLock::new(),
79
}
80
}
81
82
pub fn try_new(name: PlSmallStr, values: Series, ends: Arc<[IdxSize]>) -> PolarsResult<Self> {
83
verify_invariants(&values, ends.as_ref())?;
84
85
// SAFETY: Invariants checked before
86
Ok(unsafe { Self::new_unchecked(name, values, ends) })
87
}
88
89
pub fn new_empty(name: PlSmallStr, dtype: DataType) -> Self {
90
Self {
91
name,
92
values: Series::new_empty(PlSmallStr::EMPTY, &dtype),
93
ends: Arc::default(),
94
95
materialized: OnceLock::new(),
96
}
97
}
98
99
pub fn len(&self) -> usize {
100
self.ends.last().map_or(0, |last| *last as usize)
101
}
102
103
pub fn is_empty(&self) -> bool {
104
self.len() == 0
105
}
106
107
pub fn name(&self) -> &PlSmallStr {
108
&self.name
109
}
110
111
/// Data type of the column, taken from the per-partition values.
pub fn dtype(&self) -> &DataType {
    let partition_values = &self.values;
    partition_values.dtype()
}
114
115
#[inline]
116
pub fn field(&self) -> Cow<'_, Field> {
117
match self.lazy_as_materialized_series() {
118
None => Cow::Owned(Field::new(self.name().clone(), self.dtype().clone())),
119
Some(s) => s.field(),
120
}
121
}
122
123
/// Renames the column and returns `self` for chaining.
///
/// If the column has already been materialized, the cached series is renamed
/// too, so `field()` and `as_materialized_series()` do not keep reporting the
/// old name.
pub fn rename(&mut self, name: PlSmallStr) -> &mut Self {
    // Keep the cached materialization in sync with the new name.
    if let Some(series) = self.materialized.get_mut() {
        series.rename(name.clone());
    }

    self.name = name;
    self
}
127
128
fn _to_series(name: PlSmallStr, values: &Series, ends: &[IdxSize]) -> Series {
129
let dtype = values.dtype();
130
let mut column = Column::Series(Series::new_empty(name, dtype).into());
131
132
let mut prev_offset = 0;
133
for (i, &offset) in ends.iter().enumerate() {
134
// @TODO: Optimize
135
let length = offset - prev_offset;
136
column
137
.extend(&Column::new_scalar(
138
PlSmallStr::EMPTY,
139
Scalar::new(dtype.clone(), values.get(i).unwrap().into_static()),
140
length as usize,
141
))
142
.unwrap();
143
prev_offset = offset;
144
}
145
146
debug_assert_eq!(column.len(), prev_offset as usize);
147
148
column.take_materialized_series()
149
}
150
151
/// Materialize the [`PartitionedColumn`] into a [`Series`].
152
fn to_series(&self) -> Series {
153
Self::_to_series(self.name.clone(), &self.values, &self.ends)
154
}
155
156
/// Get the [`PartitionedColumn`] as [`Series`] if it was already materialized.
157
pub fn lazy_as_materialized_series(&self) -> Option<&Series> {
158
self.materialized.get()
159
}
160
161
/// Get the [`PartitionedColumn`] as [`Series`]
162
///
163
/// This needs to materialize upon the first call. Afterwards, this is cached.
164
pub fn as_materialized_series(&self) -> &Series {
165
self.materialized.get_or_init(|| self.to_series())
166
}
167
168
/// Take the [`PartitionedColumn`] and materialize as a [`Series`] if not already done.
169
pub fn take_materialized_series(self) -> Series {
170
self.materialized
171
.into_inner()
172
.unwrap_or_else(|| Self::_to_series(self.name, &self.values, &self.ends))
173
}
174
175
/// Applies `f` to the per-partition values and returns a new column with the
/// same name and partition ends.
///
/// # Panics
///
/// Panics if `f` returns a series whose length differs from the number of
/// partition values.
pub fn apply_unary_elementwise(&self, f: impl Fn(&Series) -> Series) -> Self {
    let mapped = f(&self.values).rechunk();
    assert_eq!(self.values.len(), mapped.len());

    let name = self.name.clone();
    let ends = self.ends.clone();
    // SAFETY: `mapped` has as many values as before (asserted above) and
    // `ends` is reused unchanged.
    unsafe { Self::new_unchecked(name, mapped, ends) }
}
180
181
pub fn try_apply_unary_elementwise(
182
&self,
183
f: impl Fn(&Series) -> PolarsResult<Series>,
184
) -> PolarsResult<Self> {
185
let result = f(&self.values)?.rechunk();
186
assert_eq!(self.values.len(), result.len());
187
Ok(unsafe { Self::new_unchecked(self.name.clone(), result, self.ends.clone()) })
188
}
189
190
pub fn extend_constant(&self, value: AnyValue, n: usize) -> PolarsResult<Self> {
191
let mut new_ends = self.ends.to_vec();
192
// @TODO: IdxSize checks
193
let new_length = (self.len() + n) as IdxSize;
194
195
let values = if !self.is_empty() && self.values.last().value() == &value {
196
*new_ends.last_mut().unwrap() = new_length;
197
self.values.clone()
198
} else {
199
new_ends.push(new_length);
200
self.values.extend_constant(value, 1)?
201
};
202
203
Ok(unsafe { Self::new_unchecked(self.name.clone(), values, new_ends.into()) })
204
}
205
206
pub unsafe fn get_unchecked(&self, index: usize) -> AnyValue<'_> {
207
debug_assert!(index < self.len());
208
209
// Common situation get_unchecked(0)
210
if index < self.ends[0] as usize {
211
return unsafe { self.get_unchecked(0) };
212
}
213
214
let value_idx = self
215
.ends
216
.binary_search(&(index as IdxSize))
217
.map_or_else(identity, identity);
218
219
self.get_unchecked(value_idx)
220
}
221
222
/// Minimum of the column, computed over the per-partition values.
///
/// NOTE(review): a zero-length partition (allowed by `ends` being only
/// non-decreasing) would still contribute its value here — confirm that
/// callers never create such partitions.
pub fn min_reduce(&self) -> PolarsResult<Scalar> {
    let partition_values = &self.values;
    partition_values.min_reduce()
}
225
pub fn max_reduce(&self) -> Result<Scalar, polars_error::PolarsError> {
226
self.values.max_reduce()
227
}
228
229
/// Returns a new column with the rows in reverse order.
///
/// The partition values are reversed, and the ends are recomputed so that each
/// partition keeps its length. An empty column reverses to an empty column.
pub fn reverse(&self) -> Self {
    let values = self.values.reverse();
    let mut ends: Vec<IdxSize> = Vec::with_capacity(self.ends.len());

    // With no partitions there is nothing to push. Adding a trailing `0` here
    // would make `ends` longer than `values` and break the invariant.
    if let Some(&total) = self.ends.last() {
        // Original partition `i` starts at `ends[i - 1]` (or 0). After the
        // reversal it ends at `total - start`, so walk the starts from last to
        // first; the final reversed partition always ends at `total`.
        ends.extend(self.ends.iter().rev().skip(1).map(|&start| total - start));
        ends.push(total);
    }

    // SAFETY: `ends` has one entry per partition value, and because the
    // original ends are non-decreasing and bounded by `total`, the new ends are
    // non-decreasing as well.
    unsafe { Self::new_unchecked(self.name.clone(), values, ends.into()) }
}
242
243
pub fn set_sorted_flag(&mut self, sorted: IsSorted) {
244
self.values.set_sorted_flag(sorted);
245
}
246
247
pub fn cast_with_options(&self, dtype: &DataType, options: CastOptions) -> PolarsResult<Self> {
248
let values = self.values.cast_with_options(dtype, options)?;
249
Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })
250
}
251
252
pub fn strict_cast(&self, dtype: &DataType) -> PolarsResult<Self> {
253
let values = self.values.strict_cast(dtype)?;
254
Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })
255
}
256
257
pub fn cast(&self, dtype: &DataType) -> PolarsResult<Self> {
258
let values = self.values.cast(dtype)?;
259
Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })
260
}
261
262
pub unsafe fn cast_unchecked(&self, dtype: &DataType) -> PolarsResult<Self> {
263
let values = unsafe { self.values.cast_unchecked(dtype) }?;
264
Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })
265
}
266
267
pub fn null_count(&self) -> usize {
268
match self.lazy_as_materialized_series() {
269
Some(s) => s.null_count(),
270
None => {
271
// @partition-opt
272
self.as_materialized_series().null_count()
273
},
274
}
275
}
276
277
pub fn clear(&self) -> Self {
278
Self::new_empty(self.name.clone(), self.values.dtype().clone())
279
}
280
281
pub fn partitions(&self) -> &Series {
282
&self.values
283
}
284
/// The exclusive end row of each partition, as a slice.
pub fn partition_ends(&self) -> &[IdxSize] {
    &self.ends[..]
}
287
288
pub fn partition_ends_ref(&self) -> &Arc<[IdxSize]> {
289
&self.ends
290
}
291
292
/// OR-reduction of the column, computed over the per-partition values.
///
/// NOTE(review): a zero-length partition would still contribute its value
/// here — confirm that callers never create such partitions.
pub fn or_reduce(&self) -> PolarsResult<Scalar> {
    let partition_values = &self.values;
    partition_values.or_reduce()
}
295
296
/// AND-reduction of the column, computed over the per-partition values.
///
/// NOTE(review): a zero-length partition would still contribute its value
/// here — confirm that callers never create such partitions.
pub fn and_reduce(&self) -> PolarsResult<Scalar> {
    let partition_values = &self.values;
    partition_values.and_reduce()
}
299
}
300
301