Path: blob/main/crates/polars-arrow/src/array/binview/builder.rs
use std::marker::PhantomData;
use std::sync::{Arc, LazyLock};

use hashbrown::hash_map::Entry;
use polars_utils::IdxSize;
use polars_utils::aliases::{InitHashMaps, PlHashMap};

use crate::array::binview::{DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE};
use crate::array::builder::{ShareStrategy, StaticArrayBuilder};
use crate::array::{Array, BinaryViewArrayGeneric, View, ViewType};
use crate::bitmap::OptBitmapBuilder;
use crate::buffer::Buffer;
use crate::datatypes::ArrowDataType;
use crate::pushable::Pushable;

static PLACEHOLDER_BUFFER: LazyLock<Buffer<u8>> = LazyLock::new(|| Buffer::from_static(&[]));

pub struct BinaryViewArrayGenericBuilder<V: ViewType + ?Sized> {
    dtype: ArrowDataType,
    views: Vec<View>,
    active_buffer: Vec<u8>,
    active_buffer_idx: u32,
    buffer_set: Vec<Buffer<u8>>,
    stolen_buffers: PlHashMap<usize, u32>,

    // With these we can amortize buffer set translation costs if repeatedly
    // stealing from the same set of buffers.
    last_buffer_set_stolen_from: Option<Arc<[Buffer<u8>]>>,
    buffer_set_translation_idxs: Vec<(u32, u32)>, // (idx, generation)
    buffer_set_translation_generation: u32,

    validity: OptBitmapBuilder,
    /// Total bytes length if we would concatenate them all.
    total_bytes_len: usize,
    /// Total bytes in the buffer set (excluding remaining capacity).
    total_buffer_len: usize,
    view_type: PhantomData<V>,
}

impl<V: ViewType + ?Sized> BinaryViewArrayGenericBuilder<V> {
    pub fn new(dtype: ArrowDataType) -> Self {
        Self {
            dtype,
            views: Vec::new(),
            active_buffer: Vec::new(),
            active_buffer_idx: 0,
            buffer_set: Vec::new(),
            stolen_buffers: PlHashMap::new(),
            last_buffer_set_stolen_from: None,
            buffer_set_translation_idxs: Vec::new(),
            buffer_set_translation_generation: 0,
            validity: OptBitmapBuilder::default(),
            total_bytes_len: 0,
            total_buffer_len: 0,
            view_type: PhantomData,
        }
    }

    #[inline]
    fn reserve_active_buffer(&mut self, additional: usize) {
        let len = self.active_buffer.len();
        let cap = self.active_buffer.capacity();
        if additional > cap - len || len + additional >= (u32::MAX - 1) as usize {
            self.reserve_active_buffer_slow(additional);
        }
    }

    #[cold]
    fn reserve_active_buffer_slow(&mut self, additional: usize) {
        assert!(
            additional <= (u32::MAX - 1) as usize,
            "strings longer than 2^32 - 2 are not supported"
        );

        // Allocate a new buffer and flush the old buffer.
        let new_capacity = (self.active_buffer.capacity() * 2)
            .clamp(DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE)
            .max(additional);

        let old_buffer =
            core::mem::replace(&mut self.active_buffer, Vec::with_capacity(new_capacity));
        if !old_buffer.is_empty() {
            // Replace dummy with real buffer.
            self.buffer_set[self.active_buffer_idx as usize] = Buffer::from(old_buffer);
        }
        self.active_buffer_idx = self.buffer_set.len().try_into().unwrap();
        self.buffer_set.push(PLACEHOLDER_BUFFER.clone()) // Push placeholder so active_buffer_idx stays valid.
    }

    pub fn push_value_ignore_validity(&mut self, bytes: &V) {
        let bytes = bytes.to_bytes();
        self.total_bytes_len += bytes.len();
        unsafe {
            let view = if bytes.len() > View::MAX_INLINE_SIZE as usize {
                self.reserve_active_buffer(bytes.len());

                let offset = self.active_buffer.len() as u32; // Ensured no overflow by reserve_active_buffer.
                self.active_buffer.extend_from_slice(bytes);
                self.total_buffer_len += bytes.len();
                View::new_noninline_unchecked(bytes, self.active_buffer_idx, offset)
            } else {
                View::new_inline_unchecked(bytes)
            };
            self.views.push(view);
        }
    }

    /// # Safety
    /// The view must be inline.
    pub unsafe fn push_inline_view_ignore_validity(&mut self, view: View) {
        debug_assert!(view.is_inline());
        self.total_bytes_len += view.length as usize;
        self.views.push(view);
    }

    fn switch_active_stealing_bufferset_to(&mut self, buffer_set: &Arc<[Buffer<u8>]>) {
        // Fat pointer equality, checks both start and length.
        if self
            .last_buffer_set_stolen_from
            .as_ref()
            .is_some_and(|stolen_bs| std::ptr::eq(Arc::as_ptr(stolen_bs), Arc::as_ptr(buffer_set)))
        {
            return; // Already active.
        }

        // Remember which buffer set the cached translations refer to.
        self.last_buffer_set_stolen_from = Some(buffer_set.clone());

        // Switch to new generation (invalidating all old translation indices),
        // and resizing the buffer with invalid indices if necessary.
        let old_gen = self.buffer_set_translation_generation;
        self.buffer_set_translation_generation = old_gen.wrapping_add(1);
        if self.buffer_set_translation_idxs.len() < buffer_set.len() {
            self.buffer_set_translation_idxs
                .resize(buffer_set.len(), (0, old_gen));
        }
    }

    unsafe fn translate_view(
        &mut self,
        mut view: View,
        other_bufferset: &Arc<[Buffer<u8>]>,
    ) -> View {
        // Translate from old array-local buffer idx to global stolen buffer idx.
        let (mut new_buffer_idx, gen_) = *self
            .buffer_set_translation_idxs
            .get_unchecked(view.buffer_idx as usize);
        if gen_ != self.buffer_set_translation_generation {
            // This buffer index wasn't seen before for this array, do a dedup lookup.
            // Since we map by starting pointer and different subslices may have different lengths, we expand
            // the buffer to the maximum it could be.
            let buffer = other_bufferset
                .get_unchecked(view.buffer_idx as usize)
                .clone()
                .expand_end_to_storage();
            let buf_id = buffer.as_slice().as_ptr().addr();
            let idx = match self.stolen_buffers.entry(buf_id) {
                Entry::Occupied(o) => *o.get(),
                Entry::Vacant(v) => {
                    let idx = self.buffer_set.len() as u32;
                    self.total_buffer_len += buffer.len();
                    self.buffer_set.push(buffer);
                    v.insert(idx);
                    idx
                },
            };

            // Cache result for future lookups.
            *self
                .buffer_set_translation_idxs
                .get_unchecked_mut(view.buffer_idx as usize) =
                (idx, self.buffer_set_translation_generation);
            new_buffer_idx = idx;
        }
        view.buffer_idx = new_buffer_idx;
        view
    }

    unsafe fn extend_views_dedup_ignore_validity(
        &mut self,
        views: impl IntoIterator<Item = View>,
        other_bufferset: &Arc<[Buffer<u8>]>,
    ) {
        // TODO: if there are way more buffers than length translate per-view
        // rather than all at once.
        self.switch_active_stealing_bufferset_to(other_bufferset);

        for mut view in views {
            if view.length > View::MAX_INLINE_SIZE {
                view = self.translate_view(view, other_bufferset);
            }
            self.total_bytes_len += view.length as usize;
            self.views.push(view);
        }
    }

    unsafe fn extend_views_each_repeated_dedup_ignore_validity(
        &mut self,
        views: impl IntoIterator<Item = View>,
        repeats: usize,
        other_bufferset: &Arc<[Buffer<u8>]>,
    ) {
        // TODO: if there are way more buffers than length translate per-view
        // rather than all at once.
        self.switch_active_stealing_bufferset_to(other_bufferset);

        for mut view in views {
            if view.length > View::MAX_INLINE_SIZE {
                view = self.translate_view(view, other_bufferset);
            }
            self.total_bytes_len += repeats * view.length as usize;
            for _ in 0..repeats {
                self.views.push(view);
            }
        }
    }
}

impl<V: ViewType + ?Sized> StaticArrayBuilder for BinaryViewArrayGenericBuilder<V> {
    type Array = BinaryViewArrayGeneric<V>;

    fn dtype(&self) -> &ArrowDataType {
        &self.dtype
    }

    fn reserve(&mut self, additional: usize) {
        self.views.reserve(additional);
        self.validity.reserve(additional);
    }

    fn freeze(mut self) -> Self::Array {
        // Flush active buffer and/or remove extra placeholder buffer.
        if !self.active_buffer.is_empty() {
            self.buffer_set[self.active_buffer_idx as usize] = Buffer::from(self.active_buffer);
        } else if self.buffer_set.last().is_some_and(|b| b.is_empty()) {
            self.buffer_set.pop();
        }

        unsafe {
            BinaryViewArrayGeneric::new_unchecked(
                self.dtype,
                Buffer::from(self.views),
                Arc::from(self.buffer_set),
                self.validity.into_opt_validity(),
                self.total_bytes_len,
                self.total_buffer_len,
            )
        }
    }

    fn freeze_reset(&mut self) -> Self::Array {
        // Flush active buffer and/or remove extra placeholder buffer.
        if !self.active_buffer.is_empty() {
            self.buffer_set[self.active_buffer_idx as usize] =
                Buffer::from(core::mem::take(&mut self.active_buffer));
        } else if self.buffer_set.last().is_some_and(|b| b.is_empty()) {
            self.buffer_set.pop();
        }

        let out = unsafe {
            BinaryViewArrayGeneric::new_unchecked(
                self.dtype.clone(),
                Buffer::from(core::mem::take(&mut self.views)),
                Arc::from(core::mem::take(&mut self.buffer_set)),
                core::mem::take(&mut self.validity).into_opt_validity(),
                self.total_bytes_len,
                self.total_buffer_len,
            )
        };

        self.total_buffer_len = 0;
        self.total_bytes_len = 0;
        self.active_buffer_idx = 0;
        self.stolen_buffers.clear();
        self.last_buffer_set_stolen_from = None;
        out
    }

    fn len(&self) -> usize {
        self.views.len()
    }

    fn extend_nulls(&mut self, length: usize) {
        self.views.extend_constant(length, View::default());
        self.validity.extend_constant(length, false);
    }

    fn subslice_extend(
        &mut self,
        other: &Self::Array,
        start: usize,
        length: usize,
        share: ShareStrategy,
    ) {
        self.views.reserve(length);

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for i in start..start + length {
                            if v.get_bit_unchecked(i) {
                                self.push_value_ignore_validity(other.value_unchecked(i));
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for i in start..start + length {
                            self.push_value_ignore_validity(other.value_unchecked(i));
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_views = &other.views()[start..start + length];
                    self.extend_views_dedup_ignore_validity(
                        other_views.iter().copied(),
                        other.data_buffers(),
                    );
                },
            }
        }

        self.validity
            .subslice_extend_from_opt_validity(other.validity(), start, length);
    }

    fn subslice_extend_each_repeated(
        &mut self,
        other: &Self::Array,
        start: usize,
        length: usize,
        repeats: usize,
        share: ShareStrategy,
    ) {
        self.views.reserve(length * repeats);

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for i in start..start + length {
                            if v.get_bit_unchecked(i) {
                                for _ in 0..repeats {
                                    self.push_value_ignore_validity(other.value_unchecked(i));
                                }
                            } else {
                                for _ in 0..repeats {
                                    self.views.push(View::default())
                                }
                            }
                        }
                    } else {
                        for i in start..start + length {
                            for _ in 0..repeats {
                                self.push_value_ignore_validity(other.value_unchecked(i));
                            }
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_views = &other.views()[start..start + length];
                    self.extend_views_each_repeated_dedup_ignore_validity(
                        other_views.iter().copied(),
                        repeats,
                        other.data_buffers(),
                    );
                },
            }
        }

        self.validity
            .subslice_extend_each_repeated_from_opt_validity(
                other.validity(),
                start,
                length,
                repeats,
            );
    }

    unsafe fn gather_extend(
        &mut self,
        other: &Self::Array,
        idxs: &[IdxSize],
        share: ShareStrategy,
    ) {
        self.views.reserve(idxs.len());

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for idx in idxs {
                            if v.get_bit_unchecked(*idx as usize) {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for idx in idxs {
                            self.push_value_ignore_validity(other.value_unchecked(*idx as usize));
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_view_slice = other.views().as_slice();
                    let other_views = idxs
                        .iter()
                        .map(|idx| *other_view_slice.get_unchecked(*idx as usize));
                    self.extend_views_dedup_ignore_validity(other_views, other.data_buffers());
                },
            }
        }

        self.validity
            .gather_extend_from_opt_validity(other.validity(), idxs);
    }

    fn opt_gather_extend(&mut self, other: &Self::Array, idxs: &[IdxSize], share: ShareStrategy) {
        self.views.reserve(idxs.len());

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for idx in idxs {
                            if (*idx as usize) < v.len() && v.get_bit_unchecked(*idx as usize) {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for idx in idxs {
                            if (*idx as usize) < other.len() {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_view_slice = other.views().as_slice();
                    let other_views = idxs.iter().map(|idx| {
                        other_view_slice
                            .get(*idx as usize)
                            .copied()
                            .unwrap_or_default()
                    });
                    self.extend_views_dedup_ignore_validity(other_views, other.data_buffers());
                },
            }
        }

        self.validity
            .opt_gather_extend_from_opt_validity(other.validity(), idxs, other.len());
    }
}
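// A minimal usage sketch (not part of the original file), assuming `ViewType` is
// implemented for `str`, `ArrowDataType::Utf8View` is the matching dtype, and the
// `StaticArrayBuilder` trait is in scope for `reserve`, `subslice_extend` and `freeze`:
//
//     let mut builder = BinaryViewArrayGenericBuilder::<str>::new(ArrowDataType::Utf8View);
//     builder.reserve(2);
//     builder.push_value_ignore_validity("short"); // Short enough to be stored inline in the view.
//     builder.push_value_ignore_validity("a value exceeding the inline view size"); // Longer than View::MAX_INLINE_SIZE: bytes go into the active buffer.
//
//     // Zero-copy append from an existing array `other`: with ShareStrategy::Always
//     // the views are translated to point at `other`'s data buffers, which are
//     // shared (stolen) rather than copied.
//     // builder.subslice_extend(&other, 0, other.len(), ShareStrategy::Always);
//
//     let arr: BinaryViewArrayGeneric<str> = builder.freeze();
//     assert_eq!(arr.len(), 2);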