use arrow::offset::OffsetsBuffer;
use polars_utils::pl_str::PlSmallStr;
use rayon::prelude::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use crate::POOL;
use crate::chunked_array::ops::explode::offsets_to_indexes;
use crate::prelude::*;
use crate::series::IsSorted;
/// Explodes a single list-like series.
///
/// Returns the exploded series together with the offsets buffer produced by
/// `explode_and_offsets` for that series.
///
/// # Errors
///
/// Fails with an "operation not supported" error when the series is neither a
/// `List` nor (with the `dtype-array` feature enabled) an `Array`, and forwards
/// any error raised by the underlying `explode_and_offsets` call.
fn get_exploded(
    series: &Series,
    options: ExplodeOptions,
) -> PolarsResult<(Series, OffsetsBuffer<i64>)> {
    let dtype = series.dtype();
    match dtype {
        DataType::List(_) => {
            // The dtype was matched just above, so the downcast cannot fail.
            let list_ca = series.list().unwrap();
            list_ca.explode_and_offsets(options)
        },
        #[cfg(feature = "dtype-array")]
        DataType::Array(..) => {
            // The dtype was matched just above, so the downcast cannot fail.
            let array_ca = series.array().unwrap();
            array_ca.explode_and_offsets(options)
        },
        unsupported => polars_bail!(opq = explode, unsupported),
    }
}
/// Fully resolved arguments for an unpivot operation.
///
/// Unlike the inputs accepted by [`UnpivotArgsIR::new`], every field here is
/// concrete: optional inputs have already been replaced by their defaults.
#[derive(Clone, Default, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct UnpivotArgsIR {
    /// Column names selected as `on`. When built through [`UnpivotArgsIR::new`]
    /// without an explicit selection, this is every column name that is not
    /// listed in `index`.
    pub on: Vec<PlSmallStr>,
    /// Column names passed as `index`; these are excluded from the default `on`
    /// selection. NOTE(review): presumably the identifier columns kept as-is by
    /// the unpivot — confirm against the consumer of this struct.
    pub index: Vec<PlSmallStr>,
    /// Name for the "variable" output column; `new` defaults it to `"variable"`.
    pub variable_name: PlSmallStr,
    /// Name for the "value" output column; `new` defaults it to `"value"`.
    pub value_name: PlSmallStr,
}
impl UnpivotArgsIR {
pub fn new(
all_column_names: Vec<PlSmallStr>,
on: Option<Vec<PlSmallStr>>,
index: Vec<PlSmallStr>,
value_name: Option<PlSmallStr>,
variable_name: Option<PlSmallStr>,
) -> Self {
let on = on.unwrap_or_else(|| {
let index_set = PlHashSet::from_iter(index.iter().cloned());
all_column_names
.into_iter()
.filter(|s| !index_set.contains(s))
.collect()
});
Self {
on,
index,
variable_name: variable_name.unwrap_or_else(|| PlSmallStr::from_static("variable")),
value_name: value_name.unwrap_or_else(|| PlSmallStr::from_static("value")),
}
}
}
impl DataFrame {
    /// Explodes the given list-like `columns` of this frame.
    ///
    /// Every column in `columns` is exploded with [`get_exploded`]; the remaining
    /// columns are gathered with row indices derived from the offsets of the
    /// first exploded column, and the exploded columns are then re-inserted at
    /// the positions they occupy in `self`.
    ///
    /// # Errors
    ///
    /// * `InvalidOperation` when `columns` is empty or a column has a dtype
    ///   that cannot be exploded.
    /// * An error (instead of a panic) when a column in `columns` is not part
    ///   of this frame.
    /// * `ShapeMismatch` when the exploded columns do not share the same
    ///   per-row element counts, or when an exploded column's length differs
    ///   from the height of the resulting frame.
    pub fn explode_impl(
        &self,
        columns: Vec<Column>,
        options: ExplodeOptions,
    ) -> PolarsResult<DataFrame> {
        polars_ensure!(!columns.is_empty(), InvalidOperation: "no columns provided in explode");
        let mut df = self.clone();
        if self.shape_has_zero() {
            // Nothing to gather: just replace each column by its exploded form.
            for s in &columns {
                df.with_column(s.as_materialized_series().explode(options)?.into_column())?;
            }
            return Ok(df);
        }

        // Order the columns by their position in `self`. `process_column` inserts
        // them back at that position, so they must be handled in ascending order
        // for every insert index to be within bounds. Positions are resolved up
        // front so an unknown column is reported as an error rather than a panic.
        let mut positioned = columns
            .into_iter()
            .map(|c| {
                self.try_get_column_index(c.name().as_str())
                    .map(|idx| (idx, c))
            })
            .collect::<PolarsResult<Vec<_>>>()?;
        positioned.sort_by_key(|(idx, _)| *idx);
        let columns: Vec<Column> = positioned.into_iter().map(|(_, c)| c).collect();

        // First remove all the columns that are going to be exploded.
        for s in &columns {
            df = df.drop(s.name().as_str())?;
        }

        let exploded_columns = POOL.install(|| {
            columns
                .par_iter()
                .map(|c| get_exploded(c.as_materialized_series(), options))
                .map(|s| s.map(|(s, o)| (Column::from(s), o)))
                .collect::<PolarsResult<Vec<_>>>()
        })?;

        /// Inserts `exploded` into `df` at the position it has in `original_df`,
        /// after checking that its length matches the height of `df`.
        fn process_column(
            original_df: &DataFrame,
            df: &mut DataFrame,
            exploded: Column,
        ) -> PolarsResult<()> {
            if df.shape() == (0, 0) {
                // SAFETY: the frame has no columns, so no existing column length
                // can disagree with the new height.
                unsafe { df.set_height(exploded.len()) };
            }
            if exploded.len() == df.height() {
                let col_idx = original_df.try_get_column_index(exploded.name().as_str())?;
                // SAFETY: the length was checked against the frame height just
                // above. NOTE(review): name uniqueness relies on the caller having
                // dropped this column from `df` beforehand.
                unsafe { df.columns_mut() }.insert(col_idx, exploded);
            } else {
                polars_bail!(
                    ShapeMismatch: "exploded column(s) {:?} doesn't have the same length: {} \
                    as the dataframe: {}", exploded.name(), exploded.len(), df.height(),
                );
            }
            Ok(())
        }

        // Verifies that every exploded column has the same number of elements per
        // row as the first one, by comparing offsets relative to their own start.
        let check_offsets = || {
            let first_offsets = exploded_columns[0].1.as_slice();
            for (_, offsets) in &exploded_columns[1..] {
                let offsets = offsets.as_slice();
                let offset_l = first_offsets[0];
                let offset_r = offsets[0];
                // Buffers of different length describe a different number of rows
                // and can never match; `zip` alone would silently truncate.
                let all_equal_len = first_offsets.len() == offsets.len()
                    && first_offsets
                        .iter()
                        .zip(offsets.iter())
                        .all(|(l, r)| (*l - offset_l) == (*r - offset_r));
                polars_ensure!(all_equal_len,
                    ShapeMismatch: "exploded columns must have matching element counts"
                )
            }
            Ok(())
        };

        // Gathers the non-exploded columns using the first exploded column's
        // offsets and inserts that first exploded column.
        let process_first = || {
            let validity = columns[0].rechunk_validity();
            let (exploded, offsets) = &exploded_columns[0];
            let row_idx = offsets_to_indexes(
                offsets.as_slice(),
                exploded.len(),
                options,
                validity.as_ref(),
            );
            let mut row_idx = IdxCa::from_vec(PlSmallStr::EMPTY, row_idx);
            // NOTE(review): assumes `offsets_to_indexes` yields non-decreasing
            // row indices — confirm against its implementation.
            row_idx.set_sorted_flag(IsSorted::Ascending);
            // SAFETY: the indices were just derived from the offsets of a column
            // of this frame; they are assumed to be in bounds (relies on
            // `offsets_to_indexes`).
            let mut df = unsafe { df.take_unchecked(&row_idx) };
            process_column(self, &mut df, exploded.clone())?;
            PolarsResult::Ok(df)
        };

        // The gather and the offsets validation are independent; run them together.
        let (df, result) = POOL.join(process_first, check_offsets);
        let mut df = df?;
        result?;

        for (exploded, _) in exploded_columns.into_iter().skip(1) {
            process_column(self, &mut df, exploded)?
        }
        Ok(df)
    }

