Path: blob/main/crates/polars-arrow/src/compute/concatenate.rs
8479 views
use hashbrown::hash_map::Entry;1use polars_buffer::Buffer;2use polars_error::{PolarsResult, polars_bail};3use polars_utils::aliases::{InitHashMaps, PlHashMap};4use polars_utils::itertools::Itertools;5use polars_utils::vec::PushUnchecked;67use crate::array::*;8use crate::bitmap::{Bitmap, BitmapBuilder};9use crate::datatypes::PhysicalType;10use crate::offset::Offsets;11use crate::types::{NativeType, Offset};12use crate::with_match_primitive_type_full;1314/// Concatenate multiple [`Array`] of the same type into a single [`Array`].15pub fn concatenate(arrays: &[&dyn Array]) -> PolarsResult<Box<dyn Array>> {16if arrays.is_empty() {17polars_bail!(InvalidOperation: "concat requires input of at least one array")18}1920if arrays21.iter()22.any(|array| array.dtype() != arrays[0].dtype())23{24polars_bail!(InvalidOperation: "It is not possible to concatenate arrays of different data types.")25}2627concatenate_unchecked(arrays)28}2930fn len_null_count<A: AsRef<dyn Array>>(arrays: &[A]) -> (usize, usize) {31let mut len = 0;32let mut null_count = 0;33for arr in arrays {34let arr = arr.as_ref();35len += arr.len();36null_count += arr.null_count();37}38(len, null_count)39}4041/// Concatenate the validities of multiple [Array]s into a single Bitmap.42pub fn concatenate_validities<A: AsRef<dyn Array>>(arrays: &[A]) -> Option<Bitmap> {43let (len, null_count) = len_null_count(arrays);44concatenate_validities_with_len_null_count(arrays, len, null_count)45}4647fn concatenate_validities_with_len_null_count<A: AsRef<dyn Array>>(48arrays: &[A],49len: usize,50null_count: usize,51) -> Option<Bitmap> {52if null_count == 0 {53return None;54}5556let mut bitmap = BitmapBuilder::with_capacity(len);57for arr in arrays {58let arr = arr.as_ref();59if arr.null_count() == arr.len() {60bitmap.extend_constant(arr.len(), false);61} else if arr.null_count() == 0 {62bitmap.extend_constant(arr.len(), true);63} else {64bitmap.extend_from_bitmap(arr.validity().unwrap());65}66}67bitmap.into_opt_validity()68}6970/// Concatenate multiple [`Array`] of the same type into a single [`Array`].71/// All arrays must be of the same dtype or a panic can occur.72pub fn concatenate_unchecked<A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<Box<dyn Array>> {73if arrays.is_empty() {74polars_bail!(InvalidOperation: "concat requires input of at least one array")75}7677if arrays.len() == 1 {78return Ok(arrays[0].as_ref().to_boxed());79}8081use PhysicalType::*;82match arrays[0].as_ref().dtype().to_physical_type() {83Null => Ok(Box::new(concatenate_null(arrays))),84Boolean => Ok(Box::new(concatenate_bool(arrays))),85Primitive(ptype) => {86with_match_primitive_type_full!(ptype, |$T| {87Ok(Box::new(concatenate_primitive::<$T, _>(arrays)))88})89},90Binary => Ok(Box::new(concatenate_binary::<i32, _>(arrays)?)),91LargeBinary => Ok(Box::new(concatenate_binary::<i64, _>(arrays)?)),92Utf8 => Ok(Box::new(concatenate_utf8::<i32, _>(arrays)?)),93LargeUtf8 => Ok(Box::new(concatenate_utf8::<i64, _>(arrays)?)),94BinaryView => Ok(Box::new(concatenate_view::<[u8], _>(arrays))),95Utf8View => Ok(Box::new(concatenate_view::<str, _>(arrays))),96List => Ok(Box::new(concatenate_list::<i32, _>(arrays)?)),97LargeList => Ok(Box::new(concatenate_list::<i64, _>(arrays)?)),98FixedSizeBinary => Ok(Box::new(concatenate_fixed_size_binary(arrays)?)),99FixedSizeList => Ok(Box::new(concatenate_fixed_size_list(arrays)?)),100Struct => Ok(Box::new(concatenate_struct(arrays)?)),101Union => unimplemented!(),102Map => unimplemented!(),103Dictionary(_) => unimplemented!(),104}105}106107fn concatenate_null<A: AsRef<dyn Array>>(arrays: &[A]) -> NullArray {108let dtype = arrays[0].as_ref().dtype().clone();109let total_len = arrays.iter().map(|arr| arr.as_ref().len()).sum();110NullArray::new(dtype, total_len)111}112113fn concatenate_bool<A: AsRef<dyn Array>>(arrays: &[A]) -> BooleanArray {114let dtype = arrays[0].as_ref().dtype().clone();115let (total_len, null_count) = len_null_count(arrays);116let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);117118let mut bitmap = BitmapBuilder::with_capacity(total_len);119for arr in arrays {120let arr: &BooleanArray = arr.as_ref().as_any().downcast_ref().unwrap();121bitmap.extend_from_bitmap(arr.values());122}123BooleanArray::new(dtype, bitmap.freeze(), validity)124}125126fn concatenate_primitive<T: NativeType, A: AsRef<dyn Array>>(arrays: &[A]) -> PrimitiveArray<T> {127let dtype = arrays[0].as_ref().dtype().clone();128let (total_len, null_count) = len_null_count(arrays);129let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);130131let mut out = Vec::with_capacity(total_len);132for arr in arrays {133let arr: &PrimitiveArray<T> = arr.as_ref().as_any().downcast_ref().unwrap();134out.extend_from_slice(arr.values());135}136unsafe { PrimitiveArray::new_unchecked(dtype, Buffer::from(out), validity) }137}138139fn concatenate_binary<O: Offset, A: AsRef<dyn Array>>(140arrays: &[A],141) -> PolarsResult<BinaryArray<O>> {142let dtype = arrays[0].as_ref().dtype().clone();143let (total_len, null_count) = len_null_count(arrays);144let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);145146let total_bytes = arrays147.iter()148.map(|arr| {149let arr: &BinaryArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();150arr.get_values_size()151})152.sum();153154let mut values = Vec::with_capacity(total_bytes);155let mut offsets = Offsets::<O>::with_capacity(total_len);156157for arr in arrays {158let arr: &BinaryArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();159let first_offset = arr.offsets().first().to_usize();160let last_offset = arr.offsets().last().to_usize();161values.extend_from_slice(&arr.values()[first_offset..last_offset]);162for len in arr.offsets().lengths() {163offsets.try_push(len)?;164}165}166167Ok(unsafe { BinaryArray::new(dtype, offsets.into(), values.into(), validity) })168}169170fn concatenate_utf8<O: Offset, A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<Utf8Array<O>> {171let dtype = arrays[0].as_ref().dtype().clone();172let (total_len, null_count) = len_null_count(arrays);173let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);174175let total_bytes = arrays176.iter()177.map(|arr| {178let arr: &Utf8Array<O> = arr.as_ref().as_any().downcast_ref().unwrap();179arr.get_values_size()180})181.sum();182183let mut bytes = Vec::with_capacity(total_bytes);184let mut offsets = Offsets::<O>::with_capacity(total_len);185186for arr in arrays {187let arr: &Utf8Array<O> = arr.as_ref().as_any().downcast_ref().unwrap();188let first_offset = arr.offsets().first().to_usize();189let last_offset = arr.offsets().last().to_usize();190bytes.extend_from_slice(&arr.values()[first_offset..last_offset]);191for len in arr.offsets().lengths() {192offsets.try_push(len)?;193}194}195196Ok(unsafe { Utf8Array::new_unchecked(dtype, offsets.into(), bytes.into(), validity) })197}198199fn concatenate_view<V: ViewType + ?Sized, A: AsRef<dyn Array>>(200arrays: &[A],201) -> BinaryViewArrayGeneric<V> {202let dtype = arrays[0].as_ref().dtype().clone();203let (total_len, null_count) = len_null_count(arrays);204if total_len == 0 {205return BinaryViewArrayGeneric::new_empty(dtype);206}207let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);208209let first_arr: &BinaryViewArrayGeneric<V> = arrays[0].as_ref().as_any().downcast_ref().unwrap();210let mut total_nondedup_buffers = first_arr.data_buffers().len();211let mut max_arr_bufferset_len = 0;212let mut all_same_bufs = true;213for arr in arrays {214let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();215max_arr_bufferset_len = max_arr_bufferset_len.max(arr.data_buffers().len());216total_nondedup_buffers += arr.data_buffers().len();217// Fat pointer equality, checks both start and length.218all_same_bufs &= Buffer::is_same_buffer(arr.data_buffers(), first_arr.data_buffers());219}220221let mut total_bytes_len = None;222let mut views = Vec::with_capacity(total_len);223224let mut total_buffer_len = 0;225let buffers = if all_same_bufs {226total_buffer_len = first_arr.total_buffer_len();227let mut bytes_len = 0;228for arr in arrays {229let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();230views.extend_from_slice(arr.views());231bytes_len += arr.total_bytes_len();232}233total_bytes_len = Some(bytes_len);234Buffer::clone(first_arr.data_buffers())235236// There might be way more buffers than elements, so we only dedup if there237// is at least one element per buffer on average.238} else if total_len > total_nondedup_buffers {239assert!(arrays.len() < u32::MAX as usize);240241let mut dedup_buffers = Vec::with_capacity(total_nondedup_buffers);242let mut global_dedup_buffer_idx = PlHashMap::with_capacity(total_nondedup_buffers);243let mut local_dedup_buffer_idx = Vec::new();244local_dedup_buffer_idx.resize(max_arr_bufferset_len, (0, u32::MAX));245246for (arr_idx, arr) in arrays.iter().enumerate() {247let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();248249unsafe {250for mut view in arr.views().iter().copied() {251if view.length > View::MAX_INLINE_SIZE {252// Translate from old array-local buffer idx to global deduped buffer idx.253let (mut new_buffer_idx, cache_tag) =254*local_dedup_buffer_idx.get_unchecked(view.buffer_idx as usize);255if cache_tag != arr_idx as u32 {256// This buffer index wasn't seen before for this array, do a dedup lookup.257let buffer = arr.data_buffers().get_unchecked(view.buffer_idx as usize);258let buf_id = (buffer.as_slice().as_ptr(), buffer.len());259let idx = match global_dedup_buffer_idx.entry(buf_id) {260Entry::Occupied(o) => *o.get(),261Entry::Vacant(v) => {262let idx = dedup_buffers.len() as u32;263dedup_buffers.push(buffer.clone());264total_buffer_len += buffer.len();265v.insert(idx);266idx267},268};269270// Cache result for future lookups.271*local_dedup_buffer_idx.get_unchecked_mut(view.buffer_idx as usize) =272(idx, arr_idx as u32);273new_buffer_idx = idx;274}275view.buffer_idx = new_buffer_idx;276}277278views.push_unchecked(view);279}280}281}282283dedup_buffers.into_iter().collect()284} else {285// Only very few of the total number of buffers is referenced, simply286// create a new direct buffer.287for arr in arrays {288let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();289total_buffer_len += arr290.len_iter()291.map(|l| if l > 12 { l as usize } else { 0 })292.sum::<usize>();293}294295let mut unprocessed_buffer_len = total_buffer_len;296let mut new_buffers: Vec<Vec<u8>> = vec![Vec::with_capacity(297unprocessed_buffer_len.min(u32::MAX as usize),298)];299for arr in arrays {300let arr: &BinaryViewArrayGeneric<V> = arr.as_ref().as_any().downcast_ref().unwrap();301let buffers = arr.data_buffers();302303unsafe {304for mut view in arr.views().iter().copied() {305if view.length > 12 {306if new_buffers.last().unwrap_unchecked().len() + view.length as usize307>= u32::MAX as usize308{309new_buffers.push(Vec::with_capacity(310unprocessed_buffer_len.min(u32::MAX as usize),311));312}313let new_offset = new_buffers.last().unwrap_unchecked().len() as u32;314new_buffers315.last_mut()316.unwrap_unchecked()317.extend_from_slice(view.get_slice_unchecked(buffers));318view.offset = new_offset;319view.buffer_idx = new_buffers.len() as u32 - 1;320unprocessed_buffer_len -= view.length as usize;321}322views.push_unchecked(view);323}324}325}326327new_buffers.into_iter().map(Buffer::from).collect()328};329330unsafe {331BinaryViewArrayGeneric::new_unchecked(332dtype,333views.into(),334buffers,335validity,336total_bytes_len,337total_buffer_len,338)339}340}341342fn concatenate_list<O: Offset, A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<ListArray<O>> {343let dtype = arrays[0].as_ref().dtype().clone();344let (total_len, null_count) = len_null_count(arrays);345let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);346347let mut num_sliced = 0;348let mut offsets = Offsets::<O>::with_capacity(total_len);349for arr in arrays {350let arr: &ListArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();351for len in arr.offsets().lengths() {352offsets.try_push(len)?;353}354let first_offset = arr.offsets().first().to_usize();355let offset_range = arr.offsets().range().to_usize();356num_sliced += (first_offset != 0 || offset_range != arr.values().len()) as usize;357}358359let values = if num_sliced > 0 {360let inner_sliced_arrays = arrays361.iter()362.map(|arr| {363let arr: &ListArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();364let first_offset = arr.offsets().first().to_usize();365let offset_range = arr.offsets().range().to_usize();366if first_offset != 0 || offset_range != arr.values().len() {367arr.values().sliced(first_offset, offset_range)368} else {369arr.values().to_boxed()370}371})372.collect_vec();373concatenate_unchecked(&inner_sliced_arrays[..])?374} else {375let inner_arrays = arrays376.iter()377.map(|arr| {378let arr: &ListArray<O> = arr.as_ref().as_any().downcast_ref().unwrap();379&**arr.values()380})381.collect_vec();382concatenate_unchecked(&inner_arrays)?383};384385Ok(ListArray::new(dtype, offsets.into(), values, validity))386}387388fn concatenate_fixed_size_binary<A: AsRef<dyn Array>>(389arrays: &[A],390) -> PolarsResult<FixedSizeBinaryArray> {391let dtype = arrays[0].as_ref().dtype().clone();392let (total_len, null_count) = len_null_count(arrays);393let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);394395let total_bytes = arrays396.iter()397.map(|arr| {398let arr: &FixedSizeBinaryArray = arr.as_ref().as_any().downcast_ref().unwrap();399arr.values().len()400})401.sum();402403let mut bytes = Vec::with_capacity(total_bytes);404for arr in arrays {405let arr: &FixedSizeBinaryArray = arr.as_ref().as_any().downcast_ref().unwrap();406bytes.extend_from_slice(arr.values());407}408409Ok(FixedSizeBinaryArray::new(dtype, bytes.into(), validity))410}411412fn concatenate_fixed_size_list<A: AsRef<dyn Array>>(413arrays: &[A],414) -> PolarsResult<FixedSizeListArray> {415let dtype = arrays[0].as_ref().dtype().clone();416let (total_len, null_count) = len_null_count(arrays);417418let inner_arrays = arrays419.iter()420.map(|arr| {421let arr: &FixedSizeListArray = arr.as_ref().as_any().downcast_ref().unwrap();422&**arr.values()423})424.collect_vec();425let values = concatenate_unchecked(&inner_arrays)?;426let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);427Ok(FixedSizeListArray::new(dtype, total_len, values, validity))428}429430fn concatenate_struct<A: AsRef<dyn Array>>(arrays: &[A]) -> PolarsResult<StructArray> {431let dtype = arrays[0].as_ref().dtype().clone();432let (total_len, null_count) = len_null_count(arrays);433let validity = concatenate_validities_with_len_null_count(arrays, total_len, null_count);434435let first_arr: &StructArray = arrays[0].as_ref().as_any().downcast_ref().unwrap();436let num_fields = first_arr.values().len();437438let mut inner_arrays = Vec::with_capacity(arrays.len());439let values = (0..num_fields)440.map(|f| {441inner_arrays.clear();442for arr in arrays {443let arr: &StructArray = arr.as_ref().as_any().downcast_ref().unwrap();444inner_arrays.push(&arr.values()[f]);445}446concatenate_unchecked(&inner_arrays)447})448.try_collect_vec()?;449450Ok(StructArray::new(dtype, total_len, values, validity))451}452453454