Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/compute/concatenate.rs
6939 views
1
use std::sync::Arc;
2
3
use hashbrown::hash_map::Entry;
4
use polars_error::{PolarsResult, polars_bail};
5
use polars_utils::aliases::{InitHashMaps, PlHashMap};
6
use polars_utils::itertools::Itertools;
7
use polars_utils::vec::PushUnchecked;
8
9
use crate::array::*;
10
use crate::bitmap::{Bitmap, BitmapBuilder};
11
use crate::buffer::Buffer;
12
use crate::datatypes::PhysicalType;
13
use crate::offset::Offsets;
14
use crate::types::{NativeType, Offset};
15
use crate::with_match_primitive_type_full;
16
17
/// Concatenate multiple [`Array`] of the same type into a single [`Array`].
18
pub fn concatenate(arrays: &[&dyn Array]) -> PolarsResult<Box<dyn Array>> {
19
if arrays.is_empty() {
20
polars_bail!(InvalidOperation: "concat requires input of at least one array")
21
}
22
23
if arrays
24
.iter()
25
.any(|array| array.dtype() != arrays[0].dtype())
26
{
27
polars_bail!(InvalidOperation: "It is not possible to concatenate arrays of different data types.")
28
}
29
30
concatenate_unchecked(arrays)
31
}
32
33
fn len_null_count<A: AsRef<dyn Array>>(arrays: &[A]) -> (usize, usize) {
34
let mut len = 0;
35
let mut null_count = 0;
36
for arr in arrays {
37
let arr = arr.as_ref();
38
len += arr.len();
39
null_count += arr.null_count();
40
}
41
(len, null_count)
42
}
43
44
/// Concatenate the validities of multiple [Array]s into a single Bitmap.
45
pub fn concatenate_validities<A: AsRef<dyn Array>>(arrays: &[A]) -> Option<Bitmap> {
46
let (len, null_count) = len_null_count(arrays);
47
concatenate_validities_with_len_null_count(arrays, len, null_count)
48
}
49
50
/// Same as `concatenate_validities`, but reuses an already computed total `len`
/// (used only as the builder's capacity) and total `null_count` of `arrays`.
///
/// Returns `None` immediately when `null_count` is zero. Otherwise each array
/// contributes, in order: all-unset bits if it is entirely null, all-set bits if it
/// has no nulls, or a copy of its own validity bitmap.
fn concatenate_validities_with_len_null_count<A: AsRef<dyn Array>>(
    arrays: &[A],
    len: usize,
    null_count: usize,
) -> Option<Bitmap> {
    if null_count == 0 {
        return None;
    }

    let mut builder = BitmapBuilder::with_capacity(len);
    for arr in arrays.iter().map(|a| a.as_ref()) {
        let arr_len = arr.len();
        match arr.null_count() {
            nulls if nulls == arr_len => {
                builder.extend_constant(arr_len, false);
            },
            0 => {
                builder.extend_constant(arr_len, true);
            },
            _ => {
                // The unwrap assumes a partially-null array always carries a validity bitmap.
                builder.extend_from_bitmap(arr.validity().unwrap());
            },
        }
    }
    builder.into_opt_validity()
}
72
73
/// Concatenate multiple [`Array`] of the same type into a single [`Array`].
74
/// All arrays must be of the same dtype or a panic can occur.
75
pub fn concatenate_unchecked<A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<Box<dyn Array>> {
76
if arrays.is_empty() {
77
polars_bail!(InvalidOperation: "concat requires input of at least one array")
78
}
79
80
if arrays.len() == 1 {
81
return Ok(arrays[0].as_ref().to_boxed());
82
}
83
84
use PhysicalType::*;
85
match arrays[0].as_ref().dtype().to_physical_type() {
86
Null => Ok(Box::new(concatenate_null(arrays))),
87
Boolean => Ok(Box::new(concatenate_bool(arrays))),
88
Primitive(ptype) => {
89
with_match_primitive_type_full!(ptype, |$T| {
90
Ok(Box::new(concatenate_primitive::<$T, _>(arrays)))
91
})
92
},
93
Binary => Ok(Box::new(concatenate_binary::<i32, _>(arrays)?)),
94
LargeBinary => Ok(Box::new(concatenate_binary::<i64, _>(arrays)?)),
95
Utf8 => Ok(Box::new(concatenate_utf8::<i32, _>(arrays)?)),
96
LargeUtf8 => Ok(Box::new(concatenate_utf8::<i64, _>(arrays)?)),
97
BinaryView => Ok(Box::new(concatenate_view::<[u8], _>(arrays))),
98
Utf8View => Ok(Box::new(concatenate_view::<str, _>(arrays))),
99
List => Ok(Box::new(concatenate_list::<i32, _>(arrays)?)),
100
LargeList => Ok(Box::new(concatenate_list::<i64, _>(arrays)?)),
101
FixedSizeBinary => Ok(Box::new(concatenate_fixed_size_binary(arrays)?)),
102
FixedSizeList => Ok(Box::new(concatenate_fixed_size_list(arrays)?)),
103
Struct => Ok(Box::new(concatenate_struct(arrays)?)),
104
Union => unimplemented!(),
105
Map => unimplemented!(),
106
Dictionary(_) => unimplemented!(),
107
}
108
}
109
110
fn concatenate_null<A: AsRef<dyn Array>>(arrays: &[A]) -> NullArray {
111
let dtype = arrays[0].as_ref().dtype().clone();
112
let total_len = arrays.iter().map(|arr| arr.as_ref().len()).sum();
113
NullArray::new(dtype, total_len)
114
}
115
116
fn concatenate_bool<A: AsRef<dyn Array>>(arrays: &[A]) -> BooleanArray {
117
let dtype = arrays[0].as_ref().dtype().clone();
118
let (total_len, null_count) = len_null_count(arrays);
119
let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
120
121
let mut bitmap = BitmapBuilder::with_capacity(total_len);
122
for arr in arrays {
123
let arr: &BooleanArray = arr.as_ref().as_any().downcast_ref().unwrap();
124
bitmap.extend_from_bitmap(arr.values());
125
}
126
BooleanArray::new(dtype, bitmap.freeze(), validity)
127
}
128
129
/// Concatenates [`PrimitiveArray<T>`]s by copying each input's value slice, in order,
/// into one freshly allocated buffer.
fn concatenate_primitive<T: NativeType, A: AsRef<dyn Array>>(arrays: &[A]) -> PrimitiveArray<T> {
    // Downcast helper. Every input is expected to be a `PrimitiveArray<T>`.
    fn as_primitive<T: NativeType>(arr: &dyn Array) -> &PrimitiveArray<T> {
        arr.as_any().downcast_ref().unwrap()
    }

    let dtype = arrays[0].as_ref().dtype().clone();
    let (total_len, null_count) = len_null_count(arrays);
    let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);

    let mut values: Vec<T> = Vec::with_capacity(total_len);
    arrays
        .iter()
        .map(|arr| as_primitive::<T>(arr.as_ref()))
        .for_each(|prim| values.extend_from_slice(prim.values()));

    // SAFETY: `values` and `validity` are both built from the same inputs, in the same
    // order. Assuming each input's `values().len()` equals its `len()`, both cover
    // `total_len` slots. `dtype` comes from the first input, which all inputs share
    // per the caller's contract.
    unsafe { PrimitiveArray::new_unchecked(dtype, Buffer::from(values), validity) }
}
141
142
fn concatenate_binary<O: Offset, A: AsRef<dyn Array>>(
143
arrays: &[A],
144
) -> PolarsResult<BinaryArray<O>> {
145
let dtype = arrays[0].as_ref().dtype().clone();
146
let (total_len, null_count) = len_null_count(arrays);
147
let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
148
149
let total_bytes = arrays
150
.iter()
151
.map(|arr| {
152
let arr: &BinaryArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();
153
arr.get_values_size()
154
})
155
.sum();
156
157
let mut values = Vec::with_capacity(total_bytes);
158
let mut offsets = Offsets::<O>::with_capacity(total_len);
159
160
for arr in arrays {
161
let arr: &BinaryArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();
162
let first_offset = arr.offsets().first().to_usize();
163
let last_offset = arr.offsets().last().to_usize();
164
values.extend_from_slice(&arr.values()[first_offset..last_offset]);
165
for len in arr.offsets().lengths() {
166
offsets.try_push(len)?;
167
}
168
}
169
170
Ok(unsafe { BinaryArray::new(dtype, offsets.into(), values.into(), validity) })
171
}
172
173
fn concatenate_utf8<O: Offset, A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<Utf8Array<O>> {
174
let dtype = arrays[0].as_ref().dtype().clone();
175
let (total_len, null_count) = len_null_count(arrays);
176
let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
177
178
let total_bytes = arrays
179
.iter()
180
.map(|arr| {
181
let arr: &Utf8Array<O> = arr.as_ref().as_any().downcast_ref().unwrap();
182
arr.get_values_size()
183
})
184
.sum();
185
186
let mut bytes = Vec::with_capacity(total_bytes);
187
let mut offsets = Offsets::<O>::with_capacity(total_len);
188
189
for arr in arrays {
190
let arr: &Utf8Array<O> = arr.as_ref().as_any().downcast_ref().unwrap();
191
let first_offset = arr.offsets().first().to_usize();
192
let last_offset = arr.offsets().last().to_usize();
193
bytes.extend_from_slice(&arr.values()[first_offset..last_offset]);
194
for len in arr.offsets().lengths() {
195
offsets.try_push(len)?;
196
}
197
}
198
199
Ok(unsafe { Utf8Array::new_unchecked(dtype, offsets.into(), bytes.into(), validity) })
200
}
201
202
fn concatenate_view<V: ViewType + ?Sized, A: AsRef<dyn Array>>(
203
arrays: &[A],
204
) -> BinaryViewArrayGeneric<V> {
205
let dtype = arrays[0].as_ref().dtype().clone();
206
let (total_len, null_count) = len_null_count(arrays);
207
if total_len == 0 {
208
return BinaryViewArrayGeneric::new_empty(dtype);
209
}
210
let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
211
212
let first_arr: &BinaryViewArrayGeneric<V> = arrays[0].as_ref().as_any().downcast_ref().unwrap();
213
let mut total_nondedup_buffers = first_arr.data_buffers().len();
214
let mut max_arr_bufferset_len = 0;
215
let mut all_same_bufs = true;
216
for arr in arrays {
217
let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();
218
max_arr_bufferset_len = max_arr_bufferset_len.max(arr.data_buffers().len());
219
total_nondedup_buffers += arr.data_buffers().len();
220
// Fat pointer equality, checks both start and length.
221
all_same_bufs &= std::ptr::eq(
222
Arc::as_ptr(arr.data_buffers()),
223
Arc::as_ptr(first_arr.data_buffers()),
224
);
225
}
226
227
let mut total_bytes_len = 0;
228
let mut views = Vec::with_capacity(total_len);
229
230
let mut total_buffer_len = 0;
231
let buffers = if all_same_bufs {
232
total_buffer_len = first_arr.total_buffer_len();
233
for arr in arrays {
234
let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();
235
views.extend_from_slice(arr.views());
236
total_bytes_len += arr.total_bytes_len();
237
}
238
Arc::clone(first_arr.data_buffers())
239
240
// There might be way more buffers than elements, so we only dedup if there
241
// is at least one element per buffer on average.
242
} else if total_len > total_nondedup_buffers {
243
assert!(arrays.len() < u32::MAX as usize);
244
245
let mut dedup_buffers = Vec::with_capacity(total_nondedup_buffers);
246
let mut global_dedup_buffer_idx = PlHashMap::with_capacity(total_nondedup_buffers);
247
let mut local_dedup_buffer_idx = Vec::new();
248
local_dedup_buffer_idx.resize(max_arr_bufferset_len, (0, u32::MAX));
249
250
for (arr_idx, arr) in arrays.iter().enumerate() {
251
let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();
252
253
unsafe {
254
for mut view in arr.views().iter().copied() {
255
if view.length > View::MAX_INLINE_SIZE {
256
// Translate from old array-local buffer idx to global deduped buffer idx.
257
let (mut new_buffer_idx, cache_tag) =
258
*local_dedup_buffer_idx.get_unchecked(view.buffer_idx as usize);
259
if cache_tag != arr_idx as u32 {
260
// This buffer index wasn't seen before for this array, do a dedup lookup.
261
let buffer = arr.data_buffers().get_unchecked(view.buffer_idx as usize);
262
let buf_id = (buffer.as_slice().as_ptr(), buffer.len());
263
let idx = match global_dedup_buffer_idx.entry(buf_id) {
264
Entry::Occupied(o) => *o.get(),
265
Entry::Vacant(v) => {
266
let idx = dedup_buffers.len() as u32;
267
dedup_buffers.push(buffer.clone());
268
total_buffer_len += buffer.len();
269
v.insert(idx);
270
idx
271
},
272
};
273
274
// Cache result for future lookups.
275
*local_dedup_buffer_idx.get_unchecked_mut(view.buffer_idx as usize) =
276
(idx, arr_idx as u32);
277
new_buffer_idx = idx;
278
}
279
view.buffer_idx = new_buffer_idx;
280
}
281
282
total_bytes_len += view.length as usize;
283
views.push_unchecked(view);
284
}
285
}
286
}
287
288
dedup_buffers.into_iter().collect()
289
} else {
290
// Only very few of the total number of buffers is referenced, simply
291
// create a new direct buffer.
292
for arr in arrays {
293
let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();
294
total_buffer_len += arr
295
.len_iter()
296
.map(|l| if l > 12 { l as usize } else { 0 })
297
.sum::<usize>();
298
}
299
300
let mut unprocessed_buffer_len = total_buffer_len;
301
let mut new_buffers: Vec<Vec<u8>> = vec![Vec::with_capacity(
302
unprocessed_buffer_len.min(u32::MAX as usize),
303
)];
304
for arr in arrays {
305
let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();
306
let buffers = arr.data_buffers();
307
308
unsafe {
309
for mut view in arr.views().iter().copied() {
310
total_bytes_len += view.length as usize;
311
if view.length > 12 {
312
if new_buffers.last().unwrap_unchecked().len() + view.length as usize
313
>= u32::MAX as usize
314
{
315
new_buffers.push(Vec::with_capacity(
316
unprocessed_buffer_len.min(u32::MAX as usize),
317
));
318
}
319
let new_offset = new_buffers.last().unwrap_unchecked().len() as u32;
320
new_buffers
321
.last_mut()
322
.unwrap_unchecked()
323
.extend_from_slice(view.get_slice_unchecked(buffers));
324
view.offset = new_offset;
325
view.buffer_idx = new_buffers.len() as u32 - 1;
326
unprocessed_buffer_len -= view.length as usize;
327
}
328
views.push_unchecked(view);
329
}
330
}
331
}
332
333
new_buffers.into_iter().map(Buffer::from).collect()
334
};
335
336
unsafe {
337
BinaryViewArrayGeneric::new_unchecked(
338
dtype,
339
views.into(),
340
buffers,
341
validity,
342
total_bytes_len,
343
total_buffer_len,
344
)
345
}
346
}
347
348
fn concatenate_list<O: Offset, A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<ListArray<O>> {
349
let dtype = arrays[0].as_ref().dtype().clone();
350
let (total_len, null_count) = len_null_count(arrays);
351
let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
352
353
let mut num_sliced = 0;
354
let mut offsets = Offsets::<O>::with_capacity(total_len);
355
for arr in arrays {
356
let arr: &ListArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();
357
for len in arr.offsets().lengths() {
358
offsets.try_push(len)?;
359
}
360
let first_offset = arr.offsets().first().to_usize();
361
let offset_range = arr.offsets().range().to_usize();
362
num_sliced += (first_offset != 0 || offset_range != arr.values().len()) as usize;
363
}
364
365
let values = if num_sliced > 0 {
366
let inner_sliced_arrays = arrays
367
.iter()
368
.map(|arr| {
369
let arr: &ListArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();
370
let first_offset = arr.offsets().first().to_usize();
371
let offset_range = arr.offsets().range().to_usize();
372
if first_offset != 0 || offset_range != arr.values().len() {
373
arr.values().sliced(first_offset, offset_range)
374
} else {
375
arr.values().to_boxed()
376
}
377
})
378
.collect_vec();
379
concatenate_unchecked(&inner_sliced_arrays[..])?
380
} else {
381
let inner_arrays = arrays
382
.iter()
383
.map(|arr| {
384
let arr: &ListArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();
385
&**arr.values()
386
})
387
.collect_vec();
388
concatenate_unchecked(&inner_arrays)?
389
};
390
391
Ok(ListArray::new(dtype, offsets.into(), values, validity))
392
}
393
394
fn concatenate_fixed_size_binary<A: AsRef<dyn Array>>(
395
arrays: &[A],
396
) -> PolarsResult<FixedSizeBinaryArray> {
397
let dtype = arrays[0].as_ref().dtype().clone();
398
let (total_len, null_count) = len_null_count(arrays);
399
let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
400
401
let total_bytes = arrays
402
.iter()
403
.map(|arr| {
404
let arr: &FixedSizeBinaryArray = arr.as_ref().as_any().downcast_ref().unwrap();
405
arr.values().len()
406
})
407
.sum();
408
409
let mut bytes = Vec::with_capacity(total_bytes);
410
for arr in arrays {
411
let arr: &FixedSizeBinaryArray = arr.as_ref().as_any().downcast_ref().unwrap();
412
bytes.extend_from_slice(arr.values());
413
}
414
415
Ok(FixedSizeBinaryArray::new(dtype, bytes.into(), validity))
416
}
417
418
fn concatenate_fixed_size_list<A: AsRef<dyn Array>>(
419
arrays: &[A],
420
) -> PolarsResult<FixedSizeListArray> {
421
let dtype = arrays[0].as_ref().dtype().clone();
422
let (total_len, null_count) = len_null_count(arrays);
423
424
let inner_arrays = arrays
425
.iter()
426
.map(|arr| {
427
let arr: &FixedSizeListArray = arr.as_ref().as_any().downcast_ref().unwrap();
428
&**arr.values()
429
})
430
.collect_vec();
431
let values = concatenate_unchecked(&inner_arrays)?;
432
let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
433
Ok(FixedSizeListArray::new(dtype, total_len, values, validity))
434
}
435
436
fn concatenate_struct<A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<StructArray> {
437
let dtype = arrays[0].as_ref().dtype().clone();
438
let (total_len, null_count) = len_null_count(arrays);
439
let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);
440
441
let first_arr: &StructArray = arrays[0].as_ref().as_any().downcast_ref().unwrap();
442
let num_fields = first_arr.values().len();
443
444
let mut inner_arrays = Vec::with_capacity(arrays.len());
445
let values = (0..num_fields)
446
.map(|f| {
447
inner_arrays.clear();
448
for arr in arrays {
449
let arr: &StructArray = arr.as_ref().as_any().downcast_ref().unwrap();
450
inner_arrays.push(&arr.values()[f]);
451
}
452
concatenate_unchecked(&inner_arrays)
453
})
454
.try_collect_vec()?;
455
456
Ok(StructArray::new(dtype, total_len, values, validity))
457
}
458
459