// Source: crates/polars-arrow/src/compute/concatenate.rs
use std::sync::Arc;12use hashbrown::hash_map::Entry;3use polars_error::{PolarsResult, polars_bail};4use polars_utils::aliases::{InitHashMaps, PlHashMap};5use polars_utils::itertools::Itertools;6use polars_utils::vec::PushUnchecked;78use crate::array::*;9use crate::bitmap::{Bitmap, BitmapBuilder};10use crate::buffer::Buffer;11use crate::datatypes::PhysicalType;12use crate::offset::Offsets;13use crate::types::{NativeType, Offset};14use crate::with_match_primitive_type_full;1516/// Concatenate multiple [`Array`] of the same type into a single [`Array`].17pub fn concatenate(arrays: &[&dyn Array]) -> PolarsResult<Box<dyn Array>> {18if arrays.is_empty() {19polars_bail!(InvalidOperation: "concat requires input of at least one array")20}2122if arrays23.iter()24.any(|array| array.dtype() != arrays[0].dtype())25{26polars_bail!(InvalidOperation: "It is not possible to concatenate arrays of different data types.")27}2829concatenate_unchecked(arrays)30}3132fn len_null_count<A: AsRef<dyn Array>>(arrays: &[A]) -> (usize, usize) {33let mut len = 0;34let mut null_count = 0;35for arr in arrays {36let arr = arr.as_ref();37len += arr.len();38null_count += arr.null_count();39}40(len, null_count)41}4243/// Concatenate the validities of multiple [Array]s into a single Bitmap.44pub fn concatenate_validities<A: AsRef<dyn Array>>(arrays: &[A]) -> Option<Bitmap> {45let (len, null_count) = len_null_count(arrays);46concatenate_validities_with_len_null_count(arrays, len, null_count)47}4849fn concatenate_validities_with_len_null_count<A: AsRef<dyn Array>>(50arrays: &[A],51len: usize,52null_count: usize,53) -> Option<Bitmap> {54if null_count == 0 {55return None;56}5758let mut bitmap = BitmapBuilder::with_capacity(len);59for arr in arrays {60let arr = arr.as_ref();61if arr.null_count() == arr.len() {62bitmap.extend_constant(arr.len(), false);63} else if arr.null_count() == 0 {64bitmap.extend_constant(arr.len(), true);65} else 
{66bitmap.extend_from_bitmap(arr.validity().unwrap());67}68}69bitmap.into_opt_validity()70}7172/// Concatenate multiple [`Array`] of the same type into a single [`Array`].73/// All arrays must be of the same dtype or a panic can occur.74pub fn concatenate_unchecked<A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<Box<dyn Array>> {75if arrays.is_empty() {76polars_bail!(InvalidOperation: "concat requires input of at least one array")77}7879if arrays.len() == 1 {80return Ok(arrays[0].as_ref().to_boxed());81}8283use PhysicalType::*;84match arrays[0].as_ref().dtype().to_physical_type() {85Null => Ok(Box::new(concatenate_null(arrays))),86Boolean => Ok(Box::new(concatenate_bool(arrays))),87Primitive(ptype) => {88with_match_primitive_type_full!(ptype, |$T| {89Ok(Box::new(concatenate_primitive::<$T, _>(arrays)))90})91},92Binary => Ok(Box::new(concatenate_binary::<i32, _>(arrays)?)),93LargeBinary => Ok(Box::new(concatenate_binary::<i64, _>(arrays)?)),94Utf8 => Ok(Box::new(concatenate_utf8::<i32, _>(arrays)?)),95LargeUtf8 => Ok(Box::new(concatenate_utf8::<i64, _>(arrays)?)),96BinaryView => Ok(Box::new(concatenate_view::<[u8], _>(arrays))),97Utf8View => Ok(Box::new(concatenate_view::<str, _>(arrays))),98List => Ok(Box::new(concatenate_list::<i32, _>(arrays)?)),99LargeList => Ok(Box::new(concatenate_list::<i64, _>(arrays)?)),100FixedSizeBinary => Ok(Box::new(concatenate_fixed_size_binary(arrays)?)),101FixedSizeList => Ok(Box::new(concatenate_fixed_size_list(arrays)?)),102Struct => Ok(Box::new(concatenate_struct(arrays)?)),103Union => unimplemented!(),104Map => unimplemented!(),105Dictionary(_) => unimplemented!(),106}107}108109fn concatenate_null<A: AsRef<dyn Array>>(arrays: &[A]) -> NullArray {110let dtype = arrays[0].as_ref().dtype().clone();111let total_len = arrays.iter().map(|arr| arr.as_ref().len()).sum();112NullArray::new(dtype, total_len)113}114115fn concatenate_bool<A: AsRef<dyn Array>>(arrays: &[A]) -> BooleanArray {116let dtype = 
arrays[0].as_ref().dtype().clone();117let (total_len, null_count) = len_null_count(arrays);118let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);119120let mut bitmap = BitmapBuilder::with_capacity(total_len);121for arr in arrays {122let arr: &BooleanArray = arr.as_ref().as_any().downcast_ref().unwrap();123bitmap.extend_from_bitmap(arr.values());124}125BooleanArray::new(dtype, bitmap.freeze(), validity)126}127128fn concatenate_primitive<T: NativeType, A: AsRef<dyn Array>>(arrays: &[A]) -> PrimitiveArray<T> {129let dtype = arrays[0].as_ref().dtype().clone();130let (total_len, null_count) = len_null_count(arrays);131let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);132133let mut out = Vec::with_capacity(total_len);134for arr in arrays {135let arr: &PrimitiveArray<T> = arr.as_ref().as_any().downcast_ref().unwrap();136out.extend_from_slice(arr.values());137}138unsafe { PrimitiveArray::new_unchecked(dtype, Buffer::from(out), validity) }139}140141fn concatenate_binary<O: Offset, A: AsRef<dyn Array>>(142arrays: &[A],143) -> PolarsResult<BinaryArray<O>> {144let dtype = arrays[0].as_ref().dtype().clone();145let (total_len, null_count) = len_null_count(arrays);146let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);147148let total_bytes = arrays149.iter()150.map(|arr| {151let arr: &BinaryArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();152arr.get_values_size()153})154.sum();155156let mut values = Vec::with_capacity(total_bytes);157let mut offsets = Offsets::<O>::with_capacity(total_len);158159for arr in arrays {160let arr: &BinaryArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();161let first_offset = arr.offsets().first().to_usize();162let last_offset = arr.offsets().last().to_usize();163values.extend_from_slice(&arr.values()[first_offset..last_offset]);164for len in arr.offsets().lengths() {165offsets.try_push(len)?;166}167}168169Ok(unsafe { 
BinaryArray::new(dtype, offsets.into(), values.into(), validity) })170}171172fn concatenate_utf8<O: Offset, A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<Utf8Array<O>> {173let dtype = arrays[0].as_ref().dtype().clone();174let (total_len, null_count) = len_null_count(arrays);175let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);176177let total_bytes = arrays178.iter()179.map(|arr| {180let arr: &Utf8Array<O> = arr.as_ref().as_any().downcast_ref().unwrap();181arr.get_values_size()182})183.sum();184185let mut bytes = Vec::with_capacity(total_bytes);186let mut offsets = Offsets::<O>::with_capacity(total_len);187188for arr in arrays {189let arr: &Utf8Array<O> = arr.as_ref().as_any().downcast_ref().unwrap();190let first_offset = arr.offsets().first().to_usize();191let last_offset = arr.offsets().last().to_usize();192bytes.extend_from_slice(&arr.values()[first_offset..last_offset]);193for len in arr.offsets().lengths() {194offsets.try_push(len)?;195}196}197198Ok(unsafe { Utf8Array::new_unchecked(dtype, offsets.into(), bytes.into(), validity) })199}200201fn concatenate_view<V: ViewType + ?Sized, A: AsRef<dyn Array>>(202arrays: &[A],203) -> BinaryViewArrayGeneric<V> {204let dtype = arrays[0].as_ref().dtype().clone();205let (total_len, null_count) = len_null_count(arrays);206if total_len == 0 {207return BinaryViewArrayGeneric::new_empty(dtype);208}209let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);210211let first_arr: &BinaryViewArrayGeneric<V> = arrays[0].as_ref().as_any().downcast_ref().unwrap();212let mut total_nondedup_buffers = first_arr.data_buffers().len();213let mut max_arr_bufferset_len = 0;214let mut all_same_bufs = true;215for arr in arrays {216let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();217max_arr_bufferset_len = max_arr_bufferset_len.max(arr.data_buffers().len());218total_nondedup_buffers += arr.data_buffers().len();219// Fat pointer 
equality, checks both start and length.220all_same_bufs &= std::ptr::eq(221Arc::as_ptr(arr.data_buffers()),222Arc::as_ptr(first_arr.data_buffers()),223);224}225226let mut total_bytes_len = 0;227let mut views = Vec::with_capacity(total_len);228229let mut total_buffer_len = 0;230let buffers = if all_same_bufs {231total_buffer_len = first_arr.total_buffer_len();232for arr in arrays {233let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();234views.extend_from_slice(arr.views());235total_bytes_len += arr.total_bytes_len();236}237Arc::clone(first_arr.data_buffers())238239// There might be way more buffers than elements, so we only dedup if there240// is at least one element per buffer on average.241} else if total_len > total_nondedup_buffers {242assert!(arrays.len() < u32::MAX as usize);243244let mut dedup_buffers = Vec::with_capacity(total_nondedup_buffers);245let mut global_dedup_buffer_idx = PlHashMap::with_capacity(total_nondedup_buffers);246let mut local_dedup_buffer_idx = Vec::new();247local_dedup_buffer_idx.resize(max_arr_bufferset_len, (0, u32::MAX));248249for (arr_idx, arr) in arrays.iter().enumerate() {250let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();251252unsafe {253for mut view in arr.views().iter().copied() {254if view.length > View::MAX_INLINE_SIZE {255// Translate from old array-local buffer idx to global deduped buffer idx.256let (mut new_buffer_idx, cache_tag) =257*local_dedup_buffer_idx.get_unchecked(view.buffer_idx as usize);258if cache_tag != arr_idx as u32 {259// This buffer index wasn't seen before for this array, do a dedup lookup.260let buffer = arr.data_buffers().get_unchecked(view.buffer_idx as usize);261let buf_id = (buffer.as_slice().as_ptr(), buffer.len());262let idx = match global_dedup_buffer_idx.entry(buf_id) {263Entry::Occupied(o) => *o.get(),264Entry::Vacant(v) => {265let idx = dedup_buffers.len() as u32;266dedup_buffers.push(buffer.clone());267total_buffer_len += 
buffer.len();268v.insert(idx);269idx270},271};272273// Cache result for future lookups.274*local_dedup_buffer_idx.get_unchecked_mut(view.buffer_idx as usize) =275(idx, arr_idx as u32);276new_buffer_idx = idx;277}278view.buffer_idx = new_buffer_idx;279}280281total_bytes_len += view.length as usize;282views.push_unchecked(view);283}284}285}286287dedup_buffers.into_iter().collect()288} else {289// Only very few of the total number of buffers is referenced, simply290// create a new direct buffer.291for arr in arrays {292let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();293total_buffer_len += arr294.len_iter()295.map(|l| if l > 12 { l as usize } else { 0 })296.sum::<usize>();297}298299let mut unprocessed_buffer_len = total_buffer_len;300let mut new_buffers: Vec<Vec<u8>> = vec![Vec::with_capacity(301unprocessed_buffer_len.min(u32::MAX as usize),302)];303for arr in arrays {304let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();305let buffers = arr.data_buffers();306307unsafe {308for mut view in arr.views().iter().copied() {309total_bytes_len += view.length as usize;310if view.length > 12 {311if new_buffers.last().unwrap_unchecked().len() + view.length as usize312>= u32::MAX as usize313{314new_buffers.push(Vec::with_capacity(315unprocessed_buffer_len.min(u32::MAX as usize),316));317}318let new_offset = new_buffers.last().unwrap_unchecked().len() as u32;319new_buffers320.last_mut()321.unwrap_unchecked()322.extend_from_slice(view.get_slice_unchecked(buffers));323view.offset = new_offset;324view.buffer_idx = new_buffers.len() as u32 - 1;325unprocessed_buffer_len -= view.length as usize;326}327views.push_unchecked(view);328}329}330}331332new_buffers.into_iter().map(Buffer::from).collect()333};334335unsafe {336BinaryViewArrayGeneric::new_unchecked(337dtype,338views.into(),339buffers,340validity,341total_bytes_len,342total_buffer_len,343)344}345}346347fn concatenate_list<O: Offset, A: AsRef<dyn Array>>(arrays: 
&[A]) -> PolarsResult<ListArray<O>> {348let dtype = arrays[0].as_ref().dtype().clone();349let (total_len, null_count) = len_null_count(arrays);350let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);351352let mut num_sliced = 0;353let mut offsets = Offsets::<O>::with_capacity(total_len);354for arr in arrays {355let arr: &ListArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();356for len in arr.offsets().lengths() {357offsets.try_push(len)?;358}359let first_offset = arr.offsets().first().to_usize();360let offset_range = arr.offsets().range().to_usize();361num_sliced += (first_offset != 0 || offset_range != arr.values().len()) as usize;362}363364let values = if num_sliced > 0 {365let inner_sliced_arrays = arrays366.iter()367.map(|arr| {368let arr: &ListArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();369let first_offset = arr.offsets().first().to_usize();370let offset_range = arr.offsets().range().to_usize();371if first_offset != 0 || offset_range != arr.values().len() {372arr.values().sliced(first_offset, offset_range)373} else {374arr.values().to_boxed()375}376})377.collect_vec();378concatenate_unchecked(&inner_sliced_arrays[..])?379} else {380let inner_arrays = arrays381.iter()382.map(|arr| {383let arr: &ListArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();384&**arr.values()385})386.collect_vec();387concatenate_unchecked(&inner_arrays)?388};389390Ok(ListArray::new(dtype, offsets.into(), values, validity))391}392393fn concatenate_fixed_size_binary<A: AsRef<dyn Array>>(394arrays: &[A],395) -> PolarsResult<FixedSizeBinaryArray> {396let dtype = arrays[0].as_ref().dtype().clone();397let (total_len, null_count) = len_null_count(arrays);398let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);399400let total_bytes = arrays401.iter()402.map(|arr| {403let arr: &FixedSizeBinaryArray = arr.as_ref().as_any().downcast_ref().unwrap();404arr.values().len()405})406.sum();407408let mut 
bytes = Vec::with_capacity(total_bytes);409for arr in arrays {410let arr: &FixedSizeBinaryArray = arr.as_ref().as_any().downcast_ref().unwrap();411bytes.extend_from_slice(arr.values());412}413414Ok(FixedSizeBinaryArray::new(dtype, bytes.into(), validity))415}416417fn concatenate_fixed_size_list<A: AsRef<dyn Array>>(418arrays: &[A],419) -> PolarsResult<FixedSizeListArray> {420let dtype = arrays[0].as_ref().dtype().clone();421let (total_len, null_count) = len_null_count(arrays);422423let inner_arrays = arrays424.iter()425.map(|arr| {426let arr: &FixedSizeListArray = arr.as_ref().as_any().downcast_ref().unwrap();427&**arr.values()428})429.collect_vec();430let values = concatenate_unchecked(&inner_arrays)?;431let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);432Ok(FixedSizeListArray::new(dtype, total_len, values, validity))433}434435fn concatenate_struct<A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<StructArray> {436let dtype = arrays[0].as_ref().dtype().clone();437let (total_len, null_count) = len_null_count(arrays);438let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);439440let first_arr: &StructArray = arrays[0].as_ref().as_any().downcast_ref().unwrap();441let num_fields = first_arr.values().len();442443let mut inner_arrays = Vec::with_capacity(arrays.len());444let values = (0..num_fields)445.map(|f| {446inner_arrays.clear();447for arr in arrays {448let arr: &StructArray = arr.as_ref().as_any().downcast_ref().unwrap();449inner_arrays.push(&arr.values()[f]);450}451concatenate_unchecked(&inner_arrays)452})453.try_collect_vec()?;454455Ok(StructArray::new(dtype, total_len, values, validity))456}457458459