GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/expressions/window.rs
use std::fmt::Write;

use arrow::array::PrimitiveArray;
use arrow::bitmap::Bitmap;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::utils::_split_offsets;
use polars_core::{POOL, downcast_as_macro_arg_physical};
use polars_ops::frame::SeriesJoin;
use polars_ops::frame::join::{ChunkJoinOptIds, private_left_join_multiple_keys};
use polars_ops::prelude::*;
use polars_plan::prelude::*;
use polars_utils::sort::perfect_sort;
use polars_utils::sync::SyncPtr;
use rayon::prelude::*;

use super::*;

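// A rough usage sketch (illustrative only; the exact lazy-API surface may differ):
// `WindowExpr` is the physical node behind window expressions such as
//
//     df.lazy().with_columns([col("x").sum().over([col("g")])])
//
// i.e. an aggregation evaluated per group whose result is then joined/mapped back
// onto the original row layout of the DataFrame.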
pub struct WindowExpr {
    /// the root column that the Function will be applied on.
    /// This will be used to create a smaller DataFrame to prevent taking unneeded columns by index
    pub(crate) group_by: Vec<Arc<dyn PhysicalExpr>>,
    pub(crate) order_by: Option<(Arc<dyn PhysicalExpr>, SortOptions)>,
    pub(crate) apply_columns: Vec<PlSmallStr>,
    /// A function Expr. i.e. Mean, Median, Max, etc.
    pub(crate) function: Expr,
    pub(crate) phys_function: Arc<dyn PhysicalExpr>,
    pub(crate) mapping: WindowMapping,
    pub(crate) expr: Expr,
    pub(crate) has_different_group_sources: bool,
}

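// A rough guide (illustrative, not exhaustive) to how user-facing expressions map
// onto these strategies, assuming the default `WindowMapping::GroupsToRows`
// (see `determine_map_strategy` below):
//
//   `sum("foo").over("groups")`                  -> Join    (one scalar per group, joined back)
//   `(col("x").sum() * col("y")).over("groups")` -> Map     (a list per group, mapped back to rows;
//                                                            Explode when the groups are sorted slices)
//   `col("foo").over("groups")`                  -> Nothing (already has the right layout)
//   `lit(1).over("groups")`                      -> Nothing (literal; broadcast later)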
#[cfg_attr(debug_assertions, derive(Debug))]
enum MapStrategy {
    // Join by key; this is the most expensive,
    // used for reduced aggregations.
    Join,
    // Explode now.
    Explode,
    // Use an arg_sort to map the values back.
    Map,
    Nothing,
}

impl WindowExpr {
    fn map_list_agg_by_arg_sort(
        &self,
        out_column: Column,
        flattened: &Column,
        mut ac: AggregationContext,
        gb: GroupBy,
    ) -> PolarsResult<IdxCa> {
        // idx (new-idx, original-idx)
        let mut idx_mapping = Vec::with_capacity(out_column.len());

        // we create this buffer here so that in the remapping branch below we can
        // reuse the `original_idx` buffer; that saves an allocation
        let mut take_idx = vec![];

        // groups are not changed, we can map by doing a standard arg_sort.
        if std::ptr::eq(ac.groups().as_ref(), gb.get_groups()) {
            let mut iter = 0..flattened.len() as IdxSize;
            match ac.groups().as_ref().as_ref() {
                GroupsType::Idx(groups) => {
                    for g in groups.all() {
                        idx_mapping.extend(g.iter().copied().zip(&mut iter));
                    }
                },
                GroupsType::Slice { groups, .. } => {
                    for &[first, len] in groups {
                        idx_mapping.extend((first..first + len).zip(&mut iter));
                    }
                },
            }
        }
        // groups are changed, we use the new group indexes as arguments of the arg_sort
        // and sort by the old indexes
        else {
            let mut original_idx = Vec::with_capacity(out_column.len());
            match gb.get_groups().as_ref() {
                GroupsType::Idx(groups) => {
                    for g in groups.all() {
                        original_idx.extend_from_slice(g)
                    }
                },
                GroupsType::Slice { groups, .. } => {
                    for &[first, len] in groups {
                        original_idx.extend(first..first + len)
                    }
                },
            };

            let mut original_idx_iter = original_idx.iter().copied();

            match ac.groups().as_ref().as_ref() {
                GroupsType::Idx(groups) => {
                    for g in groups.all() {
                        idx_mapping.extend(g.iter().copied().zip(&mut original_idx_iter));
                    }
                },
                GroupsType::Slice { groups, .. } => {
                    for &[first, len] in groups {
                        idx_mapping.extend((first..first + len).zip(&mut original_idx_iter));
                    }
                },
            }
            original_idx.clear();
            take_idx = original_idx;
        }
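        // A rough trace of the simple branch above (unchanged groups), assuming
        // `perfect_sort` scatters every `(a, b)` pair so that `take_idx[a] = b`
        // (illustrative values):
        //
        //   groups (original row indices): [[0, 2], [1]]
        //   flattened positions:           0, 1, 2
        //   idx_mapping:                   [(0, 0), (2, 1), (1, 2)]
        //   take_idx after perfect_sort:   [0, 2, 1]
        //
        // Taking `flattened` by `take_idx` then puts every aggregated value back on
        // the row it came from.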
        // SAFETY:
        // we only have unique indices ranging from 0..len
        unsafe { perfect_sort(&POOL, &idx_mapping, &mut take_idx) };
        Ok(IdxCa::from_vec(PlSmallStr::EMPTY, take_idx))
    }

    #[allow(clippy::too_many_arguments)]
    fn map_by_arg_sort(
        &self,
        df: &DataFrame,
        out_column: Column,
        flattened: &Column,
        mut ac: AggregationContext,
        group_by_columns: &[Column],
        gb: GroupBy,
        cache_key: String,
        state: &ExecutionState,
    ) -> PolarsResult<Column> {
        // we use an arg_sort to map the values back

        // This is a bit more complicated because the final group tuples may differ from the original,
        // so we use the original indices as idx values to arg_sort the original column
        //
        // The example below shows the naive version without group tuple mapping

        // columns
        // a b a a
        //
        // agg list
        // [0, 2, 3]
        // [1]
        //
        // flatten
        //
        // [0, 2, 3, 1]
        //
        // arg_sort
        //
        // [0, 3, 1, 2]
        //
        // take by the arg_sorted indexes and voilà, groups mapped
        // [0, 1, 2, 3]

        if flattened.len() != df.height() {
            let ca = out_column.list().unwrap();
            let non_matching_group =
                ca.into_iter()
                    .zip(ac.groups().iter())
                    .find(|(output, group)| {
                        if let Some(output) = output {
                            output.as_ref().len() != group.len()
                        } else {
                            false
                        }
                    });

            if let Some((output, group)) = non_matching_group {
                let first = group.first();
                let group = group_by_columns
                    .iter()
                    .map(|s| format!("{}", s.get(first as usize).unwrap()))
                    .collect::<Vec<_>>();
                polars_bail!(
                    expr = self.expr, ShapeMismatch:
                    "the length of the window expression did not match that of the group\
                    \n> group: {}\n> group length: {}\n> output: '{:?}'",
                    comma_delimited(String::new(), &group), group.len(), output.unwrap()
                );
            } else {
                polars_bail!(
                    expr = self.expr, ShapeMismatch:
                    "the length of the window expression did not match that of the group"
                );
            };
        }

        let idx = if state.cache_window() {
            if let Some(idx) = state.window_cache.get_map(&cache_key) {
                idx
            } else {
                let idx = Arc::new(self.map_list_agg_by_arg_sort(out_column, flattened, ac, gb)?);
                state.window_cache.insert_map(cache_key, idx.clone());
                idx
            }
        } else {
            Arc::new(self.map_list_agg_by_arg_sort(out_column, flattened, ac, gb)?)
        };

        // SAFETY:
        // groups should always be in bounds.
        unsafe { Ok(flattened.take_unchecked(&idx)) }
    }

    fn run_aggregation<'a>(
        &self,
        df: &DataFrame,
        state: &ExecutionState,
        gb: &'a GroupBy,
    ) -> PolarsResult<AggregationContext<'a>> {
        let ac = self
            .phys_function
            .evaluate_on_groups(df, gb.get_groups(), state)?;
        Ok(ac)
    }

    fn is_explicit_list_agg(&self) -> bool {
        // col("foo").implode()
        // col("foo").implode().alias()
        // ..
        // col("foo").implode().alias().alias()
        //
        // but not:
        // col("foo").implode().sum().alias()
        // ..
        // col("foo").min()
        let mut explicit_list = false;
        for e in &self.expr {
            if let Expr::Window { function, .. } = e {
                // or list().alias
                let mut finishes_list = false;
                for e in &**function {
                    match e {
                        Expr::Agg(AggExpr::Implode(_)) => {
                            finishes_list = true;
                        },
                        Expr::Alias(_, _) => {},
                        _ => break,
                    }
                }
                explicit_list = finishes_list;
            }
        }

        explicit_list
    }

    fn is_simple_column_expr(&self) -> bool {
        // col()
        // or col().alias()
        let mut simple_col = false;
        for e in &self.expr {
            if let Expr::Window { function, .. } = e {
                // or list().alias
                for e in &**function {
                    match e {
                        Expr::Column(_) => {
                            simple_col = true;
                        },
                        Expr::Alias(_, _) => {},
                        _ => break,
                    }
                }
            }
        }
        simple_col
    }

    fn is_aggregation(&self) -> bool {
        // col()
        // or col().agg()
        let mut agg_col = false;
        for e in &self.expr {
            if let Expr::Window { function, .. } = e {
                // or list().alias
                for e in &**function {
                    match e {
                        Expr::Agg(_) => {
                            agg_col = true;
                        },
                        Expr::Alias(_, _) => {},
                        _ => break,
                    }
                }
            }
        }
        agg_col
    }

    fn determine_map_strategy(
        &self,
        agg_state: &AggState,
        gb: &GroupBy,
    ) -> PolarsResult<MapStrategy> {
        match (self.mapping, agg_state) {
            // Explode
            // `(col("x").sum() * col("y")).list().over("groups").flatten()`
            (WindowMapping::Explode, _) => Ok(MapStrategy::Explode),
            // // explicit list
            // // `(col("x").sum() * col("y")).list().over("groups")`
            // (false, false, _) => Ok(MapStrategy::Join),
            // aggregations
            // `sum("foo").over("groups")`
            (_, AggState::AggregatedScalar(_)) => Ok(MapStrategy::Join),
            // no explicit aggregations, map over the groups
            // `(col("x").sum() * col("y")).over("groups")`
            (WindowMapping::Join, AggState::AggregatedList(_)) => Ok(MapStrategy::Join),
            // no explicit aggregations, map over the groups
            // `(col("x").sum() * col("y")).over("groups")`
            (WindowMapping::GroupsToRows, AggState::AggregatedList(_)) => {
                if let GroupsType::Slice { .. } = gb.get_groups().as_ref() {
                    // Result can be directly exploded if the input was sorted.
                    Ok(MapStrategy::Explode)
                } else {
                    Ok(MapStrategy::Map)
                }
            },
            // no aggregations, just return column
            // or an aggregation that has been flattened
            // we have to check which one
            // `col("foo").over("groups")`
            (WindowMapping::GroupsToRows, AggState::NotAggregated(_)) => {
                // col()
                // or col().alias()
                if self.is_simple_column_expr() {
                    Ok(MapStrategy::Nothing)
                } else {
                    Ok(MapStrategy::Map)
                }
            },
            (WindowMapping::Join, AggState::NotAggregated(_)) => Ok(MapStrategy::Join),
            // literals, do nothing and let broadcast
            (_, AggState::LiteralScalar(_)) => Ok(MapStrategy::Nothing),
        }
    }
}

// Utility to create partitions and cache keys
pub fn window_function_format_order_by(to: &mut String, e: &Expr, k: &SortOptions) {
    write!(to, "_PL_{:?}{}_{}", e, k.descending, k.nulls_last).unwrap();
}
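// For illustration only: given the format string above, an order-by on `col("ts")`
// with `descending: false` and `nulls_last: true` would append something like
// `_PL_col("ts")false_true` to the key (the exact text depends on the `Debug`
// rendering of the expression).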

impl PhysicalExpr for WindowExpr {
    // Note: this was first implemented with expression evaluation, but that performed really badly.
    // Therefore we choose the group_by -> apply -> self-join approach.

    // This first cached the group_by and the join tuples, but rayon under a mutex leads to deadlocks:
    // https://github.com/rayon-rs/rayon/issues/592
    fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
        // This method does the following:
        // 1. determine group_by tuples based on the group_column
        // 2. apply an aggregation function
        // 3. join the results back to the original dataframe
        //    this stores all group values on the original df size
        //
        //    we have several strategies for this
        //    - 3.1 JOIN
        //      Use a join for aggregations like
        //      `sum("foo").over("groups")`
        //      and explicit `list` aggregations
        //      `(col("x").sum() * col("y")).list().over("groups")`
        //
        //    - 3.2 EXPLODE
        //      Explicit list aggregations that are followed by `over().flatten()`
        //      # the fastest method to do things over groups when the groups are sorted.
        //      # note that it will require an explicit `list()` call from now on.
        //      `(col("x").sum() * col("y")).list().over("groups").flatten()`
        //
        //    - 3.3 MAP to original locations
        //      This will be done for list aggregations that are not explicitly aggregated as list
        //      `(col("x").sum() * col("y")).over("groups")`
        //      This can be used to reverse, sort, shuffle etc. the values in a group
        //
        // 4. select the final column and return

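        // An illustrative end-to-end sketch of the JOIN strategy (made-up values):
        //
        //   df:            g = [a, b, a], x = [1, 4, 2]
        //   group_by "g":  a -> rows [0, 2], b -> rows [1]
        //   sum("x"):      a -> 3, b -> 4
        //   join back:     [3, 4, 3]   (same height as `df`)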
        if df.is_empty() {
            let field = self.phys_function.to_field(df.schema())?;
            match self.mapping {
                WindowMapping::Join => {
                    return Ok(Column::full_null(
                        field.name().clone(),
                        0,
                        &DataType::List(Box::new(field.dtype().clone())),
                    ));
                },
                _ => {
                    return Ok(Column::full_null(field.name().clone(), 0, field.dtype()));
                },
            }
        }

        let group_by_columns = self
            .group_by
            .iter()
            .map(|e| e.evaluate(df, state))
            .collect::<PolarsResult<Vec<_>>>()?;

        // if the keys are sorted
        let sorted_keys = group_by_columns.iter().all(|s| {
            matches!(
                s.is_sorted_flag(),
                IsSorted::Ascending | IsSorted::Descending
            )
        });
        let explicit_list_agg = self.is_explicit_list_agg();

        // if we flatten this column we need to make sure the groups are sorted.
        let mut sort_groups = matches!(self.mapping, WindowMapping::Explode) ||
            // if not
            // `col().over()`
            // and not
            // `col().list().over`
            // and not
            // `col().sum()`
            // and keys are sorted
            // we may optimize with explode call
            (!self.is_simple_column_expr() && !explicit_list_agg && sorted_keys && !self.is_aggregation());

        // overwrite sort_groups for some expressions
        // TODO: fully understand the rationale here.
        if self.has_different_group_sources {
            sort_groups = true
        }

        let create_groups = || {
            let gb = df.group_by_with_series(group_by_columns.clone(), true, sort_groups)?;
            let mut groups = gb.take_groups();

            if let Some((order_by, options)) = &self.order_by {
                let order_by = order_by.evaluate(df, state)?;
                polars_ensure!(order_by.len() == df.height(), ShapeMismatch: "the order by expression evaluated to a length: {} that doesn't match the input DataFrame: {}", order_by.len(), df.height());
                groups = update_groups_sort_by(&groups, order_by.as_materialized_series(), options)?
                    .into_sliceable()
            }

            let out: PolarsResult<GroupPositions> = Ok(groups);
            out
        };

        // Try to get cached group tuples
        let (mut groups, cache_key) = if state.cache_window() {
            let mut cache_key = String::with_capacity(32 * group_by_columns.len());
            write!(&mut cache_key, "{}", state.branch_idx).unwrap();
            for s in &group_by_columns {
                cache_key.push_str(s.name());
            }
            if let Some((e, options)) = &self.order_by {
                let e = match e.as_expression() {
                    Some(e) => e,
                    None => {
                        polars_bail!(InvalidOperation: "cannot order by this expression in window function")
                    },
                };
                window_function_format_order_by(&mut cache_key, e, options)
            }

            let groups = match state.window_cache.get_groups(&cache_key) {
                Some(groups) => groups,
                None => create_groups()?,
            };
            (groups, cache_key)
        } else {
            (create_groups()?, "".to_string())
        };

        // 2. create GroupBy object and apply aggregation
        let apply_columns = self.apply_columns.clone();

        // some window expressions need sorted groups
        // to make sure that the caches align we sort
        // the groups, so that the cached groups and join keys
        // are consistent among all windows
        if sort_groups || state.cache_window() {
            groups.sort();
            state
                .window_cache
                .insert_groups(cache_key.clone(), groups.clone());
        }
        let gb = GroupBy::new(df, group_by_columns.clone(), groups, Some(apply_columns));

        let mut ac = self.run_aggregation(df, state, &gb)?;

        use MapStrategy::*;
        match self.determine_map_strategy(ac.agg_state(), &gb)? {
            Nothing => {
                let mut out = ac.flat_naive().into_owned();

                if ac.is_literal() {
                    out = out.new_from_index(0, df.height())
                }
                Ok(out.into_column())
            },
            Explode => {
                let out = ac.aggregated().explode(false)?;
                Ok(out.into_column())
            },
            Map => {
                // TODO!
                // investigate if sorted arrays can be returned directly
                let out_column = ac.aggregated();
                let flattened = out_column.explode(false)?;
                // we extend the lifetime as we must convince the compiler that ac lives
                // long enough. We drop `GroupBy` when we are done with `ac`.
                let ac = unsafe {
                    std::mem::transmute::<AggregationContext<'_>, AggregationContext<'static>>(ac)
                };
                self.map_by_arg_sort(
                    df,
                    out_column,
                    &flattened,
                    ac,
                    &group_by_columns,
                    gb,
                    cache_key,
                    state,
                )
            },
            Join => {
                let out_column = ac.aggregated();
                // we try to flatten/extend the array by repeating the aggregated value n times
                // where n is the number of members in that group. That way we can try to reuse
                // the same map by arg_sort logic as done for listed aggregations
                let update_groups = !matches!(&ac.update_groups, UpdateGroups::No);
                match (
                    &ac.update_groups,
                    set_by_groups(&out_column, &ac, df.height(), update_groups),
                ) {
                    // for aggregations that reduce like sum, mean, first and are numeric
                    // we take the group locations to directly map them to the right place
                    (UpdateGroups::No, Some(out)) => Ok(out.into_column()),
                    (_, _) => {
                        let keys = gb.keys();

                        let get_join_tuples = || {
                            if group_by_columns.len() == 1 {
                                let mut left = group_by_columns[0].clone();
                                // group key from right column
                                let mut right = keys[0].clone();

                                let (left, right) = if left.dtype().is_nested() {
                                    (
                                        ChunkedArray::<BinaryOffsetType>::with_chunk(
                                            "".into(),
                                            row_encode::_get_rows_encoded_unordered(&[
                                                left.clone()
                                            ])?
                                            .into_array(),
                                        )
                                        .into_series(),
                                        ChunkedArray::<BinaryOffsetType>::with_chunk(
                                            "".into(),
                                            row_encode::_get_rows_encoded_unordered(&[
                                                right.clone()
                                            ])?
                                            .into_array(),
                                        )
                                        .into_series(),
                                    )
                                } else {
                                    (
                                        left.into_materialized_series().clone(),
                                        right.into_materialized_series().clone(),
                                    )
                                };

                                PolarsResult::Ok(Arc::new(
                                    left.hash_join_left(&right, JoinValidation::ManyToMany, true)
                                        .unwrap()
                                        .1,
                                ))
                            } else {
                                let df_right =
                                    unsafe { DataFrame::new_no_checks_height_from_first(keys) };
                                let df_left = unsafe {
                                    DataFrame::new_no_checks_height_from_first(group_by_columns)
                                };
                                Ok(Arc::new(
                                    private_left_join_multiple_keys(&df_left, &df_right, true)?.1,
                                ))
                            }
                        };

                        // try to get cached join_tuples
                        let join_opt_ids = if state.cache_window() {
                            if let Some(jt) = state.window_cache.get_join(&cache_key) {
                                jt
                            } else {
                                let jt = get_join_tuples()?;
                                state.window_cache.insert_join(cache_key, jt.clone());
                                jt
                            }
                        } else {
                            get_join_tuples()?
                        };

                        let out = materialize_column(&join_opt_ids, &out_column);
                        Ok(out.into_column())
                    },
                }
            },
        }
    }
601
602
fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
603
self.function.to_field(input_schema)
604
}
605
606
fn is_scalar(&self) -> bool {
607
false
608
}
609
610
#[allow(clippy::ptr_arg)]
611
fn evaluate_on_groups<'a>(
612
&self,
613
_df: &DataFrame,
614
_groups: &'a GroupPositions,
615
_state: &ExecutionState,
616
) -> PolarsResult<AggregationContext<'a>> {
617
polars_bail!(InvalidOperation: "window expression not allowed in aggregation");
618
}
619
620
fn as_expression(&self) -> Option<&Expr> {
621
Some(&self.expr)
622
}
623
}
624
625
fn materialize_column(join_opt_ids: &ChunkJoinOptIds, out_column: &Column) -> Column {
626
{
627
use arrow::Either;
628
use polars_ops::chunked_array::TakeChunked;
629
630
match join_opt_ids {
631
Either::Left(ids) => unsafe {
632
IdxCa::with_nullable_idx(ids, |idx| out_column.take_unchecked(idx))
633
},
634
Either::Right(ids) => unsafe { out_column.take_opt_chunked_unchecked(ids, false) },
635
}
636
}
637
}
638
639
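// An illustrative sketch (made-up values) of the fast path implemented by
// `set_by_groups`/`set_numeric` below: for a reducing aggregation such as
// `sum("x").over("g")` with groups a -> rows [0, 2], b -> rows [1] and aggregated
// values [10, 5], every group's scalar is written directly to the rows of its
// group, producing [10, 5, 10] at the original DataFrame height.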
/// Simple reducing aggregation can be set by the groups
fn set_by_groups(
    s: &Column,
    ac: &AggregationContext,
    len: usize,
    update_groups: bool,
) -> Option<Column> {
    if update_groups || !ac.original_len {
        return None;
    }
    if s.dtype().to_physical().is_primitive_numeric() {
        let dtype = s.dtype();
        let s = s.to_physical_repr();

        macro_rules! dispatch {
            ($ca:expr) => {{ Some(set_numeric($ca, &ac.groups, len)) }};
        }
        downcast_as_macro_arg_physical!(&s, dispatch)
            .map(|s| unsafe { s.from_physical_unchecked(dtype) }.unwrap())
            .map(Column::from)
    } else {
        None
    }
}

fn set_numeric<T: PolarsNumericType>(
    ca: &ChunkedArray<T>,
    groups: &GroupsType,
    len: usize,
) -> Series {
    let mut values = Vec::with_capacity(len);
    let ptr: *mut T::Native = values.as_mut_ptr();
    // SAFETY:
    // we will write from different threads but we will never alias.
    let sync_ptr_values = unsafe { SyncPtr::new(ptr) };

    if ca.null_count() == 0 {
        let ca = ca.rechunk();
        match groups {
            GroupsType::Idx(groups) => {
                let agg_vals = ca.cont_slice().expect("rechunked");
                POOL.install(|| {
                    agg_vals
                        .par_iter()
                        .zip(groups.all().par_iter())
                        .for_each(|(v, g)| {
                            let ptr = sync_ptr_values.get();
                            for idx in g.as_slice() {
                                debug_assert!((*idx as usize) < len);
                                unsafe { *ptr.add(*idx as usize) = *v }
                            }
                        })
                })
            },
            GroupsType::Slice { groups, .. } => {
                let agg_vals = ca.cont_slice().expect("rechunked");
                POOL.install(|| {
                    agg_vals
                        .par_iter()
                        .zip(groups.par_iter())
                        .for_each(|(v, [start, g_len])| {
                            let ptr = sync_ptr_values.get();
                            let start = *start as usize;
                            let end = start + *g_len as usize;
                            for idx in start..end {
                                debug_assert!(idx < len);
                                unsafe { *ptr.add(idx) = *v }
                            }
                        })
                });
            },
        }

        // SAFETY: we have written all slots
        unsafe { values.set_len(len) }
        ChunkedArray::<T>::new_vec(ca.name().clone(), values).into_series()
} else {
716
// We don't use a mutable bitmap as bits will have race conditions!
717
// A single byte might alias if we write from single threads.
718
let mut validity: Vec<bool> = vec![false; len];
719
let validity_ptr = validity.as_mut_ptr();
720
let sync_ptr_validity = unsafe { SyncPtr::new(validity_ptr) };
721
722
let n_threads = POOL.current_num_threads();
723
let offsets = _split_offsets(ca.len(), n_threads);
724
725
match groups {
726
GroupsType::Idx(groups) => offsets.par_iter().for_each(|(offset, offset_len)| {
727
let offset = *offset;
728
let offset_len = *offset_len;
729
let ca = ca.slice(offset as i64, offset_len);
730
let groups = &groups.all()[offset..offset + offset_len];
731
let values_ptr = sync_ptr_values.get();
732
let validity_ptr = sync_ptr_validity.get();
733
734
ca.iter().zip(groups.iter()).for_each(|(opt_v, g)| {
735
for idx in g.as_slice() {
736
let idx = *idx as usize;
737
debug_assert!(idx < len);
738
unsafe {
739
match opt_v {
740
Some(v) => {
741
*values_ptr.add(idx) = v;
742
*validity_ptr.add(idx) = true;
743
},
744
None => {
745
*values_ptr.add(idx) = T::Native::default();
746
*validity_ptr.add(idx) = false;
747
},
748
};
749
}
750
}
751
})
752
}),
753
GroupsType::Slice { groups, .. } => {
754
offsets.par_iter().for_each(|(offset, offset_len)| {
755
let offset = *offset;
756
let offset_len = *offset_len;
757
let ca = ca.slice(offset as i64, offset_len);
758
let groups = &groups[offset..offset + offset_len];
759
let values_ptr = sync_ptr_values.get();
760
let validity_ptr = sync_ptr_validity.get();
761
762
for (opt_v, [start, g_len]) in ca.iter().zip(groups.iter()) {
763
let start = *start as usize;
764
let end = start + *g_len as usize;
765
for idx in start..end {
766
debug_assert!(idx < len);
767
unsafe {
768
match opt_v {
769
Some(v) => {
770
*values_ptr.add(idx) = v;
771
*validity_ptr.add(idx) = true;
772
},
773
None => {
774
*values_ptr.add(idx) = T::Native::default();
775
*validity_ptr.add(idx) = false;
776
},
777
};
778
}
779
}
780
}
781
})
782
},
783
}
784
// SAFETY: we have written all slots
785
unsafe { values.set_len(len) }
786
let validity = Bitmap::from(validity);
787
let arr = PrimitiveArray::new(
788
T::get_static_dtype()
789
.to_physical()
790
.to_arrow(CompatLevel::newest()),
791
values.into(),
792
Some(validity),
793
);
794
Series::try_from((ca.name().clone(), arr.boxed())).unwrap()
795
}
796
}
797
798