    /// Explodes the columns selected by name.
    ///
    /// Resolves `columns` with `select_to_vec` and forwards the result to
    /// [`DataFrame::explode_impl`]; see that method for the error conditions.
    pub fn explode<I, S>(&self, columns: I, options: ExplodeOptions) -> PolarsResult<DataFrame>
    where
        I: IntoIterator<Item = S>,
        S: AsRef<str>,
    {
        let columns = self.select_to_vec(columns)?;
        self.explode_impl(columns, options)
    }
}
#[cfg(test)]
mod test {
    use crate::prelude::*;

    /// Explode options shared by every test in this module.
    fn explode_opts() -> ExplodeOptions {
        ExplodeOptions {
            empty_as_null: true,
            keep_nulls: true,
        }
    }

    #[test]
    #[cfg(feature = "dtype-i8")]
    #[cfg_attr(miri, ignore)]
    fn test_explode() {
        // A list column with three rows of three `i8` values each.
        let inner_a = Series::new("a".into(), &[1i8, 2, 3]);
        let inner_b = Series::new("b".into(), &[1i8, 1, 1]);
        let inner_c = Series::new("c".into(), &[2i8, 2, 2]);
        let foo = Column::new("foo".into(), &[inner_a, inner_b, inner_c]);
        let col_b = Column::new("B".into(), [1, 2, 3]);
        let col_c = Column::new("C".into(), [1, 1, 1]);
        let df = DataFrame::new_infer_height(vec![foo, col_b, col_c]).unwrap();

        let exploded = df.explode(["foo"], explode_opts()).unwrap();
        assert_eq!(exploded.shape(), (9, 3));

        // Reads the last row of an `i32` column of the exploded frame.
        let last_i32 = |name: &str| {
            exploded
                .column(name)
                .unwrap()
                .as_materialized_series()
                .i32()
                .unwrap()
                .get(8)
        };
        assert_eq!(last_i32("C"), Some(1));
        assert_eq!(last_i32("B"), Some(3));

        let last_foo = exploded
            .column("foo")
            .unwrap()
            .as_materialized_series()
            .i8()
            .unwrap()
            .get(8);
        assert_eq!(last_foo, Some(2));
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_explode_df_empty_list() -> PolarsResult<()> {
        let opts = explode_opts();

        // Empty sub-list in the last row.
        let inner_a = Series::new("a".into(), &[1, 2, 3]);
        let inner_b = Series::new("b".into(), &[1, 1, 1]);
        let foo = Column::new("foo".into(), &[inner_a, inner_b.clone(), inner_b.clear()]);
        let col_b = Column::new("B".into(), [1, 2, 3]);
        let col_c = Column::new("C".into(), [1, 1, 1]);
        let df = DataFrame::new_infer_height(vec![foo, col_b.clone(), col_c.clone()])?;
        let out = df.explode(["foo"], opts)?;
        let expected = df![
            "foo" => [Some(1), Some(2), Some(3), Some(1), Some(1), Some(1), None],
            "B" => [1, 1, 1, 2, 2, 2, 3],
            "C" => [1, 1, 1, 1, 1, 1, 1],
        ]?;
        assert!(out.equals_missing(&expected));

        // Empty sub-list in the middle row.
        let b_series = col_b.as_materialized_series();
        let c_series = col_c.as_materialized_series();
        let foo = Column::new(
            "foo".into(),
            [b_series.clone(), c_series.clear(), c_series.clone()],
        );
        let df = DataFrame::new_infer_height(vec![foo, col_b, col_c])?;
        let out = df.explode(["foo"], opts)?;
        let expected = df![
            "foo" => [Some(1), Some(2), Some(3), None, Some(1), Some(1), Some(1)],
            "B" => [1, 1, 1, 2, 3, 3, 3],
            "C" => [1, 1, 1, 1, 1, 1, 1],
        ]?;
        assert!(out.equals_missing(&expected));
        Ok(())
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_explode_single_col() -> PolarsResult<()> {
        let inner_a = Series::new("a".into(), &[1i32, 2, 3]);
        let inner_b = Series::new("b".into(), &[1i32, 1, 1]);
        let foo = Column::new("foo".into(), &[inner_a, inner_b]);
        let df = DataFrame::new_infer_height(vec![foo])?;

        let exploded = df.explode(["foo"], explode_opts())?;
        let values: Vec<i32> = exploded
            .column("foo")?
            .as_materialized_series()
            .i32()?
            .into_no_null_iter()
            .collect();
        assert_eq!(values, vec![1i32, 2, 3, 1, 1, 1]);
        Ok(())
    }
}