Path: blob/main/crates/polars-row/src/variable/binary.rs
6939 views
#![allow(unsafe_op_in_unsafe_fn)]1//! Row encoding for Binary values2//!3//! - single `0_u8` if null4//! - single `1_u8` if empty array5//! - `2_u8` if not empty, followed by one or more blocks6//!7//! where a block is encoded as8//!9//! - [`BLOCK_SIZE`] bytes of string data, padded with 0s10//! - `0xFF_u8` if this is not the last block for this string11//! - otherwise the length of the block as a `u8`12use std::mem::MaybeUninit;1314use arrow::array::{BinaryViewArray, MutableBinaryViewArray};15use polars_utils::slice::Slice2Uninit;1617use crate::row::RowEncodingOptions;18use crate::utils::decode_opt_nulls;1920/// The block size of the variable length encoding21pub(crate) const BLOCK_SIZE: usize = 32;2223/// The continuation token.24pub(crate) const BLOCK_CONTINUATION_TOKEN: u8 = 0xFF;2526/// Indicates an empty string27pub(crate) const EMPTY_SENTINEL: u8 = 1;2829/// Indicates a non-empty string30pub(crate) const NON_EMPTY_SENTINEL: u8 = 2;3132/// Returns the ceil of `value`/`divisor`33#[inline]34pub fn ceil(value: usize, divisor: usize) -> usize {35// Rewrite as `value.div_ceil(&divisor)` after36// https://github.com/rust-lang/rust/issues/88581 is merged.37value / divisor + !value.is_multiple_of(divisor) as usize38}3940#[inline]41fn padded_length(a: usize) -> usize {421 + ceil(a, BLOCK_SIZE) * (BLOCK_SIZE + 1)43}4445#[inline]46pub fn encoded_len_from_len(a: Option<usize>, _opt: RowEncodingOptions) -> usize {47a.map_or(1, padded_length)48}4950/// Encode one strings/bytes object and return the written length.51///52/// # Safety53/// `out` must have allocated enough room54unsafe fn encode_one(55out: &mut [MaybeUninit<u8>],56val: Option<&[MaybeUninit<u8>]>,57opt: RowEncodingOptions,58) -> usize {59let descending = opt.contains(RowEncodingOptions::DESCENDING);60match val {61Some([]) => {62let byte = if descending {63!EMPTY_SENTINEL64} else {65EMPTY_SENTINEL66};67*out.get_unchecked_mut(0) = MaybeUninit::new(byte);68169},70Some(val) => {71let block_count = ceil(val.len(), BLOCK_SIZE);72let end_offset = 1 + block_count * (BLOCK_SIZE + 1);7374let dst = out.get_unchecked_mut(..end_offset);7576// Write `2_u8` to demarcate as non-empty, non-null string77*dst.get_unchecked_mut(0) = MaybeUninit::new(NON_EMPTY_SENTINEL);7879let src_chunks = val.chunks_exact(BLOCK_SIZE);80let src_remainder = src_chunks.remainder();8182// + 1 is for the BLOCK CONTINUATION TOKEN83let dst_chunks = dst.get_unchecked_mut(1..).chunks_exact_mut(BLOCK_SIZE + 1);8485for (src, dst) in src_chunks.zip(dst_chunks) {86// we copy src.len() that leaves 1 bytes for the continuation tkn.87std::ptr::copy_nonoverlapping(src.as_ptr(), dst.as_mut_ptr(), src.len());88// Indicate that there are further blocks to follow89*dst.get_unchecked_mut(BLOCK_SIZE) = MaybeUninit::new(BLOCK_CONTINUATION_TOKEN);90}9192// exactly BLOCK_SIZE bytes93// this means we only need to set the length94// all other bytes are already initialized95if src_remainder.is_empty() {96// overwrite the latest continuation marker.97// replace the "there is another block" with98// "we are finished this, this is the length of this block"99*dst.last_mut().unwrap_unchecked() = MaybeUninit::new(BLOCK_SIZE as u8);100}101// there are remainder bytes102else {103// get the last block104let start_offset = 1 + (block_count - 1) * (BLOCK_SIZE + 1);105let last_dst = dst.get_unchecked_mut(start_offset..);106let n_bytes_to_write = src_remainder.len();107108std::ptr::copy_nonoverlapping(109src_remainder.as_ptr(),110last_dst.as_mut_ptr(),111n_bytes_to_write,112);113// write remainder as zeros114last_dst115.get_unchecked_mut(n_bytes_to_write..last_dst.len() - 1)116.fill(MaybeUninit::new(0));117*dst.last_mut().unwrap_unchecked() = MaybeUninit::new(src_remainder.len() as u8);118}119120if descending {121for byte in dst {122*byte = MaybeUninit::new(!byte.assume_init());123}124}125end_offset126},127None => {128*out.get_unchecked_mut(0) = MaybeUninit::new(opt.null_sentinel());129// // write remainder as zeros130// out.get_unchecked_mut(1..).fill(MaybeUninit::new(0));1311132},133}134}135136pub(crate) unsafe fn encode_iter<'a, I: Iterator<Item = Option<&'a [u8]>>>(137buffer: &mut [MaybeUninit<u8>],138input: I,139opt: RowEncodingOptions,140row_starts: &mut [usize],141) {142for (offset, opt_value) in row_starts.iter_mut().zip(input) {143let dst = buffer.get_unchecked_mut(*offset..);144let written_len = encode_one(dst, opt_value.map(|v| v.as_uninit()), opt);145*offset += written_len;146}147}148149pub(crate) unsafe fn encoded_item_len(row: &[u8], opt: RowEncodingOptions) -> usize {150let descending = opt.contains(RowEncodingOptions::DESCENDING);151let (non_empty_sentinel, continuation_token) = if descending {152(!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION_TOKEN)153} else {154(NON_EMPTY_SENTINEL, BLOCK_CONTINUATION_TOKEN)155};156157// empty or null158if *row.get_unchecked(0) != non_empty_sentinel {159return 1;160}161162let mut idx = 1;163loop {164let sentinel = *row.get_unchecked(idx + BLOCK_SIZE);165if sentinel == continuation_token {166idx += BLOCK_SIZE + 1;167continue;168}169return idx + BLOCK_SIZE + 1;170}171}172173unsafe fn decoded_len(174row: &[u8],175non_empty_sentinel: u8,176continuation_token: u8,177descending: bool,178) -> usize {179// empty or null180if *row.get_unchecked(0) != non_empty_sentinel {181return 0;182}183184let mut str_len = 0;185let mut idx = 1;186loop {187let sentinel = *row.get_unchecked(idx + BLOCK_SIZE);188if sentinel == continuation_token {189idx += BLOCK_SIZE + 1;190str_len += BLOCK_SIZE;191continue;192}193// the sentinel of the last block has the length194// of that block. The rest is padding.195let block_length = if descending {196// all bits were inverted on encoding197!sentinel198} else {199sentinel200};201return str_len + block_length as usize;202}203}204205pub(crate) unsafe fn decode_binview(206rows: &mut [&[u8]],207opt: RowEncodingOptions,208) -> BinaryViewArray {209let descending = opt.contains(RowEncodingOptions::DESCENDING);210let (non_empty_sentinel, continuation_token) = if descending {211(!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION_TOKEN)212} else {213(NON_EMPTY_SENTINEL, BLOCK_CONTINUATION_TOKEN)214};215216let null_sentinel = opt.null_sentinel();217let validity = decode_opt_nulls(rows, null_sentinel);218219let mut mutable = MutableBinaryViewArray::with_capacity(rows.len());220221let mut scratch = vec![];222for row in rows {223scratch.set_len(0);224let str_len = decoded_len(row, non_empty_sentinel, continuation_token, descending);225let mut to_read = str_len;226// we start at one, as we skip the validity byte227let mut offset = 1;228229while to_read >= BLOCK_SIZE {230to_read -= BLOCK_SIZE;231scratch.extend_from_slice(row.get_unchecked(offset..offset + BLOCK_SIZE));232offset += BLOCK_SIZE + 1;233}234235if to_read != 0 {236scratch.extend_from_slice(row.get_unchecked(offset..offset + to_read));237offset += BLOCK_SIZE + 1;238}239*row = row.get_unchecked(offset..);240241if descending {242scratch.iter_mut().for_each(|o| *o = !*o)243}244mutable.push_value_ignore_validity(&scratch);245}246247let out: BinaryViewArray = mutable.into();248out.with_validity(validity)249}250251252