// Path: blob/main/crates/polars-core/src/frame/column/partitioned.rs
// 6940 views
use std::borrow::Cow;1use std::convert::identity;2use std::sync::{Arc, OnceLock};34use polars_error::{PolarsResult, polars_ensure};5use polars_utils::IdxSize;6use polars_utils::pl_str::PlSmallStr;78use super::{AnyValue, Column, DataType, Field, IntoColumn, Series};9use crate::chunked_array::cast::CastOptions;10use crate::frame::Scalar;11use crate::series::IsSorted;1213#[derive(Debug, Clone)]14#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]15#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]16pub struct PartitionedColumn {17name: PlSmallStr,1819values: Series,20ends: Arc<[IdxSize]>,2122#[cfg_attr(feature = "serde", serde(skip, default))]23materialized: OnceLock<Series>,24}2526impl IntoColumn for PartitionedColumn {27fn into_column(self) -> Column {28Column::Partitioned(self)29}30}3132impl From<PartitionedColumn> for Column {33fn from(value: PartitionedColumn) -> Self {34value.into_column()35}36}3738fn verify_invariants(values: &Series, ends: &[IdxSize]) -> PolarsResult<()> {39polars_ensure!(40values.len() == ends.len(),41ComputeError: "partitioned column `values` length does not match `ends` length ({} != {})",42values.len(),43ends.len()44);4546for vs in ends.windows(2) {47polars_ensure!(48vs[0] <= vs[1],49ComputeError: "partitioned column `ends` are not monotonely non-decreasing",50);51}5253Ok(())54}5556impl PartitionedColumn {57pub fn new(name: PlSmallStr, values: Series, ends: Arc<[IdxSize]>) -> Self {58Self::try_new(name, values, ends).unwrap()59}6061/// # Safety62///63/// Safe if:64/// - `values.len() == ends.len()`65/// - all values can have `dtype`66/// - `ends` is monotonely non-decreasing67pub unsafe fn new_unchecked(name: PlSmallStr, values: Series, ends: Arc<[IdxSize]>) -> Self {68if cfg!(debug_assertions) {69verify_invariants(&values, ends.as_ref()).unwrap();70}7172let values = values.rechunk();73Self {74name,75values,76ends,77materialized: OnceLock::new(),78}79}8081pub fn try_new(name: PlSmallStr, values: 
Series, ends: Arc<[IdxSize]>) -> PolarsResult<Self> {82verify_invariants(&values, ends.as_ref())?;8384// SAFETY: Invariants checked before85Ok(unsafe { Self::new_unchecked(name, values, ends) })86}8788pub fn new_empty(name: PlSmallStr, dtype: DataType) -> Self {89Self {90name,91values: Series::new_empty(PlSmallStr::EMPTY, &dtype),92ends: Arc::default(),9394materialized: OnceLock::new(),95}96}9798pub fn len(&self) -> usize {99self.ends.last().map_or(0, |last| *last as usize)100}101102pub fn is_empty(&self) -> bool {103self.len() == 0104}105106pub fn name(&self) -> &PlSmallStr {107&self.name108}109110pub fn dtype(&self) -> &DataType {111self.values.dtype()112}113114#[inline]115pub fn field(&self) -> Cow<'_, Field> {116match self.lazy_as_materialized_series() {117None => Cow::Owned(Field::new(self.name().clone(), self.dtype().clone())),118Some(s) => s.field(),119}120}121122pub fn rename(&mut self, name: PlSmallStr) -> &mut Self {123self.name = name;124self125}126127fn _to_series(name: PlSmallStr, values: &Series, ends: &[IdxSize]) -> Series {128let dtype = values.dtype();129let mut column = Column::Series(Series::new_empty(name, dtype).into());130131let mut prev_offset = 0;132for (i, &offset) in ends.iter().enumerate() {133// @TODO: Optimize134let length = offset - prev_offset;135column136.extend(&Column::new_scalar(137PlSmallStr::EMPTY,138Scalar::new(dtype.clone(), values.get(i).unwrap().into_static()),139length as usize,140))141.unwrap();142prev_offset = offset;143}144145debug_assert_eq!(column.len(), prev_offset as usize);146147column.take_materialized_series()148}149150/// Materialize the [`PartitionedColumn`] into a [`Series`].151fn to_series(&self) -> Series {152Self::_to_series(self.name.clone(), &self.values, &self.ends)153}154155/// Get the [`PartitionedColumn`] as [`Series`] if it was already materialized.156pub fn lazy_as_materialized_series(&self) -> Option<&Series> {157self.materialized.get()158}159160/// Get the [`PartitionedColumn`] as 
[`Series`]161///162/// This needs to materialize upon the first call. Afterwards, this is cached.163pub fn as_materialized_series(&self) -> &Series {164self.materialized.get_or_init(|| self.to_series())165}166167/// Take the [`PartitionedColumn`] and materialize as a [`Series`] if not already done.168pub fn take_materialized_series(self) -> Series {169self.materialized170.into_inner()171.unwrap_or_else(|| Self::_to_series(self.name, &self.values, &self.ends))172}173174pub fn apply_unary_elementwise(&self, f: impl Fn(&Series) -> Series) -> Self {175let result = f(&self.values).rechunk();176assert_eq!(self.values.len(), result.len());177unsafe { Self::new_unchecked(self.name.clone(), result, self.ends.clone()) }178}179180pub fn try_apply_unary_elementwise(181&self,182f: impl Fn(&Series) -> PolarsResult<Series>,183) -> PolarsResult<Self> {184let result = f(&self.values)?.rechunk();185assert_eq!(self.values.len(), result.len());186Ok(unsafe { Self::new_unchecked(self.name.clone(), result, self.ends.clone()) })187}188189pub fn extend_constant(&self, value: AnyValue, n: usize) -> PolarsResult<Self> {190let mut new_ends = self.ends.to_vec();191// @TODO: IdxSize checks192let new_length = (self.len() + n) as IdxSize;193194let values = if !self.is_empty() && self.values.last().value() == &value {195*new_ends.last_mut().unwrap() = new_length;196self.values.clone()197} else {198new_ends.push(new_length);199self.values.extend_constant(value, 1)?200};201202Ok(unsafe { Self::new_unchecked(self.name.clone(), values, new_ends.into()) })203}204205pub unsafe fn get_unchecked(&self, index: usize) -> AnyValue<'_> {206debug_assert!(index < self.len());207208// Common situation get_unchecked(0)209if index < self.ends[0] as usize {210return unsafe { self.get_unchecked(0) };211}212213let value_idx = self214.ends215.binary_search(&(index as IdxSize))216.map_or_else(identity, identity);217218self.get_unchecked(value_idx)219}220221pub fn min_reduce(&self) -> PolarsResult<Scalar> 
{222self.values.min_reduce()223}224pub fn max_reduce(&self) -> Result<Scalar, polars_error::PolarsError> {225self.values.max_reduce()226}227228pub fn reverse(&self) -> Self {229let values = self.values.reverse();230let mut ends = Vec::with_capacity(self.ends.len());231232let mut offset = 0;233ends.extend(self.ends.windows(2).rev().map(|vs| {234offset += vs[1] - vs[0];235offset236}));237ends.push(self.len() as IdxSize);238239unsafe { Self::new_unchecked(self.name.clone(), values, ends.into()) }240}241242pub fn set_sorted_flag(&mut self, sorted: IsSorted) {243self.values.set_sorted_flag(sorted);244}245246pub fn cast_with_options(&self, dtype: &DataType, options: CastOptions) -> PolarsResult<Self> {247let values = self.values.cast_with_options(dtype, options)?;248Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })249}250251pub fn strict_cast(&self, dtype: &DataType) -> PolarsResult<Self> {252let values = self.values.strict_cast(dtype)?;253Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })254}255256pub fn cast(&self, dtype: &DataType) -> PolarsResult<Self> {257let values = self.values.cast(dtype)?;258Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })259}260261pub unsafe fn cast_unchecked(&self, dtype: &DataType) -> PolarsResult<Self> {262let values = unsafe { self.values.cast_unchecked(dtype) }?;263Ok(unsafe { Self::new_unchecked(self.name.clone(), values, self.ends.clone()) })264}265266pub fn null_count(&self) -> usize {267match self.lazy_as_materialized_series() {268Some(s) => s.null_count(),269None => {270// @partition-opt271self.as_materialized_series().null_count()272},273}274}275276pub fn clear(&self) -> Self {277Self::new_empty(self.name.clone(), self.values.dtype().clone())278}279280pub fn partitions(&self) -> &Series {281&self.values282}283pub fn partition_ends(&self) -> &[IdxSize] {284&self.ends285}286287pub fn partition_ends_ref(&self) -> &Arc<[IdxSize]> 
{288&self.ends289}290291pub fn or_reduce(&self) -> PolarsResult<Scalar> {292self.values.or_reduce()293}294295pub fn and_reduce(&self) -> PolarsResult<Scalar> {296self.values.and_reduce()297}298}299300301