Path: blob/main/crates/polars-expr/src/idx_table/row_encoded.rs
6940 views
#![allow(clippy::unnecessary_cast)] // Clippy doesn't recognize that IdxSize and u64 can be different.1#![allow(unsafe_op_in_unsafe_fn)]23use arrow::array::Array;4use polars_compute::binview_index_map::{BinaryViewIndexMap, Entry};5use polars_utils::idx_vec::UnitVec;6use polars_utils::itertools::Itertools;7use polars_utils::relaxed_cell::RelaxedCell;8use polars_utils::unitvec;910use super::*;11use crate::hash_keys::HashKeys;1213#[derive(Default)]14pub struct RowEncodedIdxTable {15// These AtomicU64s actually are IdxSizes, but we use the top bit of the16// first index in each to mark keys during probing.17idx_map: BinaryViewIndexMap<UnitVec<RelaxedCell<u64>>>,18idx_offset: IdxSize,19null_keys: Vec<IdxSize>,20}2122impl RowEncodedIdxTable {23pub fn new() -> Self {24Self {25idx_map: BinaryViewIndexMap::new(),26idx_offset: 0,27null_keys: Vec::new(),28}29}30}3132impl RowEncodedIdxTable {33#[inline(always)]34fn probe_one<const MARK_MATCHES: bool>(35&self,36key_idx: IdxSize,37hash: u64,38key: &[u8],39table_match: &mut Vec<IdxSize>,40probe_match: &mut Vec<IdxSize>,41) -> bool {42if let Some(idxs) = self.idx_map.get(hash, key) {43for idx in &idxs[..] {44// Create matches, making sure to clear top bit.45table_match.push((idx.load() & !(1 << 63)) as IdxSize);46probe_match.push(key_idx);47}4849// Mark if necessary. This action is idempotent so doesn't50// fetch_or to do it atomically.51if MARK_MATCHES {52let first_idx = unsafe { idxs.get_unchecked(0) };53let first_idx_val = first_idx.load();54if first_idx_val >> 63 == 0 {55first_idx.store(first_idx_val | (1 << 63));56}57}58true59} else {60false61}62}6364fn probe_impl<'a, const MARK_MATCHES: bool, const EMIT_UNMATCHED: bool>(65&self,66hash_keys: impl Iterator<Item = (IdxSize, u64, Option<&'a [u8]>)>,67table_match: &mut Vec<IdxSize>,68probe_match: &mut Vec<IdxSize>,69limit: IdxSize,70) -> IdxSize {71let mut keys_processed = 0;72for (key_idx, hash, key) in hash_keys {73let found_match = if let Some(key) = key {74self.probe_one::<MARK_MATCHES>(key_idx, hash, key, table_match, probe_match)75} else {76false77};7879if EMIT_UNMATCHED && !found_match {80table_match.push(IdxSize::MAX);81probe_match.push(key_idx);82}8384keys_processed += 1;85if table_match.len() >= limit as usize {86break;87}88}89keys_processed90}9192fn probe_dispatch<'a>(93&self,94hash_keys: impl Iterator<Item = (IdxSize, u64, Option<&'a [u8]>)>,95table_match: &mut Vec<IdxSize>,96probe_match: &mut Vec<IdxSize>,97mark_matches: bool,98emit_unmatched: bool,99limit: IdxSize,100) -> IdxSize {101match (mark_matches, emit_unmatched) {102(false, false) => {103self.probe_impl::<false, false>(hash_keys, table_match, probe_match, limit)104},105(false, true) => {106self.probe_impl::<false, true>(hash_keys, table_match, probe_match, limit)107},108(true, false) => {109self.probe_impl::<true, false>(hash_keys, table_match, probe_match, limit)110},111(true, true) => {112self.probe_impl::<true, true>(hash_keys, table_match, probe_match, limit)113},114}115}116}117118impl IdxTable for RowEncodedIdxTable {119fn new_empty(&self) -> Box<dyn IdxTable> {120Box::new(Self::new())121}122123fn reserve(&mut self, additional: usize) {124self.idx_map.reserve(additional);125}126127fn num_keys(&self) -> IdxSize {128self.idx_map.len()129}130131fn insert_keys(&mut self, hash_keys: &HashKeys, track_unmatchable: bool) {132let HashKeys::RowEncoded(hash_keys) = hash_keys else {133unreachable!()134};135let new_idx_offset = (self.idx_offset as usize)136.checked_add(hash_keys.keys.len())137.unwrap();138assert!(139new_idx_offset < IdxSize::MAX as usize,140"overly large index in RowEncodedIdxTable"141);142143for (i, (hash, key)) in hash_keys144.hashes145.values_iter()146.zip(hash_keys.keys.iter())147.enumerate_idx()148{149let idx = self.idx_offset + i;150if let Some(key) = key {151match self.idx_map.entry(*hash, key) {152Entry::Occupied(o) => {153o.into_mut().push(RelaxedCell::from(idx as u64));154},155Entry::Vacant(v) => {156v.insert(unitvec![RelaxedCell::from(idx as u64)]);157},158}159} else if track_unmatchable {160self.null_keys.push(idx);161}162}163164self.idx_offset = new_idx_offset as IdxSize;165}166167unsafe fn insert_keys_subset(168&mut self,169hash_keys: &HashKeys,170subset: &[IdxSize],171track_unmatchable: bool,172) {173let HashKeys::RowEncoded(hash_keys) = hash_keys else {174unreachable!()175};176let new_idx_offset = (self.idx_offset as usize)177.checked_add(subset.len())178.unwrap();179assert!(180new_idx_offset < IdxSize::MAX as usize,181"overly large index in RowEncodedIdxTable"182);183184for (i, subset_idx) in subset.iter().enumerate_idx() {185let hash = unsafe { hash_keys.hashes.value_unchecked(*subset_idx as usize) };186let key = unsafe { hash_keys.keys.get_unchecked(*subset_idx as usize) };187let idx = self.idx_offset + i;188if let Some(key) = key {189match self.idx_map.entry(hash, key) {190Entry::Occupied(o) => {191o.into_mut().push(RelaxedCell::from(idx as u64));192},193Entry::Vacant(v) => {194v.insert(unitvec![RelaxedCell::from(idx as u64)]);195},196}197} else if track_unmatchable {198self.null_keys.push(idx);199}200}201202self.idx_offset = new_idx_offset as IdxSize;203}204205fn probe(206&self,207hash_keys: &HashKeys,208table_match: &mut Vec<IdxSize>,209probe_match: &mut Vec<IdxSize>,210mark_matches: bool,211emit_unmatched: bool,212limit: IdxSize,213) -> IdxSize {214let HashKeys::RowEncoded(hash_keys) = hash_keys else {215unreachable!()216};217218if hash_keys.keys.has_nulls() {219let iter = hash_keys220.hashes221.values_iter()222.copied()223.zip(hash_keys.keys.iter())224.enumerate_idx()225.map(|(i, (h, k))| (i, h, k));226self.probe_dispatch(227iter,228table_match,229probe_match,230mark_matches,231emit_unmatched,232limit,233)234} else {235let iter = hash_keys236.hashes237.values_iter()238.copied()239.zip(hash_keys.keys.values_iter().map(Some))240.enumerate_idx()241.map(|(i, (h, k))| (i, h, k));242self.probe_dispatch(243iter,244table_match,245probe_match,246mark_matches,247emit_unmatched,248limit,249)250}251}252253unsafe fn probe_subset(254&self,255hash_keys: &HashKeys,256subset: &[IdxSize],257table_match: &mut Vec<IdxSize>,258probe_match: &mut Vec<IdxSize>,259mark_matches: bool,260emit_unmatched: bool,261limit: IdxSize,262) -> IdxSize {263let HashKeys::RowEncoded(hash_keys) = hash_keys else {264unreachable!()265};266267if hash_keys.keys.has_nulls() {268let iter = subset.iter().map(|i| {269(270*i,271hash_keys.hashes.value_unchecked(*i as usize),272hash_keys.keys.get_unchecked(*i as usize),273)274});275self.probe_dispatch(276iter,277table_match,278probe_match,279mark_matches,280emit_unmatched,281limit,282)283} else {284let iter = subset.iter().map(|i| {285(286*i,287hash_keys.hashes.value_unchecked(*i as usize),288Some(hash_keys.keys.value_unchecked(*i as usize)),289)290});291self.probe_dispatch(292iter,293table_match,294probe_match,295mark_matches,296emit_unmatched,297limit,298)299}300}301302fn unmarked_keys(303&self,304out: &mut Vec<IdxSize>,305mut offset: IdxSize,306limit: IdxSize,307) -> IdxSize {308out.clear();309310let mut keys_processed = 0;311if (offset as usize) < self.null_keys.len() {312out.extend(313self.null_keys[offset as usize..]314.iter()315.copied()316.take(limit as usize),317);318keys_processed += out.len() as IdxSize;319offset += out.len() as IdxSize;320if out.len() >= limit as usize {321return keys_processed;322}323}324325offset -= self.null_keys.len() as IdxSize;326327while let Some((_, _, idxs)) = self.idx_map.get_index(offset) {328let first_idx = unsafe { idxs.get_unchecked(0) };329let first_idx_val = first_idx.load();330if first_idx_val >> 63 == 0 {331for idx in &idxs[..] {332out.push((idx.load() & !(1 << 63)) as IdxSize);333}334}335336keys_processed += 1;337offset += 1;338if out.len() >= limit as usize {339break;340}341}342343keys_processed344}345}346347348