// GitHub Repository: pola-rs/polars
// Path: blob/main/crates/polars-ops/src/chunked_array/gather/chunked.rs

#![allow(unsafe_op_in_unsafe_fn)]
use std::fmt::Debug;

use arrow::array::{Array, BinaryViewArrayGeneric, View, ViewType};
use arrow::bitmap::BitmapBuilder;
use arrow::buffer::Buffer;
use arrow::legacy::trusted_len::TrustedLenPush;
use hashbrown::hash_map::Entry;
use polars_core::prelude::gather::_update_gather_sorted_flag;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::utils::Container;
use polars_core::{with_match_categorical_physical_type, with_match_physical_numeric_polars_type};

use crate::frame::IntoDf;

/// Gather by [`ChunkId`]
pub trait TakeChunked {
    /// Gathers elements from a ChunkedArray, specifying for each element a
    /// chunk index and an index within that chunk through a [`ChunkId`]. If
    /// `avoid_sharing` is true, the returned data should not share references
    /// with the original array (such as shared buffers in views).
    ///
    /// # Safety
    /// This function does not do any bounds checks.
    unsafe fn take_chunked_unchecked<const B: u64>(
        &self,
        by: &[ChunkId<B>],
        sorted: IsSorted,
        avoid_sharing: bool,
    ) -> Self;

    /// # Safety
    /// This function does not do any bounds checks.
    unsafe fn take_opt_chunked_unchecked<const B: u64>(
        &self,
        by: &[ChunkId<B>],
        avoid_sharing: bool,
    ) -> Self;
}
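
// A minimal sketch of the addressing scheme behind this trait: `ChunkId::store` packs a
// chunk index and a row index within that chunk, and `extract` recovers both. This assumes
// only the `store`/`extract`/`null` behaviour that the test module at the bottom of this
// file already relies on.
#[cfg(test)]
mod chunk_id_sketch {
    use super::*;

    #[test]
    fn chunk_id_store_extract_round_trip() {
        // Row 3 of chunk 1.
        let id: ChunkId<24> = ChunkId::store(1, 3);
        let (chunk_idx, array_idx) = id.extract();
        assert_eq!((chunk_idx, array_idx), (1, 3));
        assert!(!id.is_null());

        // A null id encodes "no element"; `take_opt_chunked_unchecked` maps it to a null row.
        let null_id: ChunkId<24> = ChunkId::null();
        assert!(null_id.is_null());
    }
}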

impl TakeChunked for DataFrame {
    /// Take elements by a slice of [`ChunkId`]s.
    ///
    /// # Safety
    /// Does not do any bounds checks.
    /// `sorted` indicates whether the gather indices are sorted.
    unsafe fn take_chunked_unchecked<const B: u64>(
        &self,
        idx: &[ChunkId<B>],
        sorted: IsSorted,
        avoid_sharing: bool,
    ) -> DataFrame {
        let cols = self
            .to_df()
            ._apply_columns(&|s| s.take_chunked_unchecked(idx, sorted, avoid_sharing));

        unsafe { DataFrame::new_no_checks_height_from_first(cols) }
    }

    /// Take elements by a slice of optional [`ChunkId`]s.
    ///
    /// # Safety
    /// Does not do any bounds checks.
    unsafe fn take_opt_chunked_unchecked<const B: u64>(
        &self,
        idx: &[ChunkId<B>],
        avoid_sharing: bool,
    ) -> DataFrame {
        let cols = self
            .to_df()
            ._apply_columns(&|s| s.take_opt_chunked_unchecked(idx, avoid_sharing));

        unsafe { DataFrame::new_no_checks_height_from_first(cols) }
    }
}

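// Horizontal-parallel variant: the same gathers as `TakeChunked`, but applied to the
// columns of a DataFrame in parallel via `_apply_columns_par`.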
pub trait TakeChunkedHorPar: IntoDf {
    /// # Safety
    /// Doesn't perform any bounds checks
    unsafe fn _take_chunked_unchecked_hor_par<const B: u64>(
        &self,
        idx: &[ChunkId<B>],
        sorted: IsSorted,
    ) -> DataFrame {
        let cols = self
            .to_df()
            ._apply_columns_par(&|s| s.take_chunked_unchecked(idx, sorted, false));

        unsafe { DataFrame::new_no_checks_height_from_first(cols) }
    }

    /// # Safety
    /// Doesn't perform any bounds checks
    ///
    /// Check for null state in `ChunkId`.
    unsafe fn _take_opt_chunked_unchecked_hor_par<const B: u64>(
        &self,
        idx: &[ChunkId<B>],
    ) -> DataFrame {
        let cols = self
            .to_df()
            ._apply_columns_par(&|s| s.take_opt_chunked_unchecked(idx, false));

        unsafe { DataFrame::new_no_checks_height_from_first(cols) }
    }
}

impl TakeChunkedHorPar for DataFrame {}

impl TakeChunked for Column {
    unsafe fn take_chunked_unchecked<const B: u64>(
        &self,
        by: &[ChunkId<B>],
        sorted: IsSorted,
        avoid_sharing: bool,
    ) -> Self {
        // @scalar-opt
        let s = self.as_materialized_series();
        let s = unsafe { s.take_chunked_unchecked(by, sorted, avoid_sharing) };
        s.into_column()
    }

    unsafe fn take_opt_chunked_unchecked<const B: u64>(
        &self,
        by: &[ChunkId<B>],
        avoid_sharing: bool,
    ) -> Self {
        // @scalar-opt
        let s = self.as_materialized_series();
        let s = unsafe { s.take_opt_chunked_unchecked(by, avoid_sharing) };
        s.into_column()
    }
}

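// Dispatch on dtype: numeric, boolean, list and array types gather directly; binary and
// string use the view-aware kernels below; logical types (dates, times, durations,
// decimals, categoricals) gather their physical representation and re-attach the dtype.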
impl TakeChunked for Series {
    unsafe fn take_chunked_unchecked<const B: u64>(
        &self,
        by: &[ChunkId<B>],
        sorted: IsSorted,
        avoid_sharing: bool,
    ) -> Self {
        use DataType::*;
        match self.dtype() {
            dt if dt.is_primitive_numeric() => {
                with_match_physical_numeric_polars_type!(self.dtype(), |$T| {
                    let ca: &ChunkedArray<$T> = self.as_ref().as_ref().as_ref();
                    ca.take_chunked_unchecked(by, sorted, avoid_sharing).into_series()
                })
            },
            Boolean => {
                let ca = self.bool().unwrap();
                ca.take_chunked_unchecked(by, sorted, avoid_sharing)
                    .into_series()
            },
            Binary => {
                let ca = self.binary().unwrap();
                take_chunked_unchecked_binview(ca, by, sorted, avoid_sharing).into_series()
            },
            String => {
                let ca = self.str().unwrap();
                take_chunked_unchecked_binview(ca, by, sorted, avoid_sharing).into_series()
            },
            List(_) => {
                let ca = self.list().unwrap();
                ca.take_chunked_unchecked(by, sorted, avoid_sharing)
                    .into_series()
            },
            #[cfg(feature = "dtype-array")]
            Array(_, _) => {
                let ca = self.array().unwrap();
                ca.take_chunked_unchecked(by, sorted, avoid_sharing)
                    .into_series()
            },
            #[cfg(feature = "dtype-struct")]
            Struct(_) => {
                let ca = self.struct_().unwrap();
                take_chunked_unchecked_struct(ca, by, sorted, avoid_sharing).into_series()
            },
            #[cfg(feature = "object")]
            Object(_) => take_unchecked_object(self, by, sorted),
            #[cfg(feature = "dtype-decimal")]
            Decimal(_, _) => {
                let ca = self.decimal().unwrap();
                let out = ca.phys.take_chunked_unchecked(by, sorted, avoid_sharing);
                out.into_decimal_unchecked(ca.precision(), ca.scale())
                    .into_series()
            },
            #[cfg(feature = "dtype-date")]
            Date => {
                let ca = self.date().unwrap();
                ca.physical()
                    .take_chunked_unchecked(by, sorted, avoid_sharing)
                    .into_date()
                    .into_series()
            },
            #[cfg(feature = "dtype-datetime")]
            Datetime(u, z) => {
                let ca = self.datetime().unwrap();
                ca.physical()
                    .take_chunked_unchecked(by, sorted, avoid_sharing)
                    .into_datetime(*u, z.clone())
                    .into_series()
            },
            #[cfg(feature = "dtype-duration")]
            Duration(u) => {
                let ca = self.duration().unwrap();
                ca.physical()
                    .take_chunked_unchecked(by, sorted, avoid_sharing)
                    .into_duration(*u)
                    .into_series()
            },
            #[cfg(feature = "dtype-time")]
            Time => {
                let ca = self.time().unwrap();
                ca.physical()
                    .take_chunked_unchecked(by, sorted, avoid_sharing)
                    .into_time()
                    .into_series()
            },
            #[cfg(feature = "dtype-categorical")]
            Categorical(_, _) | Enum(_, _) => {
                with_match_categorical_physical_type!(self.dtype().cat_physical().unwrap(), |$C| {
                    let ca = self.cat::<$C>().unwrap();
                    CategoricalChunked::<$C>::from_cats_and_dtype_unchecked(
                        ca.physical().take_chunked_unchecked(by, sorted, avoid_sharing),
                        self.dtype().clone()
                    )
                    .into_series()
                })
            },
            Null => Series::new_null(self.name().clone(), by.len()),
            _ => unreachable!(),
        }
    }

    /// Take function that checks for null state in `ChunkId`.
    unsafe fn take_opt_chunked_unchecked<const B: u64>(
        &self,
        by: &[ChunkId<B>],
        avoid_sharing: bool,
    ) -> Self {
        use DataType::*;
        match self.dtype() {
            dt if dt.is_primitive_numeric() => {
                with_match_physical_numeric_polars_type!(self.dtype(), |$T| {
                    let ca: &ChunkedArray<$T> = self.as_ref().as_ref().as_ref();
                    ca.take_opt_chunked_unchecked(by, avoid_sharing).into_series()
                })
            },
            Boolean => {
                let ca = self.bool().unwrap();
                ca.take_opt_chunked_unchecked(by, avoid_sharing)
                    .into_series()
            },
            Binary => {
                let ca = self.binary().unwrap();
                take_opt_chunked_unchecked_binview(ca, by, avoid_sharing).into_series()
            },
            String => {
                let ca = self.str().unwrap();
                take_opt_chunked_unchecked_binview(ca, by, avoid_sharing).into_series()
            },
            List(_) => {
                let ca = self.list().unwrap();
                ca.take_opt_chunked_unchecked(by, avoid_sharing)
                    .into_series()
            },
            #[cfg(feature = "dtype-array")]
            Array(_, _) => {
                let ca = self.array().unwrap();
                ca.take_opt_chunked_unchecked(by, avoid_sharing)
                    .into_series()
            },
            #[cfg(feature = "dtype-struct")]
            Struct(_) => {
                let ca = self.struct_().unwrap();
                take_opt_chunked_unchecked_struct(ca, by, avoid_sharing).into_series()
            },
            #[cfg(feature = "object")]
            Object(_) => take_opt_unchecked_object(self, by, avoid_sharing),
            #[cfg(feature = "dtype-decimal")]
            Decimal(_, _) => {
                let ca = self.decimal().unwrap();
                let out = ca.phys.take_opt_chunked_unchecked(by, avoid_sharing);
                out.into_decimal_unchecked(ca.precision(), ca.scale())
                    .into_series()
            },
            #[cfg(feature = "dtype-date")]
            Date => {
                let ca = self.date().unwrap();
                ca.physical()
                    .take_opt_chunked_unchecked(by, avoid_sharing)
                    .into_date()
                    .into_series()
            },
            #[cfg(feature = "dtype-datetime")]
            Datetime(u, z) => {
                let ca = self.datetime().unwrap();
                ca.physical()
                    .take_opt_chunked_unchecked(by, avoid_sharing)
                    .into_datetime(*u, z.clone())
                    .into_series()
            },
            #[cfg(feature = "dtype-duration")]
            Duration(u) => {
                let ca = self.duration().unwrap();
                ca.physical()
                    .take_opt_chunked_unchecked(by, avoid_sharing)
                    .into_duration(*u)
                    .into_series()
            },
            #[cfg(feature = "dtype-time")]
            Time => {
                let ca = self.time().unwrap();
                ca.physical()
                    .take_opt_chunked_unchecked(by, avoid_sharing)
                    .into_time()
                    .into_series()
            },
            #[cfg(feature = "dtype-categorical")]
            Categorical(_, _) | Enum(_, _) => {
                with_match_categorical_physical_type!(self.dtype().cat_physical().unwrap(), |$C| {
                    let ca = self.cat::<$C>().unwrap();
                    CategoricalChunked::<$C>::from_cats_and_dtype_unchecked(
                        ca.physical().take_opt_chunked_unchecked(by, avoid_sharing),
                        self.dtype().clone()
                    )
                    .into_series()
                })
            },
            Null => Series::new_null(self.name().clone(), by.len()),
            _ => unreachable!(),
        }
    }
}

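// Generic fallback: each gathered value is materialized into a freshly collected array,
// so nothing is shared with the source and the `avoid_sharing` hint has no effect here.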
impl<T> TakeChunked for ChunkedArray<T>
where
    T: PolarsDataType,
    T::Array: Debug,
{
    unsafe fn take_chunked_unchecked<const B: u64>(
        &self,
        by: &[ChunkId<B>],
        sorted: IsSorted,
        _avoid_sharing: bool,
    ) -> Self {
        let arrow_dtype = self.dtype().to_arrow(CompatLevel::newest());

        let mut out = if !self.has_nulls() {
            let iter = by.iter().map(|chunk_id| {
                debug_assert!(
                    !chunk_id.is_null(),
                    "null chunks should not hit this branch"
                );
                let (chunk_idx, array_idx) = chunk_id.extract();
                let arr = self.downcast_get_unchecked(chunk_idx as usize);
                arr.value_unchecked(array_idx as usize)
            });

            let arr = iter.collect_arr_trusted_with_dtype(arrow_dtype);
            ChunkedArray::with_chunk(self.name().clone(), arr)
        } else {
            let iter = by.iter().map(|chunk_id| {
                debug_assert!(
                    !chunk_id.is_null(),
                    "null chunks should not hit this branch"
                );
                let (chunk_idx, array_idx) = chunk_id.extract();
                let arr = self.downcast_get_unchecked(chunk_idx as usize);
                arr.get_unchecked(array_idx as usize)
            });

            let arr = iter.collect_arr_trusted_with_dtype(arrow_dtype);
            ChunkedArray::with_chunk(self.name().clone(), arr)
        };
        let sorted_flag = _update_gather_sorted_flag(self.is_sorted_flag(), sorted);
        out.set_sorted_flag(sorted_flag);
        out
    }

    // Take function that checks for null state in `ChunkId`.
    unsafe fn take_opt_chunked_unchecked<const B: u64>(
        &self,
        by: &[ChunkId<B>],
        _avoid_sharing: bool,
    ) -> Self {
        let arrow_dtype = self.dtype().to_arrow(CompatLevel::newest());

        if !self.has_nulls() {
            let arr = by
                .iter()
                .map(|chunk_id| {
                    if chunk_id.is_null() {
                        None
                    } else {
                        let (chunk_idx, array_idx) = chunk_id.extract();
                        let arr = self.downcast_get_unchecked(chunk_idx as usize);
                        Some(arr.value_unchecked(array_idx as usize).clone())
                    }
                })
                .collect_arr_trusted_with_dtype(arrow_dtype);

            ChunkedArray::with_chunk(self.name().clone(), arr)
        } else {
            let arr = by
                .iter()
                .map(|chunk_id| {
                    if chunk_id.is_null() {
                        None
                    } else {
                        let (chunk_idx, array_idx) = chunk_id.extract();
                        let arr = self.downcast_get_unchecked(chunk_idx as usize);
                        arr.get_unchecked(array_idx as usize)
                    }
                })
                .collect_arr_trusted_with_dtype(arrow_dtype);

            ChunkedArray::with_chunk(self.name().clone(), arr)
        }
    }
}

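// Object columns cannot go through the arrow gather kernels, so values are re-appended
// one by one through the builder obtained from the object registry.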
#[cfg(feature = "object")]
unsafe fn take_unchecked_object<const B: u64>(
    s: &Series,
    by: &[ChunkId<B>],
    _sorted: IsSorted,
) -> Series {
    use polars_core::chunked_array::object::registry::get_object_builder;

    let mut builder = get_object_builder(s.name().clone(), by.len());

    by.iter().for_each(|chunk_id| {
        let (chunk_idx, array_idx) = chunk_id.extract();
        let object = s.get_object_chunked_unchecked(chunk_idx as usize, array_idx as usize);
        builder.append_option(object.map(|v| v.as_any()))
    });
    builder.to_series()
}

#[cfg(feature = "object")]
unsafe fn take_opt_unchecked_object<const B: u64>(
    s: &Series,
    by: &[ChunkId<B>],
    _avoid_sharing: bool,
) -> Series {
    use polars_core::chunked_array::object::registry::get_object_builder;

    let mut builder = get_object_builder(s.name().clone(), by.len());

    by.iter().for_each(|chunk_id| {
        if chunk_id.is_null() {
            builder.append_null()
        } else {
            let (chunk_idx, array_idx) = chunk_id.extract();
            let object = s.get_object_chunked_unchecked(chunk_idx as usize, array_idx as usize);
            builder.append_option(object.map(|v| v.as_any()))
        }
    });
    builder.to_series()
}

unsafe fn take_chunked_unchecked_binview<const B: u64, T, V>(
    ca: &ChunkedArray<T>,
    by: &[ChunkId<B>],
    sorted: IsSorted,
    avoid_sharing: bool,
) -> ChunkedArray<T>
where
    T: PolarsDataType<Array = BinaryViewArrayGeneric<V>>,
    T::Array: Debug,
    V: ViewType + ?Sized,
{
    if avoid_sharing {
        return ca.take_chunked_unchecked(by, sorted, avoid_sharing);
    }

    let mut views = Vec::with_capacity(by.len());
    let (validity, arc_data_buffers);

    // If we can cheaply clone the list of buffers from the ChunkedArray we will,
    // otherwise we will only clone those buffers we need.
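    // Three strategies follow:
    // - a single chunk: clone the chunk's buffer list wholesale and reuse its views;
    // - fewer gather ids than chunks: dedup buffers lazily, only for the views we touch;
    // - otherwise: dedup all chunk buffers up front and shift each view's buffer index.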
    if ca.n_chunks() == 1 {
        let arr = ca.downcast_iter().next().unwrap();
        let arr_views = arr.views();

        validity = if arr.has_nulls() {
            let mut validity = BitmapBuilder::with_capacity(by.len());
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();
                debug_assert!(chunk_idx == 0);
                if arr.is_null_unchecked(array_idx as usize) {
                    views.push_unchecked(View::default());
                    validity.push_unchecked(false);
                } else {
                    views.push_unchecked(*arr_views.get_unchecked(array_idx as usize));
                    validity.push_unchecked(true);
                }
            }
            Some(validity.freeze())
        } else {
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();
                debug_assert!(chunk_idx == 0);
                views.push_unchecked(*arr_views.get_unchecked(array_idx as usize));
            }
            None
        };

        arc_data_buffers = arr.data_buffers().clone();
    }
    // Dedup the buffers while creating the views.
    else if by.len() < ca.n_chunks() {
        let mut buffer_idxs = PlHashMap::with_capacity(8);
        let mut buffers = Vec::with_capacity(8);

        validity = if ca.has_nulls() {
            let mut validity = BitmapBuilder::with_capacity(by.len());
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();

                let arr = ca.downcast_get_unchecked(chunk_idx as usize);
                if arr.is_null_unchecked(array_idx as usize) {
                    views.push_unchecked(View::default());
                    validity.push_unchecked(false);
                } else {
                    let view = *arr.views().get_unchecked(array_idx as usize);
                    views.push_unchecked(update_view_and_dedup(
                        view,
                        arr.data_buffers(),
                        &mut buffer_idxs,
                        &mut buffers,
                    ));
                    validity.push_unchecked(true);
                }
            }
            Some(validity.freeze())
        } else {
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();

                let arr = ca.downcast_get_unchecked(chunk_idx as usize);
                let view = *arr.views().get_unchecked(array_idx as usize);
                views.push_unchecked(update_view_and_dedup(
                    view,
                    arr.data_buffers(),
                    &mut buffer_idxs,
                    &mut buffers,
                ));
            }
            None
        };

        arc_data_buffers = buffers.into();
    }
    // Dedup the buffers up front
    else {
        let (buffers, buffer_offsets) = dedup_buffers_by_arc(ca);

        validity = if ca.has_nulls() {
            let mut validity = BitmapBuilder::with_capacity(by.len());
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();

                let arr = ca.downcast_get_unchecked(chunk_idx as usize);
                if arr.is_null_unchecked(array_idx as usize) {
                    views.push_unchecked(View::default());
                    validity.push_unchecked(false);
                } else {
                    let view = *arr.views().get_unchecked(array_idx as usize);
                    let view = rewrite_view(view, chunk_idx, &buffer_offsets);
                    views.push_unchecked(view);
                    validity.push_unchecked(true);
                }
            }
            Some(validity.freeze())
        } else {
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();

                let arr = ca.downcast_get_unchecked(chunk_idx as usize);
                let view = *arr.views().get_unchecked(array_idx as usize);
                let view = rewrite_view(view, chunk_idx, &buffer_offsets);
                views.push_unchecked(view);
            }
            None
        };

        arc_data_buffers = buffers.into();
    };

    let arr = BinaryViewArrayGeneric::<V>::new_unchecked_unknown_md(
        V::DATA_TYPE,
        views.into(),
        arc_data_buffers,
        validity,
        None,
    );

    let mut out = ChunkedArray::with_chunk(ca.name().clone(), arr.maybe_gc());
    let sorted_flag = _update_gather_sorted_flag(ca.is_sorted_flag(), sorted);
    out.set_sorted_flag(sorted_flag);
    out
}

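// Only views longer than 12 bytes reference a data buffer; shorter values are stored
// inline in the view itself, so their buffer index is meaningless and left untouched.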
#[allow(clippy::unnecessary_cast)]
#[inline(always)]
unsafe fn rewrite_view(mut view: View, chunk_idx: IdxSize, buffer_offsets: &[u32]) -> View {
    if view.length > 12 {
        let base_offset = *buffer_offsets.get_unchecked(chunk_idx as usize);
        view.buffer_idx += base_offset;
    }
    view
}

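// Remaps a view's buffer index into the deduplicated `buffers` vec. Deduping on the
// buffer's pointer and length is enough to identify it, since the buffers are immutable.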
unsafe fn update_view_and_dedup(
    mut view: View,
    orig_buffers: &[Buffer<u8>],
    buffer_idxs: &mut PlHashMap<(*const u8, usize), u32>,
    buffers: &mut Vec<Buffer<u8>>,
) -> View {
    if view.length > 12 {
        // Dedup on pointer + length.
        let orig_buffer = orig_buffers.get_unchecked(view.buffer_idx as usize);
        view.buffer_idx =
            match buffer_idxs.entry((orig_buffer.as_slice().as_ptr(), orig_buffer.len())) {
                Entry::Occupied(o) => *o.get(),
                Entry::Vacant(v) => {
                    let buffer_idx = buffers.len() as u32;
                    buffers.push(orig_buffer.clone());
                    v.insert(buffer_idx);
                    buffer_idx
                },
            };
    }
    view
}

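// Returns the concatenated, deduplicated buffers of all chunks plus, per chunk, the
// offset at which that chunk's buffers start; `rewrite_view` adds this offset to a
// view's buffer index so it points into the combined buffer list.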
fn dedup_buffers_by_arc<T, V>(ca: &ChunkedArray<T>) -> (Vec<Buffer<u8>>, Vec<u32>)
where
    T: PolarsDataType<Array = BinaryViewArrayGeneric<V>>,
    V: ViewType + ?Sized,
{
    // Dedup buffers up front. Note: don't do this during view update, as this is often much
    // more costly.
    let mut buffers = Vec::with_capacity(ca.chunks().len());
    // Don't need to include the length, as we look at the arc pointers, which are immutable.
    let mut buffers_dedup = PlHashMap::with_capacity(ca.chunks().len());
    let mut buffer_offsets = Vec::with_capacity(ca.chunks().len() + 1);

    for arr in ca.downcast_iter() {
        let data_buffers = arr.data_buffers();
        let arc_ptr = data_buffers.as_ptr();
        let offset = match buffers_dedup.entry(arc_ptr) {
            Entry::Occupied(o) => *o.get(),
            Entry::Vacant(v) => {
                let offset = buffers.len() as u32;
                buffers.extend(data_buffers.iter().cloned());
                v.insert(offset);
                offset
            },
        };
        buffer_offsets.push(offset);
    }
    (buffers, buffer_offsets)
}

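// Same buffer-sharing strategies as `take_chunked_unchecked_binview`, but a null `ChunkId`
// produces a null row, so the validity bitmap is always built.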
unsafe fn take_opt_chunked_unchecked_binview<const B: u64, T, V>(
    ca: &ChunkedArray<T>,
    by: &[ChunkId<B>],
    avoid_sharing: bool,
) -> ChunkedArray<T>
where
    T: PolarsDataType<Array = BinaryViewArrayGeneric<V>>,
    T::Array: Debug,
    V: ViewType + ?Sized,
{
    if avoid_sharing {
        return ca.take_opt_chunked_unchecked(by, avoid_sharing);
    }

    let mut views = Vec::with_capacity(by.len());
    let mut validity = BitmapBuilder::with_capacity(by.len());

    // If we can cheaply clone the list of buffers from the ChunkedArray we will,
    // otherwise we will only clone those buffers we need.
    let arc_data_buffers = if ca.n_chunks() == 1 {
        let arr = ca.downcast_iter().next().unwrap();
        let arr_views = arr.views();

        if arr.has_nulls() {
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();
                debug_assert!(id.is_null() || chunk_idx == 0);
                if id.is_null() || arr.is_null_unchecked(array_idx as usize) {
                    views.push_unchecked(View::default());
                    validity.push_unchecked(false);
                } else {
                    views.push_unchecked(*arr_views.get_unchecked(array_idx as usize));
                    validity.push_unchecked(true);
                }
            }
        } else {
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();
                debug_assert!(id.is_null() || chunk_idx == 0);
                if id.is_null() {
                    views.push_unchecked(View::default());
                    validity.push_unchecked(false);
                } else {
                    views.push_unchecked(*arr_views.get_unchecked(array_idx as usize));
                    validity.push_unchecked(true);
                }
            }
        }

        arr.data_buffers().clone()
    }
    // Dedup the buffers while creating the views.
    else if by.len() < ca.n_chunks() {
        let mut buffer_idxs = PlHashMap::with_capacity(8);
        let mut buffers = Vec::with_capacity(8);

        if ca.has_nulls() {
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();

                if id.is_null() {
                    views.push_unchecked(View::default());
                    validity.push_unchecked(false);
                } else {
                    let arr = ca.downcast_get_unchecked(chunk_idx as usize);
                    if arr.is_null_unchecked(array_idx as usize) {
                        views.push_unchecked(View::default());
                        validity.push_unchecked(false);
                    } else {
                        let view = *arr.views().get_unchecked(array_idx as usize);
                        views.push_unchecked(update_view_and_dedup(
                            view,
                            arr.data_buffers(),
                            &mut buffer_idxs,
                            &mut buffers,
                        ));
                        validity.push_unchecked(true);
                    }
                }
            }
        } else {
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();

                if id.is_null() {
                    views.push_unchecked(View::default());
                    validity.push_unchecked(false);
                } else {
                    let arr = ca.downcast_get_unchecked(chunk_idx as usize);
                    let view = *arr.views().get_unchecked(array_idx as usize);
                    views.push_unchecked(update_view_and_dedup(
                        view,
                        arr.data_buffers(),
                        &mut buffer_idxs,
                        &mut buffers,
                    ));
                    validity.push_unchecked(true);
                }
            }
        };

        buffers.into()
    }
    // Dedup the buffers up front
    else {
        let (buffers, buffer_offsets) = dedup_buffers_by_arc(ca);

        if ca.has_nulls() {
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();

                if id.is_null() {
                    views.push_unchecked(View::default());
                    validity.push_unchecked(false);
                } else {
                    let arr = ca.downcast_get_unchecked(chunk_idx as usize);
                    if arr.is_null_unchecked(array_idx as usize) {
                        views.push_unchecked(View::default());
                        validity.push_unchecked(false);
                    } else {
                        let view = *arr.views().get_unchecked(array_idx as usize);
                        let view = rewrite_view(view, chunk_idx, &buffer_offsets);
                        views.push_unchecked(view);
                        validity.push_unchecked(true);
                    }
                }
            }
        } else {
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();

                if id.is_null() {
                    views.push_unchecked(View::default());
                    validity.push_unchecked(false);
                } else {
                    let arr = ca.downcast_get_unchecked(chunk_idx as usize);
                    let view = *arr.views().get_unchecked(array_idx as usize);
                    let view = rewrite_view(view, chunk_idx, &buffer_offsets);
                    views.push_unchecked(view);
                    validity.push_unchecked(true);
                }
            }
        };

        buffers.into()
    };

    let arr = BinaryViewArrayGeneric::<V>::new_unchecked_unknown_md(
        V::DATA_TYPE,
        views.into(),
        arc_data_buffers,
        Some(validity.freeze()),
        None,
    );

    ChunkedArray::with_chunk(ca.name().clone(), arr.maybe_gc())
}

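// Struct gather: every field is gathered independently, then the outer (struct-level)
// validity is rebuilt from the validity bitmaps of the source chunks.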
#[cfg(feature = "dtype-struct")]
unsafe fn take_chunked_unchecked_struct<const B: u64>(
    ca: &StructChunked,
    by: &[ChunkId<B>],
    sorted: IsSorted,
    avoid_sharing: bool,
) -> StructChunked {
    let fields = ca
        .fields_as_series()
        .iter()
        .map(|s| s.take_chunked_unchecked(by, sorted, avoid_sharing))
        .collect::<Vec<_>>();
    let mut out = StructChunked::from_series(ca.name().clone(), by.len(), fields.iter()).unwrap();

    if !ca.has_nulls() {
        return out;
    }

    let mut validity = BitmapBuilder::with_capacity(by.len());
    if ca.n_chunks() == 1 {
        let arr = ca.downcast_as_array();
        let bitmap = arr.validity().unwrap();
        for id in by.iter() {
            let (chunk_idx, array_idx) = id.extract();
            debug_assert!(chunk_idx == 0);
            validity.push_unchecked(bitmap.get_bit_unchecked(array_idx as usize));
        }
    } else {
        for id in by.iter() {
            let (chunk_idx, array_idx) = id.extract();
            let arr = ca.downcast_get_unchecked(chunk_idx as usize);
            if let Some(bitmap) = arr.validity() {
                validity.push_unchecked(bitmap.get_bit_unchecked(array_idx as usize));
            } else {
                validity.push_unchecked(true);
            }
        }
    }

    out.rechunk_mut(); // Should be a no-op.
    out.downcast_iter_mut()
        .next()
        .unwrap()
        .set_validity(validity.into_opt_validity());
    out
}

#[cfg(feature = "dtype-struct")]
unsafe fn take_opt_chunked_unchecked_struct<const B: u64>(
    ca: &StructChunked,
    by: &[ChunkId<B>],
    avoid_sharing: bool,
) -> StructChunked {
    let fields = ca
        .fields_as_series()
        .iter()
        .map(|s| s.take_opt_chunked_unchecked(by, avoid_sharing))
        .collect::<Vec<_>>();
    let mut out = StructChunked::from_series(ca.name().clone(), by.len(), fields.iter()).unwrap();

    let mut validity = BitmapBuilder::with_capacity(by.len());
    if ca.n_chunks() == 1 {
        let arr = ca.downcast_as_array();
        if let Some(bitmap) = arr.validity() {
            for id in by.iter() {
                if id.is_null() {
                    validity.push_unchecked(false);
                } else {
                    let (chunk_idx, array_idx) = id.extract();
                    debug_assert!(chunk_idx == 0);
                    validity.push_unchecked(bitmap.get_bit_unchecked(array_idx as usize));
                }
            }
        } else {
            for id in by.iter() {
                validity.push_unchecked(!id.is_null());
            }
        }
    } else {
        for id in by.iter() {
            if id.is_null() {
                validity.push_unchecked(false);
            } else {
                let (chunk_idx, array_idx) = id.extract();
                let arr = ca.downcast_get_unchecked(chunk_idx as usize);
                if let Some(bitmap) = arr.validity() {
                    validity.push_unchecked(bitmap.get_bit_unchecked(array_idx as usize));
                } else {
                    validity.push_unchecked(true);
                }
            }
        }
    }

    out.rechunk_mut(); // Should be a no-op.
    out.downcast_iter_mut()
        .next()
        .unwrap()
        .set_validity(validity.into_opt_validity());
    out
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_binview_chunked_gather() {
        unsafe {
            // # Series without nulls;
            let mut s_1 = Series::new(
                "a".into(),
                &["1 loooooooooooong string", "2 loooooooooooong string"],
            );
            let s_2 = Series::new(
                "a".into(),
                &["11 loooooooooooong string", "22 loooooooooooong string"],
            );
            let s_3 = Series::new(
                "a".into(),
                &[
                    "111 loooooooooooong string",
                    "222 loooooooooooong string",
                    "small", // this tests we don't mess with the inlined view
                ],
            );
            s_1.append(&s_2).unwrap();
            s_1.append(&s_3).unwrap();

            assert_eq!(s_1.n_chunks(), 3);

            // ## Ids without nulls;
            let by: [ChunkId<24>; 7] = [
                ChunkId::store(0, 0),
                ChunkId::store(0, 1),
                ChunkId::store(1, 1),
                ChunkId::store(1, 0),
                ChunkId::store(2, 0),
                ChunkId::store(2, 1),
                ChunkId::store(2, 2),
            ];

            let out = s_1.take_chunked_unchecked(&by, IsSorted::Not, true);
            let idx = IdxCa::new("".into(), [0, 1, 3, 2, 4, 5, 6]);
            let expected = s_1.rechunk().take(&idx).unwrap();
            assert!(out.equals(&expected));

            // ## Ids with nulls;
            let by: [ChunkId<24>; 4] = [
                ChunkId::null(),
                ChunkId::store(0, 1),
                ChunkId::store(1, 1),
                ChunkId::store(1, 0),
            ];
            let out = s_1.take_opt_chunked_unchecked(&by, true);

            let idx = IdxCa::new("".into(), [None, Some(1), Some(3), Some(2)]);
            let expected = s_1.rechunk().take(&idx).unwrap();
            assert!(out.equals_missing(&expected));

            // # Series with nulls;
            let mut s_1 = Series::new(
                "a".into(),
                &["1 loooooooooooong string 1", "2 loooooooooooong string 2"],
            );
            let s_2 = Series::new("a".into(), &[Some("11 loooooooooooong string 11"), None]);
            s_1.append(&s_2).unwrap();

            // ## Ids without nulls;
            let by: [ChunkId<24>; 4] = [
                ChunkId::store(0, 0),
                ChunkId::store(0, 1),
                ChunkId::store(1, 1),
                ChunkId::store(1, 0),
            ];

            let out = s_1.take_chunked_unchecked(&by, IsSorted::Not, true);
            let idx = IdxCa::new("".into(), [0, 1, 3, 2]);
            let expected = s_1.rechunk().take(&idx).unwrap();
            assert!(out.equals_missing(&expected));

            // ## Ids with nulls;
            let by: [ChunkId<24>; 4] = [
                ChunkId::null(),
                ChunkId::store(0, 1),
                ChunkId::store(1, 1),
                ChunkId::store(1, 0),
            ];
            let out = s_1.take_opt_chunked_unchecked(&by, true);

            let idx = IdxCa::new("".into(), [None, Some(1), Some(3), Some(2)]);
            let expected = s_1.rechunk().take(&idx).unwrap();
            assert!(out.equals_missing(&expected));
        }
    }
}