// crates/polars-arrow/src/array/binview/builder.rs
use std::marker::PhantomData;1use std::sync::LazyLock;23use hashbrown::hash_map::Entry;4use polars_buffer::Buffer;5use polars_utils::IdxSize;6use polars_utils::aliases::{InitHashMaps, PlHashMap};78use crate::array::binview::{DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE};9use crate::array::builder::{ShareStrategy, StaticArrayBuilder};10use crate::array::{Array, BinaryViewArrayGeneric, View, ViewType};11use crate::bitmap::OptBitmapBuilder;12use crate::datatypes::ArrowDataType;13use crate::pushable::Pushable;1415static PLACEHOLDER_BUFFER: LazyLock<Buffer<u8>> = LazyLock::new(|| Buffer::from_static(&[]));1617pub struct BinaryViewArrayGenericBuilder<V: ViewType + ?Sized> {18dtype: ArrowDataType,19views: Vec<View>,20active_buffer: Vec<u8>,21active_buffer_idx: u32,22buffer_set: Vec<Buffer<u8>>,23stolen_buffers: PlHashMap<usize, u32>,2425// With these we can amortize buffer set translation costs if repeatedly26// stealing from the same set of buffers.27last_buffer_set_stolen_from: Option<Buffer<Buffer<u8>>>,28buffer_set_translation_idxs: Vec<(u32, u32)>, // (idx, generation)29buffer_set_translation_generation: u32,3031validity: OptBitmapBuilder,32/// Total bytes length if we would concatenate them all.33total_bytes_len: usize,34/// Total bytes in the buffer set (excluding remaining capacity).35total_buffer_len: usize,36view_type: PhantomData<V>,37}3839impl<V: ViewType + ?Sized> BinaryViewArrayGenericBuilder<V> {40pub const MAX_ROW_BYTE_LEN: usize = (u32::MAX - 1) as _;4142pub fn new(dtype: ArrowDataType) -> Self {43Self {44dtype,45views: Vec::new(),46active_buffer: Vec::new(),47active_buffer_idx: 0,48buffer_set: Vec::new(),49stolen_buffers: PlHashMap::new(),50last_buffer_set_stolen_from: None,51buffer_set_translation_idxs: Vec::new(),52buffer_set_translation_generation: 0,53validity: OptBitmapBuilder::default(),54total_bytes_len: 0,55total_buffer_len: 0,56view_type: PhantomData,57}58}5960#[inline]61fn reserve_active_buffer(&mut self, additional: usize) {62let len = 
self.active_buffer.len();63let cap = self.active_buffer.capacity();64if additional > cap - len || len + additional >= Self::MAX_ROW_BYTE_LEN {65self.reserve_active_buffer_slow(additional);66}67}6869#[cold]70fn reserve_active_buffer_slow(&mut self, additional: usize) {71assert!(72additional <= Self::MAX_ROW_BYTE_LEN,73"strings longer than 2^32 - 2 are not supported"74);7576// Allocate a new buffer and flush the old buffer.77let new_capacity = (self.active_buffer.capacity() * 2)78.clamp(DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE)79.max(additional);8081let old_buffer =82core::mem::replace(&mut self.active_buffer, Vec::with_capacity(new_capacity));83if !old_buffer.is_empty() {84// Replace dummy with real buffer.85self.buffer_set[self.active_buffer_idx as usize] = Buffer::from(old_buffer);86}87self.active_buffer_idx = self.buffer_set.len().try_into().unwrap();88self.buffer_set.push(PLACEHOLDER_BUFFER.clone()) // Push placeholder so active_buffer_idx stays valid.89}9091pub fn push_value_ignore_validity(&mut self, bytes: &V) {92let bytes = bytes.to_bytes();93self.total_bytes_len += bytes.len();94unsafe {95let view = if bytes.len() > View::MAX_INLINE_SIZE as usize {96self.reserve_active_buffer(bytes.len());9798let offset = self.active_buffer.len() as u32; // Ensured no overflow by reserve_active_buffer.99self.active_buffer.extend_from_slice(bytes);100self.total_buffer_len += bytes.len();101View::new_noninline_unchecked(bytes, self.active_buffer_idx, offset)102} else {103View::new_inline_unchecked(bytes)104};105self.views.push(view);106}107}108109/// # Safety110/// The view must be inline.111pub unsafe fn push_inline_view_ignore_validity(&mut self, view: View) {112debug_assert!(view.is_inline());113self.total_bytes_len += view.length as usize;114self.views.push(view);115}116117fn switch_active_stealing_bufferset_to(&mut self, buffer_set: &Buffer<Buffer<u8>>) {118if self119.last_buffer_set_stolen_from120.as_ref()121.is_some_and(|stolen_bs| {122stolen_bs.as_ptr() == 
buffer_set.as_ptr() && stolen_bs.len() >= buffer_set.len()123})124{125return; // Already active.126}127128// Switch to new generation (invalidating all old translation indices),129// and resizing the buffer with invalid indices if necessary.130let old_gen = self.buffer_set_translation_generation;131self.buffer_set_translation_generation = old_gen.wrapping_add(1);132if self.buffer_set_translation_idxs.len() < buffer_set.len() {133self.buffer_set_translation_idxs134.resize(buffer_set.len(), (0, old_gen));135}136}137138unsafe fn translate_view(139&mut self,140mut view: View,141other_bufferset: &Buffer<Buffer<u8>>,142) -> View {143// Translate from old array-local buffer idx to global stolen buffer idx.144let (mut new_buffer_idx, gen_) = *self145.buffer_set_translation_idxs146.get_unchecked(view.buffer_idx as usize);147if gen_ != self.buffer_set_translation_generation {148// This buffer index wasn't seen before for this array, do a dedup lookup.149// Since we map by starting pointer and different subslices may have different lengths, we expand150// the buffer to the maximum it could be.151let buffer = other_bufferset152.get_unchecked(view.buffer_idx as usize)153.clone()154.expand_end_to_storage();155let buf_id = buffer.as_slice().as_ptr().addr();156let idx = match self.stolen_buffers.entry(buf_id) {157Entry::Occupied(o) => *o.get(),158Entry::Vacant(v) => {159let idx = self.buffer_set.len() as u32;160self.total_buffer_len += buffer.len();161self.buffer_set.push(buffer);162v.insert(idx);163idx164},165};166167// Cache result for future lookups.168*self169.buffer_set_translation_idxs170.get_unchecked_mut(view.buffer_idx as usize) =171(idx, self.buffer_set_translation_generation);172new_buffer_idx = idx;173}174view.buffer_idx = new_buffer_idx;175view176}177178unsafe fn extend_views_dedup_ignore_validity(179&mut self,180views: impl IntoIterator<Item = View>,181other_bufferset: &Buffer<Buffer<u8>>,182) {183// TODO: if there are way more buffers than length translate 
per-view184// rather than all at once.185self.switch_active_stealing_bufferset_to(other_bufferset);186187for mut view in views {188if view.length > View::MAX_INLINE_SIZE {189view = self.translate_view(view, other_bufferset);190}191self.total_bytes_len += view.length as usize;192self.views.push(view);193}194}195196unsafe fn extend_views_each_repeated_dedup_ignore_validity(197&mut self,198views: impl IntoIterator<Item = View>,199repeats: usize,200other_bufferset: &Buffer<Buffer<u8>>,201) {202// TODO: if there are way more buffers than length translate per-view203// rather than all at once.204self.switch_active_stealing_bufferset_to(other_bufferset);205206for mut view in views {207if view.length > View::MAX_INLINE_SIZE {208view = self.translate_view(view, other_bufferset);209}210self.total_bytes_len += repeats * view.length as usize;211for _ in 0..repeats {212self.views.push(view);213}214}215}216}217218impl<V: ViewType + ?Sized> StaticArrayBuilder for BinaryViewArrayGenericBuilder<V> {219type Array = BinaryViewArrayGeneric<V>;220221fn dtype(&self) -> &ArrowDataType {222&self.dtype223}224225fn reserve(&mut self, additional: usize) {226self.views.reserve(additional);227self.validity.reserve(additional);228}229230fn freeze(mut self) -> Self::Array {231// Flush active buffer and/or remove extra placeholder buffer.232if !self.active_buffer.is_empty() {233self.buffer_set[self.active_buffer_idx as usize] = Buffer::from(self.active_buffer);234} else if self.buffer_set.last().is_some_and(|b| b.is_empty()) {235self.buffer_set.pop();236}237238unsafe {239BinaryViewArrayGeneric::new_unchecked(240self.dtype,241Buffer::from(self.views),242Buffer::from(self.buffer_set),243self.validity.into_opt_validity(),244Some(self.total_bytes_len),245self.total_buffer_len,246)247}248}249250fn freeze_reset(&mut self) -> Self::Array {251// Flush active buffer and/or remove extra placeholder buffer.252if !self.active_buffer.is_empty() {253self.buffer_set[self.active_buffer_idx as usize] 
=254Buffer::from(core::mem::take(&mut self.active_buffer));255} else if self.buffer_set.last().is_some_and(|b| b.is_empty()) {256self.buffer_set.pop();257}258259let out = unsafe {260BinaryViewArrayGeneric::new_unchecked(261self.dtype.clone(),262Buffer::from(core::mem::take(&mut self.views)),263Buffer::from(core::mem::take(&mut self.buffer_set)),264core::mem::take(&mut self.validity).into_opt_validity(),265Some(self.total_bytes_len),266self.total_buffer_len,267)268};269270self.total_buffer_len = 0;271self.total_bytes_len = 0;272self.active_buffer_idx = 0;273self.stolen_buffers.clear();274self.last_buffer_set_stolen_from = None;275out276}277278fn len(&self) -> usize {279self.views.len()280}281282fn extend_nulls(&mut self, length: usize) {283self.views.extend_constant(length, View::default());284self.validity.extend_constant(length, false);285}286287fn subslice_extend(288&mut self,289other: &Self::Array,290start: usize,291length: usize,292share: ShareStrategy,293) {294self.views.reserve(length);295296unsafe {297match share {298ShareStrategy::Never => {299if let Some(v) = other.validity() {300for i in start..start + length {301if v.get_bit_unchecked(i) {302self.push_value_ignore_validity(other.value_unchecked(i));303} else {304self.views.push(View::default())305}306}307} else {308for i in start..start + length {309self.push_value_ignore_validity(other.value_unchecked(i));310}311}312},313ShareStrategy::Always => {314let other_views = &other.views()[start..start + length];315self.extend_views_dedup_ignore_validity(316other_views.iter().copied(),317other.data_buffers(),318);319},320}321}322323self.validity324.subslice_extend_from_opt_validity(other.validity(), start, length);325}326327fn subslice_extend_each_repeated(328&mut self,329other: &Self::Array,330start: usize,331length: usize,332repeats: usize,333share: ShareStrategy,334) {335self.views.reserve(length * repeats);336337unsafe {338match share {339ShareStrategy::Never => {340if let Some(v) = other.validity() {341for 
i in start..start + length {342if v.get_bit_unchecked(i) {343for _ in 0..repeats {344self.push_value_ignore_validity(other.value_unchecked(i));345}346} else {347for _ in 0..repeats {348self.views.push(View::default())349}350}351}352} else {353for i in start..start + length {354for _ in 0..repeats {355self.push_value_ignore_validity(other.value_unchecked(i));356}357}358}359},360ShareStrategy::Always => {361let other_views = &other.views()[start..start + length];362self.extend_views_each_repeated_dedup_ignore_validity(363other_views.iter().copied(),364repeats,365other.data_buffers(),366);367},368}369}370371self.validity372.subslice_extend_each_repeated_from_opt_validity(373other.validity(),374start,375length,376repeats,377);378}379380unsafe fn gather_extend(381&mut self,382other: &Self::Array,383idxs: &[IdxSize],384share: ShareStrategy,385) {386self.views.reserve(idxs.len());387388unsafe {389match share {390ShareStrategy::Never => {391if let Some(v) = other.validity() {392for idx in idxs {393if v.get_bit_unchecked(*idx as usize) {394self.push_value_ignore_validity(395other.value_unchecked(*idx as usize),396);397} else {398self.views.push(View::default())399}400}401} else {402for idx in idxs {403self.push_value_ignore_validity(other.value_unchecked(*idx as usize));404}405}406},407ShareStrategy::Always => {408let other_view_slice = other.views().as_slice();409let other_views = idxs410.iter()411.map(|idx| *other_view_slice.get_unchecked(*idx as usize));412self.extend_views_dedup_ignore_validity(other_views, other.data_buffers());413},414}415}416417self.validity418.gather_extend_from_opt_validity(other.validity(), idxs);419}420421fn opt_gather_extend(&mut self, other: &Self::Array, idxs: &[IdxSize], share: ShareStrategy) {422self.views.reserve(idxs.len());423424unsafe {425match share {426ShareStrategy::Never => {427if let Some(v) = other.validity() {428for idx in idxs {429if (*idx as usize) < v.len() && v.get_bit_unchecked(*idx as usize) 
{430self.push_value_ignore_validity(431other.value_unchecked(*idx as usize),432);433} else {434self.views.push(View::default())435}436}437} else {438for idx in idxs {439if (*idx as usize) < other.len() {440self.push_value_ignore_validity(441other.value_unchecked(*idx as usize),442);443} else {444self.views.push(View::default())445}446}447}448},449ShareStrategy::Always => {450let other_view_slice = other.views().as_slice();451let other_views = idxs.iter().map(|idx| {452other_view_slice453.get(*idx as usize)454.copied()455.unwrap_or_default()456});457self.extend_views_dedup_ignore_validity(other_views, other.data_buffers());458},459}460}461462self.validity463.opt_gather_extend_from_opt_validity(other.validity(), idxs, other.len());464}465}466467468