Book a Demo!
CoCalc Logo Icon
Store Features Docs Share Support News About Policies Sign Up Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/compute/concatenate.rs
8479 views
1
use hashbrown::hash_map::Entry;
2
use polars_buffer::Buffer;
3
use polars_error::{PolarsResult, polars_bail};
4
use polars_utils::aliases::{InitHashMaps, PlHashMap};
5
use polars_utils::itertools::Itertools;
6
use polars_utils::vec::PushUnchecked;
7
8
use crate::array::*;
9
use crate::bitmap::{Bitmap, BitmapBuilder};
10
use crate::datatypes::PhysicalType;
11
use crate::offset::Offsets;
12
use crate::types::{NativeType, Offset};
13
use crate::with_match_primitive_type_full;
14
15
/// Concatenate multiple [`Array`] of the same type into a single [`Array`].
16
pub fn concatenate(arrays: &[&dyn Array]) -> PolarsResult<Box<dyn Array>> {
17
if arrays.is_empty() {
18
polars_bail!(InvalidOperation: "concat requires input of at least one array")
19
}
20
21
if arrays
22
.iter()
23
.any(|array| array.dtype() != arrays[0].dtype())
24
{
25
polars_bail!(InvalidOperation: "It is not possible to concatenate arrays of different data types.")
26
}
27
28
concatenate_unchecked(arrays)
29
}
30
31
fn len_null_count<A: AsRef<dyn Array>>(arrays: &[A]) -> (usize, usize) {
32
let mut len = 0;
33
let mut null_count = 0;
34
for arr in arrays {
35
let arr = arr.as_ref();
36
len += arr.len();
37
null_count += arr.null_count();
38
}
39
(len, null_count)
40
}
41
42
/// Concatenate the validities of multiple [Array]s into a single Bitmap.
43
pub fn concatenate_validities<A: AsRef<dyn Array>>(arrays: &[A]) -> Option<Bitmap> {
44
let (len, null_count) = len_null_count(arrays);
45
concatenate_validities_with_len_null_count(arrays, len, null_count)
46
}
47
48
/// Builds the combined validity of `arrays`, given their precomputed total
/// `len` and `null_count` (as produced by `len_null_count`).
///
/// Short-circuits to `None` when `null_count` is zero. Each input is appended
/// either as a constant run (all-null or null-free) or by copying its bitmap.
fn concatenate_validities_with_len_null_count<A: AsRef<dyn Array>>(
    arrays: &[A],
    len: usize,
    null_count: usize,
) -> Option<Bitmap> {
    if null_count == 0 {
        return None;
    }

    let mut builder = BitmapBuilder::with_capacity(len);
    for arr in arrays.iter().map(|a| a.as_ref()) {
        let arr_len = arr.len();
        match arr.null_count() {
            // Entirely null (this arm also catches empty arrays): a run of unset bits.
            nulls if nulls == arr_len => {
                builder.extend_constant(arr_len, false);
            },
            // No nulls: a run of set bits, without touching `validity()`.
            0 => {
                builder.extend_constant(arr_len, true);
            },
            // Mixed: copy the actual validity bits.
            _ => {
                builder.extend_from_bitmap(arr.validity().unwrap());
            },
        }
    }
    builder.into_opt_validity()
}
70
71
/// Concatenate multiple [`Array`] of the same type into a single [`Array`].
72
/// All arrays must be of the same dtype or a panic can occur.
73
pub fn concatenate_unchecked<A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<Box<dyn Array>> {
74
if arrays.is_empty() {
75
polars_bail!(InvalidOperation: "concat requires input of at least one array")
76
}
77
78
if arrays.len() == 1 {
79
return Ok(arrays[0].as_ref().to_boxed());
80
}
81
82
use PhysicalType::*;
83
match arrays[0].as_ref().dtype().to_physical_type() {
84
Null => Ok(Box::new(concatenate_null(arrays))),
85
Boolean => Ok(Box::new(concatenate_bool(arrays))),
86
Primitive(ptype) => {
87
with_match_primitive_type_full!(ptype, |$T| {
88
Ok(Box::new(concatenate_primitive::<$T, _>(arrays)))
89
})
90
},
91
Binary => Ok(Box::new(concatenate_binary::<i32, _>(arrays)?)),
92
LargeBinary => Ok(Box::new(concatenate_binary::<i64, _>(arrays)?)),
93
Utf8 => Ok(Box::new(concatenate_utf8::<i32, _>(arrays)?)),
94
LargeUtf8 => Ok(Box::new(concatenate_utf8::<i64, _>(arrays)?)),
95
BinaryView => Ok(Box::new(concatenate_view::<[u8], _>(arrays))),
96
Utf8View => Ok(Box::new(concatenate_view::<str, _>(arrays))),
97
List => Ok(Box::new(concatenate_list::<i32, _>(arrays)?)),
98
LargeList => Ok(Box::new(concatenate_list::<i64, _>(arrays)?)),
99
FixedSizeBinary => Ok(Box::new(concatenate_fixed_size_binary(arrays)?)),
100
FixedSizeList => Ok(Box::new(concatenate_fixed_size_list(arrays)?)),
101
Struct => Ok(Box::new(concatenate_struct(arrays)?)),
102
Union => unimplemented!(),
103
Map => unimplemented!(),
104
Dictionary(_) => unimplemented!(),
105
}
106
}
107
108
fn concatenate_null<A: AsRef<dyn Array>>(arrays: &[A]) -> NullArray {
109
let dtype = arrays[0].as_ref().dtype().clone();
110
let total_len = arrays.iter().map(|arr| arr.as_ref().len()).sum();
111
NullArray::new(dtype, total_len)
112
}
113
114
fn concatenate_bool<A: AsRef<dyn Array>>(arrays: &[A]) -> BooleanArray {
115
let dtype = arrays[0].as_ref().dtype().clone();
116
let (total_len, null_count) = len_null_count(arrays);
117
let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
118
119
let mut bitmap = BitmapBuilder::with_capacity(total_len);
120
for arr in arrays {
121
let arr: &BooleanArray = arr.as_ref().as_any().downcast_ref().unwrap();
122
bitmap.extend_from_bitmap(arr.values());
123
}
124
BooleanArray::new(dtype, bitmap.freeze(), validity)
125
}
126
127
/// Concatenates [`PrimitiveArray<T>`]s by copying their value slices into one
/// contiguous buffer.
///
/// The output uses the first array's dtype. Every input is downcast with
/// `unwrap`, so an input of another type panics.
fn concatenate_primitive<T: NativeType, A: AsRef<dyn Array>>(arrays: &[A]) -> PrimitiveArray<T> {
    let dtype = arrays[0].as_ref().dtype().clone();
    let (len, nulls) = len_null_count(arrays);
    let validity = concatenate_validities_with_len_null_count(arrays, len, nulls);

    let mut values: Vec<T> = Vec::with_capacity(len);
    arrays
        .iter()
        .map(|arr| {
            arr.as_ref()
                .as_any()
                .downcast_ref::<PrimitiveArray<T>>()
                .unwrap()
        })
        .for_each(|prim| values.extend_from_slice(prim.values()));

    // SAFETY (NOTE(review): presumed contract of `new_unchecked`, confirm):
    // `dtype` comes from an input `PrimitiveArray<T>`, and `values` / `validity`
    // are both built from the same inputs, so they cover the same `len` elements.
    unsafe { PrimitiveArray::new_unchecked(dtype, Buffer::from(values), validity) }
}
139
140
fn concatenate_binary<O: Offset, A: AsRef<dyn Array>>(
141
arrays: &[A],
142
) -> PolarsResult<BinaryArray<O>> {
143
let dtype = arrays[0].as_ref().dtype().clone();
144
let (total_len, null_count) = len_null_count(arrays);
145
let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
146
147
let total_bytes = arrays
148
.iter()
149
.map(|arr| {
150
let arr: &BinaryArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();
151
arr.get_values_size()
152
})
153
.sum();
154
155
let mut values = Vec::with_capacity(total_bytes);
156
let mut offsets = Offsets::<O>::with_capacity(total_len);
157
158
for arr in arrays {
159
let arr: &BinaryArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();
160
let first_offset = arr.offsets().first().to_usize();
161
let last_offset = arr.offsets().last().to_usize();
162
values.extend_from_slice(&arr.values()[first_offset..last_offset]);
163
for len in arr.offsets().lengths() {
164
offsets.try_push(len)?;
165
}
166
}
167
168
Ok(unsafe { BinaryArray::new(dtype, offsets.into(), values.into(), validity) })
169
}
170
171
fn concatenate_utf8<O: Offset, A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<Utf8Array<O>> {
172
let dtype = arrays[0].as_ref().dtype().clone();
173
let (total_len, null_count) = len_null_count(arrays);
174
let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
175
176
let total_bytes = arrays
177
.iter()
178
.map(|arr| {
179
let arr: &Utf8Array<O> = arr.as_ref().as_any().downcast_ref().unwrap();
180
arr.get_values_size()
181
})
182
.sum();
183
184
let mut bytes = Vec::with_capacity(total_bytes);
185
let mut offsets = Offsets::<O>::with_capacity(total_len);
186
187
for arr in arrays {
188
let arr: &Utf8Array<O> = arr.as_ref().as_any().downcast_ref().unwrap();
189
let first_offset = arr.offsets().first().to_usize();
190
let last_offset = arr.offsets().last().to_usize();
191
bytes.extend_from_slice(&arr.values()[first_offset..last_offset]);
192
for len in arr.offsets().lengths() {
193
offsets.try_push(len)?;
194
}
195
}
196
197
Ok(unsafe { Utf8Array::new_unchecked(dtype, offsets.into(), bytes.into(), validity) })
198
}
199
200
fn concatenate_view<V: ViewType + ?Sized, A: AsRef<dyn Array>>(
201
arrays: &[A],
202
) -> BinaryViewArrayGeneric<V> {
203
let dtype = arrays[0].as_ref().dtype().clone();
204
let (total_len, null_count) = len_null_count(arrays);
205
if total_len == 0 {
206
return BinaryViewArrayGeneric::new_empty(dtype);
207
}
208
let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
209
210
let first_arr: &BinaryViewArrayGeneric<V> = arrays[0].as_ref().as_any().downcast_ref().unwrap();
211
let mut total_nondedup_buffers = first_arr.data_buffers().len();
212
let mut max_arr_bufferset_len = 0;
213
let mut all_same_bufs = true;
214
for arr in arrays {
215
let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();
216
max_arr_bufferset_len = max_arr_bufferset_len.max(arr.data_buffers().len());
217
total_nondedup_buffers += arr.data_buffers().len();
218
// Fat pointer equality, checks both start and length.
219
all_same_bufs &= Buffer::is_same_buffer(arr.data_buffers(), first_arr.data_buffers());
220
}
221
222
let mut total_bytes_len = None;
223
let mut views = Vec::with_capacity(total_len);
224
225
let mut total_buffer_len = 0;
226
let buffers = if all_same_bufs {
227
total_buffer_len = first_arr.total_buffer_len();
228
let mut bytes_len = 0;
229
for arr in arrays {
230
let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();
231
views.extend_from_slice(arr.views());
232
bytes_len += arr.total_bytes_len();
233
}
234
total_bytes_len = Some(bytes_len);
235
Buffer::clone(first_arr.data_buffers())
236
237
// There might be way more buffers than elements, so we only dedup if there
238
// is at least one element per buffer on average.
239
} else if total_len > total_nondedup_buffers {
240
assert!(arrays.len() < u32::MAX as usize);
241
242
let mut dedup_buffers = Vec::with_capacity(total_nondedup_buffers);
243
let mut global_dedup_buffer_idx = PlHashMap::with_capacity(total_nondedup_buffers);
244
let mut local_dedup_buffer_idx = Vec::new();
245
local_dedup_buffer_idx.resize(max_arr_bufferset_len, (0, u32::MAX));
246
247
for (arr_idx, arr) in arrays.iter().enumerate() {
248
let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();
249
250
unsafe {
251
for mut view in arr.views().iter().copied() {
252
if view.length > View::MAX_INLINE_SIZE {
253
// Translate from old array-local buffer idx to global deduped buffer idx.
254
let (mut new_buffer_idx, cache_tag) =
255
*local_dedup_buffer_idx.get_unchecked(view.buffer_idx as usize);
256
if cache_tag != arr_idx as u32 {
257
// This buffer index wasn't seen before for this array, do a dedup lookup.
258
let buffer = arr.data_buffers().get_unchecked(view.buffer_idx as usize);
259
let buf_id = (buffer.as_slice().as_ptr(), buffer.len());
260
let idx = match global_dedup_buffer_idx.entry(buf_id) {
261
Entry::Occupied(o) => *o.get(),
262
Entry::Vacant(v) => {
263
let idx = dedup_buffers.len() as u32;
264
dedup_buffers.push(buffer.clone());
265
total_buffer_len += buffer.len();
266
v.insert(idx);
267
idx
268
},
269
};
270
271
// Cache result for future lookups.
272
*local_dedup_buffer_idx.get_unchecked_mut(view.buffer_idx as usize) =
273
(idx, arr_idx as u32);
274
new_buffer_idx = idx;
275
}
276
view.buffer_idx = new_buffer_idx;
277
}
278
279
views.push_unchecked(view);
280
}
281
}
282
}
283
284
dedup_buffers.into_iter().collect()
285
} else {
286
// Only very few of the total number of buffers is referenced, simply
287
// create a new direct buffer.
288
for arr in arrays {
289
let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();
290
total_buffer_len += arr
291
.len_iter()
292
.map(|l| if l > 12 { l as usize } else { 0 })
293
.sum::<usize>();
294
}
295
296
let mut unprocessed_buffer_len = total_buffer_len;
297
let mut new_buffers: Vec<Vec<u8>> = vec![Vec::with_capacity(
298
unprocessed_buffer_len.min(u32::MAX as usize),
299
)];
300
for arr in arrays {
301
let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();
302
let buffers = arr.data_buffers();
303
304
unsafe {
305
for mut view in arr.views().iter().copied() {
306
if view.length > 12 {
307
if new_buffers.last().unwrap_unchecked().len() + view.length as usize
308
>= u32::MAX as usize
309
{
310
new_buffers.push(Vec::with_capacity(
311
unprocessed_buffer_len.min(u32::MAX as usize),
312
));
313
}
314
let new_offset = new_buffers.last().unwrap_unchecked().len() as u32;
315
new_buffers
316
.last_mut()
317
.unwrap_unchecked()
318
.extend_from_slice(view.get_slice_unchecked(buffers));
319
view.offset = new_offset;
320
view.buffer_idx = new_buffers.len() as u32 - 1;
321
unprocessed_buffer_len -= view.length as usize;
322
}
323
views.push_unchecked(view);
324
}
325
}
326
}
327
328
new_buffers.into_iter().map(Buffer::from).collect()
329
};
330
331
unsafe {
332
BinaryViewArrayGeneric::new_unchecked(
333
dtype,
334
views.into(),
335
buffers,
336
validity,
337
total_bytes_len,
338
total_buffer_len,
339
)
340
}
341
}
342
343
fn concatenate_list<O: Offset, A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<ListArray<O>> {
344
let dtype = arrays[0].as_ref().dtype().clone();
345
let (total_len, null_count) = len_null_count(arrays);
346
let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
347
348
let mut num_sliced = 0;
349
let mut offsets = Offsets::<O>::with_capacity(total_len);
350
for arr in arrays {
351
let arr: &ListArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();
352
for len in arr.offsets().lengths() {
353
offsets.try_push(len)?;
354
}
355
let first_offset = arr.offsets().first().to_usize();
356
let offset_range = arr.offsets().range().to_usize();
357
num_sliced += (first_offset != 0 || offset_range != arr.values().len()) as usize;
358
}
359
360
let values = if num_sliced > 0 {
361
let inner_sliced_arrays = arrays
362
.iter()
363
.map(|arr| {
364
let arr: &ListArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();
365
let first_offset = arr.offsets().first().to_usize();
366
let offset_range = arr.offsets().range().to_usize();
367
if first_offset != 0 || offset_range != arr.values().len() {
368
arr.values().sliced(first_offset, offset_range)
369
} else {
370
arr.values().to_boxed()
371
}
372
})
373
.collect_vec();
374
concatenate_unchecked(&inner_sliced_arrays[..])?
375
} else {
376
let inner_arrays = arrays
377
.iter()
378
.map(|arr| {
379
let arr: &ListArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();
380
&**arr.values()
381
})
382
.collect_vec();
383
concatenate_unchecked(&inner_arrays)?
384
};
385
386
Ok(ListArray::new(dtype, offsets.into(), values, validity))
387
}
388
389
fn concatenate_fixed_size_binary<A: AsRef<dyn Array>>(
390
arrays: &[A],
391
) -> PolarsResult<FixedSizeBinaryArray> {
392
let dtype = arrays[0].as_ref().dtype().clone();
393
let (total_len, null_count) = len_null_count(arrays);
394
let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
395
396
let total_bytes = arrays
397
.iter()
398
.map(|arr| {
399
let arr: &FixedSizeBinaryArray = arr.as_ref().as_any().downcast_ref().unwrap();
400
arr.values().len()
401
})
402
.sum();
403
404
let mut bytes = Vec::with_capacity(total_bytes);
405
for arr in arrays {
406
let arr: &FixedSizeBinaryArray = arr.as_ref().as_any().downcast_ref().unwrap();
407
bytes.extend_from_slice(arr.values());
408
}
409
410
Ok(FixedSizeBinaryArray::new(dtype, bytes.into(), validity))
411
}
412
413
fn concatenate_fixed_size_list<A: AsRef<dyn Array>>(
414
arrays: &[A],
415
) -> PolarsResult<FixedSizeListArray> {
416
let dtype = arrays[0].as_ref().dtype().clone();
417
let (total_len, null_count) = len_null_count(arrays);
418
419
let inner_arrays = arrays
420
.iter()
421
.map(|arr| {
422
let arr: &FixedSizeListArray = arr.as_ref().as_any().downcast_ref().unwrap();
423
&**arr.values()
424
})
425
.collect_vec();
426
let values = concatenate_unchecked(&inner_arrays)?;
427
let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
428
Ok(FixedSizeListArray::new(dtype, total_len, values, validity))
429
}
430
431
fn concatenate_struct<A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<StructArray> {
432
let dtype = arrays[0].as_ref().dtype().clone();
433
let (total_len, null_count) = len_null_count(arrays);
434
let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
435
436
let first_arr: &StructArray = arrays[0].as_ref().as_any().downcast_ref().unwrap();
437
let num_fields = first_arr.values().len();
438
439
let mut inner_arrays = Vec::with_capacity(arrays.len());
440
let values = (0..num_fields)
441
.map(|f| {
442
inner_arrays.clear();
443
for arr in arrays {
444
let arr: &StructArray = arr.as_ref().as_any().downcast_ref().unwrap();
445
inner_arrays.push(&arr.values()[f]);
446
}
447
concatenate_unchecked(&inner_arrays)
448
})
449
.try_collect_vec()?;
450
451
Ok(StructArray::new(dtype, total_len, values, validity))
452
}
453
454