GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-ops/src/chunked_array/gather/chunked.rs
#![allow(unsafe_op_in_unsafe_fn)]
use std::fmt::Debug;

use arrow::array::{Array, BinaryViewArrayGeneric, View, ViewType};
use arrow::bitmap::BitmapBuilder;
use arrow::legacy::trusted_len::TrustedLenPush;
use hashbrown::hash_map::Entry;
use polars_buffer::Buffer;
use polars_core::prelude::gather::_update_gather_sorted_flag;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::utils::Container;
use polars_core::{with_match_categorical_physical_type, with_match_physical_numeric_polars_type};

use crate::frame::IntoDf;

/// Gather by [`ChunkId`]
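///
/// A [`ChunkId`] packs a chunk index and an index within that chunk. An illustrative
/// sketch, mirroring the unit tests at the bottom of this file (`s` is assumed to be a
/// `Series` with at least two chunks):
///
/// ```ignore
/// // Gather row 1 of chunk 0, then row 0 of chunk 1.
/// let by: [ChunkId<24>; 2] = [ChunkId::store(0, 1), ChunkId::store(1, 0)];
/// // SAFETY: every ChunkId must be in bounds for `s`.
/// let gathered = unsafe { s.take_chunked_unchecked(&by, IsSorted::Not, false) };
/// ```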
pub trait TakeChunked {
    /// Gathers elements from a ChunkedArray, specifying for each element a
    /// chunk index and an index within that chunk through ChunkId. If
    /// `avoid_sharing` is true, the returned data should not share references
    /// with the original array (like shared buffers in views).
    ///
    /// # Safety
    /// This function doesn't do any bounds checks.
    unsafe fn take_chunked_unchecked<const B: u64>(
        &self,
        by: &[ChunkId<B>],
        sorted: IsSorted,
        avoid_sharing: bool,
    ) -> Self;

    /// # Safety
    /// This function doesn't do any bounds checks.
    unsafe fn take_opt_chunked_unchecked<const B: u64>(
        &self,
        by: &[ChunkId<B>],
        avoid_sharing: bool,
    ) -> Self;
}

impl TakeChunked for DataFrame {
    /// Take elements by a slice of [`ChunkId`]s.
    ///
    /// # Safety
    /// Does not do any bounds checks.
    /// `sorted` indicates whether the gather indices `idx` are sorted.
    unsafe fn take_chunked_unchecked<const B: u64>(
        &self,
        idx: &[ChunkId<B>],
        sorted: IsSorted,
        avoid_sharing: bool,
    ) -> DataFrame {
        let cols = self
            .to_df()
            .apply_columns(|s| s.take_chunked_unchecked(idx, sorted, avoid_sharing));

        unsafe { DataFrame::new_unchecked_infer_height(cols) }
    }

    /// Take elements by a slice of optional [`ChunkId`]s.
    ///
    /// # Safety
    /// Does not do any bounds checks.
    unsafe fn take_opt_chunked_unchecked<const B: u64>(
        &self,
        idx: &[ChunkId<B>],
        avoid_sharing: bool,
    ) -> DataFrame {
        let cols = self
            .to_df()
            .apply_columns(|s| s.take_opt_chunked_unchecked(idx, avoid_sharing));

        unsafe { DataFrame::new_unchecked_infer_height(cols) }
    }
}

pub trait TakeChunkedHorPar: IntoDf {
    /// # Safety
    /// Doesn't perform any bounds checks
    unsafe fn _take_chunked_unchecked_hor_par<const B: u64>(
        &self,
        idx: &[ChunkId<B>],
        sorted: IsSorted,
    ) -> DataFrame {
        let cols = self
            .to_df()
            .apply_columns_par(|s| s.take_chunked_unchecked(idx, sorted, false));

        unsafe { DataFrame::new_unchecked_infer_height(cols) }
    }

    /// # Safety
    /// Doesn't perform any bounds checks
    ///
    /// Checks the null state of each `ChunkId`.
    unsafe fn _take_opt_chunked_unchecked_hor_par<const B: u64>(
        &self,
        idx: &[ChunkId<B>],
    ) -> DataFrame {
        let cols = self
            .to_df()
            .apply_columns_par(|s| s.take_opt_chunked_unchecked(idx, false));

        unsafe { DataFrame::new_unchecked_infer_height(cols) }
    }
}

impl TakeChunkedHorPar for DataFrame {}

impl TakeChunked for Column {
    unsafe fn take_chunked_unchecked<const B: u64>(
        &self,
        by: &[ChunkId<B>],
        sorted: IsSorted,
        avoid_sharing: bool,
    ) -> Self {
        // @scalar-opt
        let s = self.as_materialized_series();
        let s = unsafe { s.take_chunked_unchecked(by, sorted, avoid_sharing) };
        s.into_column()
    }

    unsafe fn take_opt_chunked_unchecked<const B: u64>(
        &self,
        by: &[ChunkId<B>],
        avoid_sharing: bool,
    ) -> Self {
        // @scalar-opt
        let s = self.as_materialized_series();
        let s = unsafe { s.take_opt_chunked_unchecked(by, avoid_sharing) };
        s.into_column()
    }
}
impl TakeChunked for Series {
137
unsafe fn take_chunked_unchecked<const B: u64>(
138
&self,
139
by: &[ChunkId<B>],
140
sorted: IsSorted,
141
avoid_sharing: bool,
142
) -> Self {
143
use DataType::*;
144
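        // Dispatch on the logical dtype; temporal, decimal and categorical types gather
        // their physical representation and restore the logical dtype afterwards.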
        match self.dtype() {
            dt if dt.is_primitive_numeric() => {
                with_match_physical_numeric_polars_type!(self.dtype(), |$T| {
                    let ca: &ChunkedArray<$T> = self.as_ref().as_ref().as_ref();
                    ca.take_chunked_unchecked(by, sorted, avoid_sharing).into_series()
                })
            },
            Boolean => {
                let ca = self.bool().unwrap();
                ca.take_chunked_unchecked(by, sorted, avoid_sharing)
                    .into_series()
            },
            Binary => {
                let ca = self.binary().unwrap();
                take_chunked_unchecked_binview(ca, by, sorted, avoid_sharing).into_series()
            },
            String => {
                let ca = self.str().unwrap();
                take_chunked_unchecked_binview(ca, by, sorted, avoid_sharing).into_series()
            },
            List(_) => {
                let ca = self.list().unwrap();
                ca.take_chunked_unchecked(by, sorted, avoid_sharing)
                    .into_series()
            },
            #[cfg(feature = "dtype-array")]
            Array(_, _) => {
                let ca = self.array().unwrap();
                ca.take_chunked_unchecked(by, sorted, avoid_sharing)
                    .into_series()
            },
            #[cfg(feature = "dtype-struct")]
            Struct(_) => {
                let ca = self.struct_().unwrap();
                take_chunked_unchecked_struct(ca, by, sorted, avoid_sharing).into_series()
            },
            #[cfg(feature = "object")]
            Object(_) => take_unchecked_object(self, by, sorted),
            #[cfg(feature = "dtype-decimal")]
            Decimal(_, _) => {
                let ca = self.decimal().unwrap();
                let out = ca.phys.take_chunked_unchecked(by, sorted, avoid_sharing);
                out.into_decimal_unchecked(ca.precision(), ca.scale())
                    .into_series()
            },
            #[cfg(feature = "dtype-date")]
            Date => {
                let ca = self.date().unwrap();
                ca.physical()
                    .take_chunked_unchecked(by, sorted, avoid_sharing)
                    .into_date()
                    .into_series()
            },
            #[cfg(feature = "dtype-datetime")]
            Datetime(u, z) => {
                let ca = self.datetime().unwrap();
                ca.physical()
                    .take_chunked_unchecked(by, sorted, avoid_sharing)
                    .into_datetime(*u, z.clone())
                    .into_series()
            },
            #[cfg(feature = "dtype-duration")]
            Duration(u) => {
                let ca = self.duration().unwrap();
                ca.physical()
                    .take_chunked_unchecked(by, sorted, avoid_sharing)
                    .into_duration(*u)
                    .into_series()
            },
            #[cfg(feature = "dtype-time")]
            Time => {
                let ca = self.time().unwrap();
                ca.physical()
                    .take_chunked_unchecked(by, sorted, avoid_sharing)
                    .into_time()
                    .into_series()
            },
            #[cfg(feature = "dtype-categorical")]
            Categorical(_, _) | Enum(_, _) => {
                with_match_categorical_physical_type!(self.dtype().cat_physical().unwrap(), |$C| {
                    let ca = self.cat::<$C>().unwrap();
                    CategoricalChunked::<$C>::from_cats_and_dtype_unchecked(
                        ca.physical().take_chunked_unchecked(by, sorted, avoid_sharing),
                        self.dtype().clone()
                    )
                    .into_series()
                })
            },
            Null => Series::new_null(self.name().clone(), by.len()),
            _ => unreachable!(),
        }
    }

    /// Take function that checks the null state of each `ChunkId`.
    unsafe fn take_opt_chunked_unchecked<const B: u64>(
        &self,
        by: &[ChunkId<B>],
        avoid_sharing: bool,
    ) -> Self {
        use DataType::*;
        match self.dtype() {
            dt if dt.is_primitive_numeric() => {
                with_match_physical_numeric_polars_type!(self.dtype(), |$T| {
                    let ca: &ChunkedArray<$T> = self.as_ref().as_ref().as_ref();
                    ca.take_opt_chunked_unchecked(by, avoid_sharing).into_series()
                })
            },
            Boolean => {
                let ca = self.bool().unwrap();
                ca.take_opt_chunked_unchecked(by, avoid_sharing)
                    .into_series()
            },
            Binary => {
                let ca = self.binary().unwrap();
                take_opt_chunked_unchecked_binview(ca, by, avoid_sharing).into_series()
            },
            String => {
                let ca = self.str().unwrap();
                take_opt_chunked_unchecked_binview(ca, by, avoid_sharing).into_series()
            },
            List(_) => {
                let ca = self.list().unwrap();
                ca.take_opt_chunked_unchecked(by, avoid_sharing)
                    .into_series()
            },
            #[cfg(feature = "dtype-array")]
            Array(_, _) => {
                let ca = self.array().unwrap();
                ca.take_opt_chunked_unchecked(by, avoid_sharing)
                    .into_series()
            },
            #[cfg(feature = "dtype-struct")]
            Struct(_) => {
                let ca = self.struct_().unwrap();
                take_opt_chunked_unchecked_struct(ca, by, avoid_sharing).into_series()
            },
            #[cfg(feature = "object")]
            Object(_) => take_opt_unchecked_object(self, by, avoid_sharing),
            #[cfg(feature = "dtype-decimal")]
            Decimal(_, _) => {
                let ca = self.decimal().unwrap();
                let out = ca.phys.take_opt_chunked_unchecked(by, avoid_sharing);
                out.into_decimal_unchecked(ca.precision(), ca.scale())
                    .into_series()
            },
            #[cfg(feature = "dtype-date")]
            Date => {
                let ca = self.date().unwrap();
                ca.physical()
                    .take_opt_chunked_unchecked(by, avoid_sharing)
                    .into_date()
                    .into_series()
            },
            #[cfg(feature = "dtype-datetime")]
            Datetime(u, z) => {
                let ca = self.datetime().unwrap();
                ca.physical()
                    .take_opt_chunked_unchecked(by, avoid_sharing)
                    .into_datetime(*u, z.clone())
                    .into_series()
            },
            #[cfg(feature = "dtype-duration")]
            Duration(u) => {
                let ca = self.duration().unwrap();
                ca.physical()
                    .take_opt_chunked_unchecked(by, avoid_sharing)
                    .into_duration(*u)
                    .into_series()
            },
            #[cfg(feature = "dtype-time")]
            Time => {
                let ca = self.time().unwrap();
                ca.physical()
                    .take_opt_chunked_unchecked(by, avoid_sharing)
                    .into_time()
                    .into_series()
            },
            #[cfg(feature = "dtype-categorical")]
            Categorical(_, _) | Enum(_, _) => {
                with_match_categorical_physical_type!(self.dtype().cat_physical().unwrap(), |$C| {
                    let ca = self.cat::<$C>().unwrap();
                    CategoricalChunked::<$C>::from_cats_and_dtype_unchecked(
                        ca.physical().take_opt_chunked_unchecked(by, avoid_sharing),
                        self.dtype().clone()
                    )
                    .into_series()
                })
            },
            Null => Series::new_null(self.name().clone(), by.len()),
            _ => unreachable!(),
        }
    }
}

impl<T> TakeChunked for ChunkedArray<T>
where
    T: PolarsDataType,
    T::Array: Debug,
{
    unsafe fn take_chunked_unchecked<const B: u64>(
        &self,
        by: &[ChunkId<B>],
        sorted: IsSorted,
        _avoid_sharing: bool,
    ) -> Self {
        let arrow_dtype = self.dtype().to_arrow(CompatLevel::newest());

        let mut out = if !self.has_nulls() {
            let iter = by.iter().map(|chunk_id| {
                debug_assert!(
                    !chunk_id.is_null(),
                    "null chunks should not hit this branch"
                );
                let (chunk_idx, array_idx) = chunk_id.extract();
                let arr = self.downcast_get_unchecked(chunk_idx as usize);
                arr.value_unchecked(array_idx as usize)
            });

            let arr = iter.collect_arr_trusted_with_dtype(arrow_dtype);
            ChunkedArray::with_chunk_like(self, arr)
        } else {
            let iter = by.iter().map(|chunk_id| {
                debug_assert!(
                    !chunk_id.is_null(),
                    "null chunks should not hit this branch"
                );
                let (chunk_idx, array_idx) = chunk_id.extract();
                let arr = self.downcast_get_unchecked(chunk_idx as usize);
                arr.get_unchecked(array_idx as usize)
            });

            let arr = iter.collect_arr_trusted_with_dtype(arrow_dtype);
            ChunkedArray::with_chunk_like(self, arr)
        };
        let sorted_flag = _update_gather_sorted_flag(self.is_sorted_flag(), sorted);
        out.set_sorted_flag(sorted_flag);
        out
    }

    // Take function that checks the null state of each `ChunkId`.
    unsafe fn take_opt_chunked_unchecked<const B: u64>(
        &self,
        by: &[ChunkId<B>],
        _avoid_sharing: bool,
    ) -> Self {
        let arrow_dtype = self.dtype().to_arrow(CompatLevel::newest());

        if !self.has_nulls() {
            let arr = by
                .iter()
                .map(|chunk_id| {
                    if chunk_id.is_null() {
                        None
                    } else {
                        let (chunk_idx, array_idx) = chunk_id.extract();
                        let arr = self.downcast_get_unchecked(chunk_idx as usize);
                        Some(arr.value_unchecked(array_idx as usize).clone())
                    }
                })
                .collect_arr_trusted_with_dtype(arrow_dtype);
            ChunkedArray::with_chunk_like(self, arr)
        } else {
            let arr = by
                .iter()
                .map(|chunk_id| {
                    if chunk_id.is_null() {
                        None
                    } else {
                        let (chunk_idx, array_idx) = chunk_id.extract();
                        let arr = self.downcast_get_unchecked(chunk_idx as usize);
                        arr.get_unchecked(array_idx as usize)
                    }
                })
                .collect_arr_trusted_with_dtype(arrow_dtype);

            ChunkedArray::with_chunk_like(self, arr)
        }
    }
}

#[cfg(feature = "object")]
unsafe fn take_unchecked_object<const B: u64>(
    s: &Series,
    by: &[ChunkId<B>],
    _sorted: IsSorted,
) -> Series {
    use polars_core::chunked_array::object::registry::get_object_builder;

    let mut builder = get_object_builder(s.name().clone(), by.len());

    by.iter().for_each(|chunk_id| {
        let (chunk_idx, array_idx) = chunk_id.extract();
        let object = s.get_object_chunked_unchecked(chunk_idx as usize, array_idx as usize);
        builder.append_option(object.map(|v| v.as_any()))
    });
    builder.to_series()
}

#[cfg(feature = "object")]
unsafe fn take_opt_unchecked_object<const B: u64>(
    s: &Series,
    by: &[ChunkId<B>],
    _avoid_sharing: bool,
) -> Series {
    use polars_core::chunked_array::object::registry::get_object_builder;

    let mut builder = get_object_builder(s.name().clone(), by.len());

    by.iter().for_each(|chunk_id| {
        if chunk_id.is_null() {
            builder.append_null()
        } else {
            let (chunk_idx, array_idx) = chunk_id.extract();
            let object = s.get_object_chunked_unchecked(chunk_idx as usize, array_idx as usize);
            builder.append_option(object.map(|v| v.as_any()))
        }
    });
    builder.to_series()
}

unsafe fn take_chunked_unchecked_binview<const B: u64, T, V>(
    ca: &ChunkedArray<T>,
    by: &[ChunkId<B>],
    sorted: IsSorted,
    avoid_sharing: bool,
) -> ChunkedArray<T>
where
    T: PolarsDataType<Array = BinaryViewArrayGeneric<V>>,
    T::Array: Debug,
    V: ViewType + ?Sized,
{
    if avoid_sharing {
        return ca.take_chunked_unchecked(by, sorted, avoid_sharing);
    }

    let mut views = Vec::with_capacity(by.len());
    let (validity, arc_data_buffers);

    // If we can cheaply clone the list of buffers from the ChunkedArray we will;
    // otherwise we only clone the buffers we actually need.
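    // Three strategies, matching the branches below:
    // - a single chunk: clone that chunk's buffer list wholesale;
    // - fewer gather ids than chunks: dedup only the buffers we actually touch, per view;
    // - otherwise: dedup all chunk buffers up front and rewrite each view's buffer index.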
    if ca.n_chunks() == 1 {
        let arr = ca.downcast_iter().next().unwrap();
        let arr_views = arr.views();

        validity = if arr.has_nulls() {
            let mut validity = BitmapBuilder::with_capacity(by.len());
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();
                debug_assert!(chunk_idx == 0);
                if arr.is_null_unchecked(array_idx as usize) {
                    views.push_unchecked(View::default());
                    validity.push_unchecked(false);
                } else {
                    views.push_unchecked(*arr_views.get_unchecked(array_idx as usize));
                    validity.push_unchecked(true);
                }
            }
            Some(validity.freeze())
        } else {
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();
                debug_assert!(chunk_idx == 0);
                views.push_unchecked(*arr_views.get_unchecked(array_idx as usize));
            }
            None
        };

        arc_data_buffers = arr.data_buffers().clone();
    }
    // Dedup the buffers while creating the views.
    else if by.len() < ca.n_chunks() {
        let mut buffer_idxs = PlHashMap::with_capacity(8);
        let mut buffers = Vec::with_capacity(8);

        validity = if ca.has_nulls() {
            let mut validity = BitmapBuilder::with_capacity(by.len());
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();

                let arr = ca.downcast_get_unchecked(chunk_idx as usize);
                if arr.is_null_unchecked(array_idx as usize) {
                    views.push_unchecked(View::default());
                    validity.push_unchecked(false);
                } else {
                    let view = *arr.views().get_unchecked(array_idx as usize);
                    views.push_unchecked(update_view_and_dedup(
                        view,
                        arr.data_buffers(),
                        &mut buffer_idxs,
                        &mut buffers,
                    ));
                    validity.push_unchecked(true);
                }
            }
            Some(validity.freeze())
        } else {
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();

                let arr = ca.downcast_get_unchecked(chunk_idx as usize);
                let view = *arr.views().get_unchecked(array_idx as usize);
                views.push_unchecked(update_view_and_dedup(
                    view,
                    arr.data_buffers(),
                    &mut buffer_idxs,
                    &mut buffers,
                ));
            }
            None
        };

        arc_data_buffers = buffers.into();
    }
    // Dedup the buffers up front
    else {
        let (buffers, buffer_offsets) = dedup_buffers_by_arc(ca);

        validity = if ca.has_nulls() {
            let mut validity = BitmapBuilder::with_capacity(by.len());
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();

                let arr = ca.downcast_get_unchecked(chunk_idx as usize);
                if arr.is_null_unchecked(array_idx as usize) {
                    views.push_unchecked(View::default());
                    validity.push_unchecked(false);
                } else {
                    let view = *arr.views().get_unchecked(array_idx as usize);
                    let view = rewrite_view(view, chunk_idx, &buffer_offsets);
                    views.push_unchecked(view);
                    validity.push_unchecked(true);
                }
            }
            Some(validity.freeze())
        } else {
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();

                let arr = ca.downcast_get_unchecked(chunk_idx as usize);
                let view = *arr.views().get_unchecked(array_idx as usize);
                let view = rewrite_view(view, chunk_idx, &buffer_offsets);
                views.push_unchecked(view);
            }
            None
        };

        arc_data_buffers = buffers.into();
    };

    let arr = BinaryViewArrayGeneric::<V>::new_unchecked_unknown_md(
        V::DATA_TYPE,
        views.into(),
        arc_data_buffers,
        validity,
        None,
    );

    let mut out = ChunkedArray::with_chunk(ca.name().clone(), arr.maybe_gc());
    let sorted_flag = _update_gather_sorted_flag(ca.is_sorted_flag(), sorted);
    out.set_sorted_flag(sorted_flag);
    out
}

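// A `View` is the 16-byte Arrow binary-view struct: values of at most 12 bytes are stored
// inline in the view itself, while longer values reference an external buffer through
// `buffer_idx` and an offset. That is why the helpers below only touch views with
// `length > 12`: inline views carry no buffer index to rewrite or dedup.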
#[allow(clippy::unnecessary_cast)]
#[inline(always)]
unsafe fn rewrite_view(mut view: View, chunk_idx: IdxSize, buffer_offsets: &[u32]) -> View {
    if view.length > 12 {
        let base_offset = *buffer_offsets.get_unchecked(chunk_idx as usize);
        view.buffer_idx += base_offset;
    }
    view
}

unsafe fn update_view_and_dedup(
    mut view: View,
    orig_buffers: &[Buffer<u8>],
    buffer_idxs: &mut PlHashMap<(*const u8, usize), u32>,
    buffers: &mut Vec<Buffer<u8>>,
) -> View {
    if view.length > 12 {
        // Dedup on pointer + length.
        let orig_buffer = orig_buffers.get_unchecked(view.buffer_idx as usize);
        view.buffer_idx =
            match buffer_idxs.entry((orig_buffer.as_slice().as_ptr(), orig_buffer.len())) {
                Entry::Occupied(o) => *o.get(),
                Entry::Vacant(v) => {
                    let buffer_idx = buffers.len() as u32;
                    buffers.push(orig_buffer.clone());
                    v.insert(buffer_idx);
                    buffer_idx
                },
            };
    }
    view
}

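// Worked example (hypothetical buffer counts) of how the returned `buffer_offsets` interact
// with `rewrite_view`: if chunk 0 contributes buffers 0..3, chunk 1 shares chunk 0's buffer
// list (same Arc), and chunk 2 contributes two new buffers, the offsets are [0, 0, 3] and a
// non-inline view taken from chunk 2 gets `buffer_idx += 3`.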
fn dedup_buffers_by_arc<T, V>(ca: &ChunkedArray<T>) -> (Vec<Buffer<u8>>, Vec<u32>)
where
    T: PolarsDataType<Array = BinaryViewArrayGeneric<V>>,
    V: ViewType + ?Sized,
{
    // Dedup buffers up front. Note: don't do this during the view update, as that is often
    // much more costly.
    let mut buffers = Vec::with_capacity(ca.chunks().len());
    // Don't need to include the length, as we key on the Arc pointers, which are immutable.
    let mut buffers_dedup = PlHashMap::with_capacity(ca.chunks().len());
    let mut buffer_offsets = Vec::with_capacity(ca.chunks().len() + 1);

    for arr in ca.downcast_iter() {
        let data_buffers = arr.data_buffers();
        let arc_ptr = data_buffers.as_ptr();
        let offset = match buffers_dedup.entry(arc_ptr) {
            Entry::Occupied(o) => *o.get(),
            Entry::Vacant(v) => {
                let offset = buffers.len() as u32;
                buffers.extend(data_buffers.iter().cloned());
                v.insert(offset);
                offset
            },
        };
        buffer_offsets.push(offset);
    }
    (buffers, buffer_offsets)
}

unsafe fn take_opt_chunked_unchecked_binview<const B: u64, T, V>(
    ca: &ChunkedArray<T>,
    by: &[ChunkId<B>],
    avoid_sharing: bool,
) -> ChunkedArray<T>
where
    T: PolarsDataType<Array = BinaryViewArrayGeneric<V>>,
    T::Array: Debug,
    V: ViewType + ?Sized,
{
    if avoid_sharing {
        return ca.take_opt_chunked_unchecked(by, avoid_sharing);
    }

    let mut views = Vec::with_capacity(by.len());
    let mut validity = BitmapBuilder::with_capacity(by.len());

    // If we can cheaply clone the list of buffers from the ChunkedArray we will;
    // otherwise we only clone the buffers we actually need.
    let arc_data_buffers = if ca.n_chunks() == 1 {
        let arr = ca.downcast_iter().next().unwrap();
        let arr_views = arr.views();

        if arr.has_nulls() {
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();
                debug_assert!(id.is_null() || chunk_idx == 0);
                if id.is_null() || arr.is_null_unchecked(array_idx as usize) {
                    views.push_unchecked(View::default());
                    validity.push_unchecked(false);
                } else {
                    views.push_unchecked(*arr_views.get_unchecked(array_idx as usize));
                    validity.push_unchecked(true);
                }
            }
        } else {
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();
                debug_assert!(id.is_null() || chunk_idx == 0);
                if id.is_null() {
                    views.push_unchecked(View::default());
                    validity.push_unchecked(false);
                } else {
                    views.push_unchecked(*arr_views.get_unchecked(array_idx as usize));
                    validity.push_unchecked(true);
                }
            }
        }

        arr.data_buffers().clone()
    }
    // Dedup the buffers while creating the views.
    else if by.len() < ca.n_chunks() {
        let mut buffer_idxs = PlHashMap::with_capacity(8);
        let mut buffers = Vec::with_capacity(8);

        if ca.has_nulls() {
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();

                if id.is_null() {
                    views.push_unchecked(View::default());
                    validity.push_unchecked(false);
                } else {
                    let arr = ca.downcast_get_unchecked(chunk_idx as usize);
                    if arr.is_null_unchecked(array_idx as usize) {
                        views.push_unchecked(View::default());
                        validity.push_unchecked(false);
                    } else {
                        let view = *arr.views().get_unchecked(array_idx as usize);
                        views.push_unchecked(update_view_and_dedup(
                            view,
                            arr.data_buffers(),
                            &mut buffer_idxs,
                            &mut buffers,
                        ));
                        validity.push_unchecked(true);
                    }
                }
            }
        } else {
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();

                if id.is_null() {
                    views.push_unchecked(View::default());
                    validity.push_unchecked(false);
                } else {
                    let arr = ca.downcast_get_unchecked(chunk_idx as usize);
                    let view = *arr.views().get_unchecked(array_idx as usize);
                    views.push_unchecked(update_view_and_dedup(
                        view,
                        arr.data_buffers(),
                        &mut buffer_idxs,
                        &mut buffers,
                    ));
                    validity.push_unchecked(true);
                }
            }
        };

        buffers.into()
    }
    // Dedup the buffers up front
    else {
        let (buffers, buffer_offsets) = dedup_buffers_by_arc(ca);

        if ca.has_nulls() {
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();

                if id.is_null() {
                    views.push_unchecked(View::default());
                    validity.push_unchecked(false);
                } else {
                    let arr = ca.downcast_get_unchecked(chunk_idx as usize);
                    if arr.is_null_unchecked(array_idx as usize) {
                        views.push_unchecked(View::default());
                        validity.push_unchecked(false);
                    } else {
                        let view = *arr.views().get_unchecked(array_idx as usize);
                        let view = rewrite_view(view, chunk_idx, &buffer_offsets);
                        views.push_unchecked(view);
                        validity.push_unchecked(true);
                    }
                }
            }
        } else {
            for id in by.iter() {
                let (chunk_idx, array_idx) = id.extract();

                if id.is_null() {
                    views.push_unchecked(View::default());
                    validity.push_unchecked(false);
                } else {
                    let arr = ca.downcast_get_unchecked(chunk_idx as usize);
                    let view = *arr.views().get_unchecked(array_idx as usize);
                    let view = rewrite_view(view, chunk_idx, &buffer_offsets);
                    views.push_unchecked(view);
                    validity.push_unchecked(true);
                }
            }
        };

        buffers.into()
    };

    let arr = BinaryViewArrayGeneric::<V>::new_unchecked_unknown_md(
        V::DATA_TYPE,
        views.into(),
        arc_data_buffers,
        Some(validity.freeze()),
        None,
    );

    ChunkedArray::with_chunk(ca.name().clone(), arr.maybe_gc())
}

#[cfg(feature = "dtype-struct")]
unsafe fn take_chunked_unchecked_struct<const B: u64>(
    ca: &StructChunked,
    by: &[ChunkId<B>],
    sorted: IsSorted,
    avoid_sharing: bool,
) -> StructChunked {
    let fields = ca
        .fields_as_series()
        .iter()
        .map(|s| s.take_chunked_unchecked(by, sorted, avoid_sharing))
        .collect::<Vec<_>>();
    let mut out = StructChunked::from_series(ca.name().clone(), by.len(), fields.iter()).unwrap();

    if !ca.has_nulls() {
        return out;
    }

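    // The fields were gathered above; now gather the struct-level (outer) validity from the
    // chunks' validity bitmaps and set it on the result.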
    let mut validity = BitmapBuilder::with_capacity(by.len());
    if ca.n_chunks() == 1 {
        let arr = ca.downcast_as_array();
        let bitmap = arr.validity().unwrap();
        for id in by.iter() {
            let (chunk_idx, array_idx) = id.extract();
            debug_assert!(chunk_idx == 0);
            validity.push_unchecked(bitmap.get_bit_unchecked(array_idx as usize));
        }
    } else {
        for id in by.iter() {
            let (chunk_idx, array_idx) = id.extract();
            let arr = ca.downcast_get_unchecked(chunk_idx as usize);
            if let Some(bitmap) = arr.validity() {
                validity.push_unchecked(bitmap.get_bit_unchecked(array_idx as usize));
            } else {
                validity.push_unchecked(true);
            }
        }
    }

    out.rechunk_mut(); // Should be a no-op.
    out.downcast_iter_mut()
        .next()
        .unwrap()
        .set_validity(validity.into_opt_validity());
    out
}

#[cfg(feature = "dtype-struct")]
unsafe fn take_opt_chunked_unchecked_struct<const B: u64>(
    ca: &StructChunked,
    by: &[ChunkId<B>],
    avoid_sharing: bool,
) -> StructChunked {
    let fields = ca
        .fields_as_series()
        .iter()
        .map(|s| s.take_opt_chunked_unchecked(by, avoid_sharing))
        .collect::<Vec<_>>();
    let mut out = StructChunked::from_series(ca.name().clone(), by.len(), fields.iter()).unwrap();

    let mut validity = BitmapBuilder::with_capacity(by.len());
    if ca.n_chunks() == 1 {
        let arr = ca.downcast_as_array();
        if let Some(bitmap) = arr.validity() {
            for id in by.iter() {
                if id.is_null() {
                    validity.push_unchecked(false);
                } else {
                    let (chunk_idx, array_idx) = id.extract();
                    debug_assert!(chunk_idx == 0);
                    validity.push_unchecked(bitmap.get_bit_unchecked(array_idx as usize));
                }
            }
        } else {
            for id in by.iter() {
                validity.push_unchecked(!id.is_null());
            }
        }
    } else {
        for id in by.iter() {
            if id.is_null() {
                validity.push_unchecked(false);
            } else {
                let (chunk_idx, array_idx) = id.extract();
                let arr = ca.downcast_get_unchecked(chunk_idx as usize);
                if let Some(bitmap) = arr.validity() {
                    validity.push_unchecked(bitmap.get_bit_unchecked(array_idx as usize));
                } else {
                    validity.push_unchecked(true);
                }
            }
        }
    }

    out.rechunk_mut(); // Should be a no-op.
    out.downcast_iter_mut()
        .next()
        .unwrap()
        .set_validity(validity.into_opt_validity());
    out
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_binview_chunked_gather() {
        unsafe {
            // # Series without nulls;
            let mut s_1 = Series::new(
                "a".into(),
                &["1 loooooooooooong string", "2 loooooooooooong string"],
            );
            let s_2 = Series::new(
                "a".into(),
                &["11 loooooooooooong string", "22 loooooooooooong string"],
            );
            let s_3 = Series::new(
                "a".into(),
                &[
                    "111 loooooooooooong string",
                    "222 loooooooooooong string",
                    "small", // this tests that we don't mess with the inlined view
                ],
            );
            s_1.append(&s_2).unwrap();
            s_1.append(&s_3).unwrap();

            assert_eq!(s_1.n_chunks(), 3);

            // ## Ids without nulls;
            let by: [ChunkId<24>; 7] = [
                ChunkId::store(0, 0),
                ChunkId::store(0, 1),
                ChunkId::store(1, 1),
                ChunkId::store(1, 0),
                ChunkId::store(2, 0),
                ChunkId::store(2, 1),
                ChunkId::store(2, 2),
            ];

            let out = s_1.take_chunked_unchecked(&by, IsSorted::Not, true);
            let idx = IdxCa::new("".into(), [0, 1, 3, 2, 4, 5, 6]);
            let expected = s_1.rechunk().take(&idx).unwrap();
            assert!(out.equals(&expected));

            // ## Ids with nulls;
            let by: [ChunkId<24>; 4] = [
                ChunkId::null(),
                ChunkId::store(0, 1),
                ChunkId::store(1, 1),
                ChunkId::store(1, 0),
            ];
            let out = s_1.take_opt_chunked_unchecked(&by, true);

            let idx = IdxCa::new("".into(), [None, Some(1), Some(3), Some(2)]);
            let expected = s_1.rechunk().take(&idx).unwrap();
            assert!(out.equals_missing(&expected));

            // # Series with nulls;
            let mut s_1 = Series::new(
                "a".into(),
                &["1 loooooooooooong string 1", "2 loooooooooooong string 2"],
            );
            let s_2 = Series::new("a".into(), &[Some("11 loooooooooooong string 11"), None]);
            s_1.append(&s_2).unwrap();

            // ## Ids without nulls;
            let by: [ChunkId<24>; 4] = [
                ChunkId::store(0, 0),
                ChunkId::store(0, 1),
                ChunkId::store(1, 1),
                ChunkId::store(1, 0),
            ];

            let out = s_1.take_chunked_unchecked(&by, IsSorted::Not, true);
            let idx = IdxCa::new("".into(), [0, 1, 3, 2]);
            let expected = s_1.rechunk().take(&idx).unwrap();
            assert!(out.equals_missing(&expected));

            // ## Ids with nulls;
            let by: [ChunkId<24>; 4] = [
                ChunkId::null(),
                ChunkId::store(0, 1),
                ChunkId::store(1, 1),
                ChunkId::store(1, 0),
            ];
            let out = s_1.take_opt_chunked_unchecked(&by, true);

            let idx = IdxCa::new("".into(), [None, Some(1), Some(3), Some(2)]);
            let expected = s_1.rechunk().take(&idx).unwrap();
            assert!(out.equals_missing(&expected));
        }
    }

    #[test]
    #[cfg(feature = "dtype-categorical")]
    fn test_list_categorical_dtype_preserved_after_take() {
        use polars_core::prelude::*;

        unsafe {
            // Create List(String) and convert to List(Categorical)
            let mut builder = ListStringChunkedBuilder::new("a".into(), 2, 3);
            builder.append_values_iter(["a", "b"].iter().copied());
            builder.append_values_iter(["c", "d"].iter().copied());
            let list_str = builder.finish().into_series();

            let list_cat = list_str
                .list()
                .unwrap()
                .apply_to_inner(&|s| s.cast(&DataType::from_categories(Categories::global())))
                .unwrap()
                .into_series();

            // Append to create a chunked series
            let mut chunked = list_cat.clone();
            chunked.append(&list_cat).unwrap();
            assert_eq!(chunked.n_chunks(), 2);

            // Perform a chunked take
            let by: [ChunkId<24>; 2] = [ChunkId::store(0, 0), ChunkId::store(1, 0)];
            let out = chunked.take_chunked_unchecked(&by, IsSorted::Not, false);

            // Verify the Polars dtype is preserved
            // The bug was that List(Categorical) was becoming List(UInt32) after take
            assert!(
                matches!(out.dtype(), DataType::List(inner) if matches!(inner.as_ref(), DataType::Categorical(_, _))),
                "List(Categorical) dtype should be preserved after take_chunked_unchecked. Got: {:?}",
                out.dtype()
            );
        }
    }
}