Path: blob/main/crates/polars-expr/src/idx_table/binview.rs
6940 views
#![allow(clippy::unnecessary_cast)] // Clippy doesn't recognize that IdxSize and u64 can be different.1#![allow(unsafe_op_in_unsafe_fn)]23use arrow::array::{Array, View};4use arrow::buffer::Buffer;5use polars_compute::binview_index_map::{BinaryViewIndexMap, Entry};6use polars_utils::idx_vec::UnitVec;7use polars_utils::itertools::Itertools;8use polars_utils::relaxed_cell::RelaxedCell;9use polars_utils::unitvec;1011use super::*;12use crate::hash_keys::HashKeys;1314pub struct BinviewKeyIdxTable {15// These AtomicU64s actually are IdxSizes, but we use the top bit of the16// first index in each to mark keys during probing.17idx_map: BinaryViewIndexMap<UnitVec<RelaxedCell<u64>>>,18idx_offset: IdxSize,19null_keys: Vec<IdxSize>,20nulls_emitted: RelaxedCell<bool>,21}2223impl BinviewKeyIdxTable {24pub fn new() -> Self {25Self {26idx_map: BinaryViewIndexMap::default(),27idx_offset: 0,28null_keys: Vec::new(),29nulls_emitted: RelaxedCell::from(false),30}31}3233/// # Safety34/// The view must be valid for the buffers.35#[inline(always)]36unsafe fn probe_one<const MARK_MATCHES: bool>(37&self,38key_idx: IdxSize,39hash: u64,40key: &View,41buffers: &[Buffer<u8>],42table_match: &mut Vec<IdxSize>,43probe_match: &mut Vec<IdxSize>,44) -> bool {45if let Some(idxs) = unsafe { self.idx_map.get_view(hash, key, buffers) } {46for idx in &idxs[..] {47// Create matches, making sure to clear top bit.48table_match.push((idx.load() & !(1 << 63)) as IdxSize);49probe_match.push(key_idx);50}5152// Mark if necessary. This action is idempotent so doesn't need53// atomic fetch_or to do it atomically.54if MARK_MATCHES {55let first_idx = unsafe { idxs.get_unchecked(0) };56let first_idx_val = first_idx.load();57if first_idx_val >> 63 == 0 {58first_idx.store(first_idx_val | (1 << 63));59}60}61true62} else {63false64}65}6667/// # Safety68/// The views must be valid for the buffers.69unsafe fn probe_impl<70'a,71const MARK_MATCHES: bool,72const EMIT_UNMATCHED: bool,73const NULL_IS_VALID: bool,74>(75&self,76keys: impl Iterator<Item = (IdxSize, u64, Option<&'a View>)>,77buffers: &[Buffer<u8>],78table_match: &mut Vec<IdxSize>,79probe_match: &mut Vec<IdxSize>,80limit: IdxSize,81) -> IdxSize {82let mut keys_processed = 0;83for (key_idx, hash, key) in keys {84let found_match = if let Some(key) = key {85self.probe_one::<MARK_MATCHES>(86key_idx,87hash,88key,89buffers,90table_match,91probe_match,92)93} else if NULL_IS_VALID {94for idx in &self.null_keys {95table_match.push(*idx);96probe_match.push(key_idx);97}98if MARK_MATCHES && !self.nulls_emitted.load() {99self.nulls_emitted.store(true);100}101!self.null_keys.is_empty()102} else {103false104};105106if EMIT_UNMATCHED && !found_match {107table_match.push(IdxSize::MAX);108probe_match.push(key_idx);109}110111keys_processed += 1;112if table_match.len() >= limit as usize {113break;114}115}116keys_processed117}118119/// # Safety120/// The views must be valid for the buffers.121#[allow(clippy::too_many_arguments)]122unsafe fn probe_dispatch<'a>(123&self,124keys: impl Iterator<Item = (IdxSize, u64, Option<&'a View>)>,125buffers: &[Buffer<u8>],126table_match: &mut Vec<IdxSize>,127probe_match: &mut Vec<IdxSize>,128mark_matches: bool,129emit_unmatched: bool,130null_is_valid: bool,131limit: IdxSize,132) -> IdxSize {133match (mark_matches, emit_unmatched, null_is_valid) {134(false, false, false) => self.probe_impl::<false, false, false>(135keys,136buffers,137table_match,138probe_match,139limit,140),141(false, false, true) => self.probe_impl::<false, false, true>(142keys,143buffers,144table_match,145probe_match,146limit,147),148(false, true, false) => self.probe_impl::<false, true, false>(149keys,150buffers,151table_match,152probe_match,153limit,154),155(false, true, true) => {156self.probe_impl::<false, true, true>(keys, buffers, table_match, probe_match, limit)157},158(true, false, false) => self.probe_impl::<true, false, false>(159keys,160buffers,161table_match,162probe_match,163limit,164),165(true, false, true) => {166self.probe_impl::<true, false, true>(keys, buffers, table_match, probe_match, limit)167},168(true, true, false) => {169self.probe_impl::<true, true, false>(keys, buffers, table_match, probe_match, limit)170},171(true, true, true) => {172self.probe_impl::<true, true, true>(keys, buffers, table_match, probe_match, limit)173},174}175}176}177178impl IdxTable for BinviewKeyIdxTable {179fn new_empty(&self) -> Box<dyn IdxTable> {180Box::new(Self::new())181}182183fn reserve(&mut self, additional: usize) {184self.idx_map.reserve(additional);185}186187fn num_keys(&self) -> IdxSize {188self.idx_map.len()189}190191fn insert_keys(&mut self, _hash_keys: &HashKeys, _track_unmatchable: bool) {192// Isn't needed anymore, but also don't want to remove the code from the other implementations.193unimplemented!()194}195196unsafe fn insert_keys_subset(197&mut self,198hash_keys: &HashKeys,199subset: &[IdxSize],200track_unmatchable: bool,201) {202let HashKeys::Binview(hash_keys) = hash_keys else {203unreachable!()204};205let new_idx_offset = (self.idx_offset as usize)206.checked_add(subset.len())207.unwrap();208assert!(209new_idx_offset < IdxSize::MAX as usize,210"overly large index in BinviewKeyIdxTable"211);212213unsafe {214let buffers = hash_keys.keys.data_buffers();215let views = hash_keys.keys.views();216if let Some(validity) = hash_keys.keys.validity() {217for (i, subset_idx) in subset.iter().enumerate_idx() {218let hash = hash_keys.hashes.value_unchecked(*subset_idx as usize);219let key = views.get_unchecked(*subset_idx as usize);220let idx = self.idx_offset + i;221if validity.get_bit_unchecked(*subset_idx as usize) {222match self.idx_map.entry_view(hash, *key, buffers) {223Entry::Occupied(o) => {224o.into_mut().push(RelaxedCell::from(idx as u64));225},226Entry::Vacant(v) => {227v.insert(unitvec![RelaxedCell::from(idx as u64)]);228},229}230} else if track_unmatchable | hash_keys.null_is_valid {231self.null_keys.push(idx);232}233}234} else {235for (i, subset_idx) in subset.iter().enumerate_idx() {236let hash = hash_keys.hashes.value_unchecked(*subset_idx as usize);237let key = views.get_unchecked(*subset_idx as usize);238let idx = self.idx_offset + i;239match self.idx_map.entry_view(hash, *key, buffers) {240Entry::Occupied(o) => {241o.into_mut().push(RelaxedCell::from(idx as u64));242},243Entry::Vacant(v) => {244v.insert(unitvec![RelaxedCell::from(idx as u64)]);245},246}247}248}249}250251self.idx_offset = new_idx_offset as IdxSize;252}253254fn probe(255&self,256_hash_keys: &HashKeys,257_table_match: &mut Vec<IdxSize>,258_probe_match: &mut Vec<IdxSize>,259_mark_matches: bool,260_emit_unmatched: bool,261_limit: IdxSize,262) -> IdxSize {263// Isn't needed anymore, but also don't want to remove the code from the other implementations.264unimplemented!()265}266267unsafe fn probe_subset(268&self,269hash_keys: &HashKeys,270subset: &[IdxSize],271table_match: &mut Vec<IdxSize>,272probe_match: &mut Vec<IdxSize>,273mark_matches: bool,274emit_unmatched: bool,275limit: IdxSize,276) -> IdxSize {277let HashKeys::Binview(hash_keys) = hash_keys else {278unreachable!()279};280281unsafe {282let buffers = hash_keys.keys.data_buffers();283let views = hash_keys.keys.views();284if let Some(validity) = hash_keys.keys.validity() {285let iter = subset.iter().map(|i| {286(287*i,288hash_keys.hashes.value_unchecked(*i as usize),289if validity.get_bit_unchecked(*i as usize) {290Some(views.get_unchecked(*i as usize))291} else {292None293},294)295});296self.probe_dispatch(297iter,298buffers,299table_match,300probe_match,301mark_matches,302emit_unmatched,303hash_keys.null_is_valid,304limit,305)306} else {307let iter = subset.iter().map(|i| {308(309*i,310hash_keys.hashes.value_unchecked(*i as usize),311Some(views.get_unchecked(*i as usize)),312)313});314self.probe_dispatch(315iter,316buffers,317table_match,318probe_match,319mark_matches,320emit_unmatched,321false, // Whether or not nulls are valid doesn't matter.322limit,323)324}325}326}327328fn unmarked_keys(329&self,330out: &mut Vec<IdxSize>,331mut offset: IdxSize,332limit: IdxSize,333) -> IdxSize {334out.clear();335336let mut keys_processed = 0;337if !self.nulls_emitted.load() {338if (offset as usize) < self.null_keys.len() {339out.extend(340self.null_keys[offset as usize..]341.iter()342.copied()343.take(limit as usize),344);345keys_processed += out.len() as IdxSize;346offset += out.len() as IdxSize;347if out.len() >= limit as usize {348return keys_processed;349}350}351offset -= self.null_keys.len() as IdxSize;352}353354while let Some((_, _, idxs)) = self.idx_map.get_index(offset) {355let first_idx = unsafe { idxs.get_unchecked(0) };356let first_idx_val = first_idx.load();357if first_idx_val >> 63 == 0 {358for idx in &idxs[..] {359out.push((idx.load() & !(1 << 63)) as IdxSize);360}361}362363keys_processed += 1;364offset += 1;365if out.len() >= limit as usize {366break;367}368}369370keys_processed371}372}373374375