Path: blob/main/crates/polars-compute/src/propagate_nulls.rs
6939 views
use arrow::array::{Array, FixedSizeListArray, ListArray, StructArray};1use arrow::bitmap::BitmapBuilder;2use arrow::bitmap::bitmask::BitMask;3use arrow::types::Offset;45/// Propagate nulls down to masked-out values in lower nesting levels.6pub fn propagate_nulls(arr: &dyn Array) -> Option<Box<dyn Array>> {7let arr = arr.as_any();8if let Some(arr) = arr.downcast_ref::<ListArray<i32>>() {9return propagate_nulls_list(arr).map(|arr| Box::new(arr) as _);10}11if let Some(arr) = arr.downcast_ref::<ListArray<i64>>() {12return propagate_nulls_list(arr).map(|arr| Box::new(arr) as _);13}14if let Some(arr) = arr.downcast_ref::<FixedSizeListArray>() {15return propagate_nulls_fsl(arr).map(|arr| Box::new(arr) as _);16}17if let Some(arr) = arr.downcast_ref::<StructArray>() {18return propagate_nulls_struct(arr).map(|arr| Box::new(arr) as _);19}2021None22}2324pub fn propagate_nulls_list<O: Offset>(arr: &ListArray<O>) -> Option<ListArray<O>> {25let Some(validity) = arr.validity() else {26return propagate_nulls(arr.values().as_ref()).map(|values| {27ListArray::new(arr.dtype().clone(), arr.offsets().clone(), values, None)28});29};3031let mut last_idx = 0;32let old_child_validity = arr.values().validity();33let mut new_child_validity = BitmapBuilder::new();3435let mut new_values = None;3637// Find the first element that does not have propagated nulls.38let null_mask = !validity;39for i in null_mask.true_idx_iter() {40last_idx = i;41let (start, end) = arr.offsets().start_end(i);42if end == start {43continue;44}4546if old_child_validity.is_none_or(|v| {47BitMask::from_bitmap(v)48.sliced(start, end - start)49.set_bits()50> 051}) {52new_child_validity.subslice_extend_from_opt_validity(old_child_validity, 0, start);53new_child_validity.extend_constant(end - start, false);54break;55}56}5758if !new_child_validity.is_empty() {59// If nulls need to be propagated, create a new validity mask for the child array.60let null_mask = null_mask.sliced(last_idx + 1, arr.len() - last_idx - 1);6162for i in null_mask.true_idx_iter() {63let i = i + last_idx + 1;64let (start, end) = arr.offsets().start_end(i);65if end == start {66continue;67}6869new_child_validity.subslice_extend_from_opt_validity(70old_child_validity,71new_child_validity.len(),72start - new_child_validity.len(),73);74new_child_validity.extend_constant(end - start, false);75}7677new_child_validity.subslice_extend_from_opt_validity(78old_child_validity,79new_child_validity.len(),80arr.values().len() - new_child_validity.len(),81);8283let new_child_validity = new_child_validity.freeze();84new_values = Some(arr.values().with_validity(Some(new_child_validity)));85}8687let Some(values) = new_values88.as_ref()89.and_then(|v| propagate_nulls(v.as_ref()))90.or(new_values)91else {92// Nothing was changed. Return the original array.93return None;94};9596Some(ListArray::new(97arr.dtype().clone(),98arr.offsets().clone(),99values,100Some(validity.clone()),101))102}103104pub fn propagate_nulls_fsl(arr: &FixedSizeListArray) -> Option<FixedSizeListArray> {105let Some(validity) = arr.validity() else {106return propagate_nulls(arr.values().as_ref())107.map(|values| FixedSizeListArray::new(arr.dtype().clone(), arr.len(), values, None));108};109110if arr.size() == 0 || validity.unset_bits() == 0 {111return None;112}113114let start_point = match arr.values().validity() {115None => Some(validity.leading_ones()),116Some(old_child_validity) => {117// Find the first element that does not have propagated nulls.118let null_mask = !validity;119null_mask.true_idx_iter().find(|i| {120BitMask::from_bitmap(old_child_validity)121.sliced(i * arr.size(), arr.size())122.set_bits()123> 0124})125},126};127128let mut new_values = None;129if let Some(start_point) = start_point {130// Nulls need to be propagated, create a new validity mask.131let mut new_child_validity = BitmapBuilder::with_capacity(arr.size() * arr.len());132133let mut validity = validity.clone();134validity.slice(start_point, validity.len() - start_point);135match arr.values().validity() {136None => {137new_child_validity.extend_constant(start_point * arr.size(), true);138139while !validity.is_empty() {140let num_zeroes = validity.take_leading_zeros();141new_child_validity.extend_constant(num_zeroes * arr.size(), false);142143let num_ones = validity.take_leading_ones();144new_child_validity.extend_constant(num_ones * arr.size(), true);145}146},147148Some(old_child_validity) => {149new_child_validity.subslice_extend_from_bitmap(150old_child_validity,1510,152start_point * arr.size(),153);154while !validity.is_empty() {155let num_zeroes = validity.take_leading_zeros();156new_child_validity.extend_constant(num_zeroes * arr.size(), false);157158let num_ones = validity.take_leading_ones();159new_child_validity.subslice_extend_from_bitmap(160old_child_validity,161new_child_validity.len(),162num_ones * arr.size(),163);164}165},166}167168let new_child_validity = new_child_validity.freeze();169new_values = Some(arr.values().with_validity(Some(new_child_validity)));170}171172let Some(values) = new_values173.as_ref()174.and_then(|v| propagate_nulls(v.as_ref()))175.or(new_values)176else {177// Nothing was changed. Return the original array.178return None;179};180181// The child array was changed.182Some(FixedSizeListArray::new(183arr.dtype().clone(),184arr.len(),185values,186Some(validity.clone()),187))188}189190pub fn propagate_nulls_struct(arr: &StructArray) -> Option<StructArray> {191let Some(validity) = arr.validity() else {192let mut new_values = Vec::new();193for (i, field_array) in arr.values().iter().enumerate() {194if let Some(field_array) = propagate_nulls(field_array.as_ref()) {195new_values.reserve(arr.values().len());196new_values.extend(arr.values()[..i].iter().cloned());197new_values.push(field_array);198break;199}200}201202if new_values.is_empty() {203return None;204}205206new_values.extend(arr.values()[new_values.len()..].iter().map(|field_array| {207propagate_nulls(field_array.as_ref()).unwrap_or_else(|| field_array.to_boxed())208}));209return Some(StructArray::new(210arr.dtype().clone(),211arr.len(),212new_values,213None,214));215};216217if arr.values().is_empty() || validity.unset_bits() == 0 {218return None;219}220221let mut new_values = Vec::new();222for (i, field_array) in arr.values().iter().enumerate() {223let new_field_array = match field_array.validity() {224None => Some(field_array.with_validity(Some(validity.clone()))),225Some(v) if v.num_intersections_with(validity) == validity.set_bits() => None,226Some(v) => Some(field_array.with_validity(Some(v & validity))),227};228229let Some(new_field_array) = new_field_array230.as_ref()231.and_then(|v| propagate_nulls(v.as_ref()))232.or(new_field_array)233else {234// Nothing was changed. Return the original array.235continue;236};237238new_values.reserve(arr.values().len());239new_values.extend(arr.values()[..i].iter().cloned());240new_values.push(new_field_array);241break;242}243244if new_values.is_empty() {245return None;246}247248new_values.extend(arr.values()[new_values.len()..].iter().map(|field_array| {249let new_field_array = match field_array.validity() {250None => Some(field_array.with_validity(Some(validity.clone()))),251Some(v) if v.num_intersections_with(validity) == validity.set_bits() => None,252Some(v) => Some(field_array.with_validity(Some(v & validity))),253};254255new_field_array256.as_ref()257.and_then(|v| propagate_nulls(v.as_ref()))258.or(new_field_array)259.unwrap_or_else(|| field_array.clone())260}));261262Some(StructArray::new(263arr.dtype().clone(),264arr.len(),265new_values,266Some(validity.clone()),267))268}269270#[cfg(test)]271mod tests {272use arrow::array::proptest::array;273use proptest::proptest;274275use crate::propagate_nulls::propagate_nulls;276277proptest! {278#[test]279fn test_proptest(array in array(0..100)) {280if let Some(p_arr) = propagate_nulls(array.as_ref()) {281proptest::prop_assert_eq!(array, p_arr);282}283}284}285}286287288