Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-core/src/frame/explode.rs
8327 views
1
use arrow::offset::OffsetsBuffer;
2
use polars_utils::pl_str::PlSmallStr;
3
use rayon::prelude::*;
4
#[cfg(feature = "serde")]
5
use serde::{Deserialize, Serialize};
6
7
use crate::POOL;
8
use crate::chunked_array::ops::explode::offsets_to_indexes;
9
use crate::prelude::*;
10
use crate::series::IsSorted;
11
12
fn get_exploded(
13
series: &Series,
14
options: ExplodeOptions,
15
) -> PolarsResult<(Series, OffsetsBuffer<i64>)> {
16
match series.dtype() {
17
DataType::List(_) => series.list().unwrap().explode_and_offsets(options),
18
#[cfg(feature = "dtype-array")]
19
DataType::Array(_, _) => series.array().unwrap().explode_and_offsets(options),
20
_ => polars_bail!(opq = explode, series.dtype()),
21
}
22
}
23
24
/// Arguments for `LazyFrame::unpivot` function
25
#[derive(Clone, Default, Debug, PartialEq, Eq, Hash)]
26
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
27
pub struct UnpivotArgsIR {
28
pub on: Vec<PlSmallStr>,
29
pub index: Vec<PlSmallStr>,
30
pub variable_name: PlSmallStr,
31
pub value_name: PlSmallStr,
32
}
33
34
impl UnpivotArgsIR {
35
pub fn new(
36
all_column_names: Vec<PlSmallStr>,
37
on: Option<Vec<PlSmallStr>>,
38
index: Vec<PlSmallStr>,
39
value_name: Option<PlSmallStr>,
40
variable_name: Option<PlSmallStr>,
41
) -> Self {
42
let on = on.unwrap_or_else(|| {
43
// If value vars is empty we take all columns that are not in id_vars.
44
let index_set = PlHashSet::from_iter(index.iter().cloned());
45
all_column_names
46
.into_iter()
47
.filter(|s| !index_set.contains(s))
48
.collect()
49
});
50
51
Self {
52
on,
53
index,
54
variable_name: variable_name.unwrap_or_else(|| PlSmallStr::from_static("variable")),
55
value_name: value_name.unwrap_or_else(|| PlSmallStr::from_static("value")),
56
}
57
}
58
}
59
60
impl DataFrame {
61
pub fn explode_impl(
62
&self,
63
mut columns: Vec<Column>,
64
options: ExplodeOptions,
65
) -> PolarsResult<DataFrame> {
66
polars_ensure!(!columns.is_empty(), InvalidOperation: "no columns provided in explode");
67
let mut df = self.clone();
68
if self.shape_has_zero() {
69
for s in &columns {
70
df.with_column(s.as_materialized_series().explode(options)?.into_column())?;
71
}
72
return Ok(df);
73
}
74
75
columns.sort_by_key(|c| self.try_get_column_index(c.name()).unwrap());
76
77
// first remove all the exploded columns
78
for s in &columns {
79
df = df.drop(s.name().as_str())?;
80
}
81
82
let exploded_columns = POOL.install(|| {
83
columns
84
.par_iter()
85
.map(|c| get_exploded(c.as_materialized_series(), options))
86
.map(|s| s.map(|(s, o)| (Column::from(s), o)))
87
.collect::<PolarsResult<Vec<_>>>()
88
})?;
89
90
fn process_column(
91
original_df: &DataFrame,
92
df: &mut DataFrame,
93
exploded: Column,
94
) -> PolarsResult<()> {
95
if df.shape() == (0, 0) {
96
unsafe { df.set_height(exploded.len()) };
97
}
98
99
if exploded.len() == df.height() {
100
let col_idx = original_df.try_get_column_index(exploded.name().as_str())?;
101
unsafe { df.columns_mut() }.insert(col_idx, exploded);
102
} else {
103
polars_bail!(
104
ShapeMismatch: "exploded column(s) {:?} doesn't have the same length: {} \
105
as the dataframe: {}", exploded.name(), exploded.name(), df.height(),
106
);
107
}
108
Ok(())
109
}
110
111
let check_offsets = || {
112
let first_offsets = exploded_columns[0].1.as_slice();
113
for (_, offsets) in &exploded_columns[1..] {
114
let offsets = offsets.as_slice();
115
116
let offset_l = first_offsets[0];
117
let offset_r = offsets[0];
118
let all_equal_len = first_offsets.len() != offsets.len() || {
119
first_offsets
120
.iter()
121
.zip(offsets.iter())
122
.all(|(l, r)| (*l - offset_l) == (*r - offset_r))
123
};
124
125
polars_ensure!(all_equal_len,
126
ShapeMismatch: "exploded columns must have matching element counts"
127
)
128
}
129
Ok(())
130
};
131
let process_first = || {
132
let validity = columns[0].rechunk_validity();
133
let (exploded, offsets) = &exploded_columns[0];
134
135
let row_idx = offsets_to_indexes(
136
offsets.as_slice(),
137
exploded.len(),
138
options,
139
validity.as_ref(),
140
);
141
let mut row_idx = IdxCa::from_vec(PlSmallStr::EMPTY, row_idx);
142
row_idx.set_sorted_flag(IsSorted::Ascending);
143
144
// SAFETY:
145
// We just created indices that are in bounds.
146
let mut df = unsafe { df.take_unchecked(&row_idx) };
147
process_column(self, &mut df, exploded.clone())?;
148
PolarsResult::Ok(df)
149
};
150
let (df, result) = POOL.join(process_first, check_offsets);
151
let mut df = df?;
152
result?;
153
154
for (exploded, _) in exploded_columns.into_iter().skip(1) {
155
process_column(self, &mut df, exploded)?
156
}
157
158
Ok(df)
159
}
160
/// Explode `DataFrame` to long format by exploding a column with Lists.
161
///
162
/// # Example
163
///
164
/// ```ignore
165
/// # use polars_core::prelude::*;
166
/// let s0 = Series::new("a".into(), &[1i64, 2, 3]);
167
/// let s1 = Series::new("b".into(), &[1i64, 1, 1]);
168
/// let s2 = Series::new("c".into(), &[2i64, 2, 2]);
169
/// let list = Series::new("foo", &[s0, s1, s2]);
170
///
171
/// let s0 = Series::new("B".into(), [1, 2, 3]);
172
/// let s1 = Series::new("C".into(), [1, 1, 1]);
173
/// let df = DataFrame::new_infer_height(vec![list, s0, s1])?;
174
/// let exploded = df.explode(["foo"])?;
175
///
176
/// println!("{:?}", df);
177
/// println!("{:?}", exploded);
178
/// # Ok::<(), PolarsError>(())
179
/// ```
180
/// Outputs:
181
///
182
/// ```text
183
/// +-------------+-----+-----+
184
/// | foo | B | C |
185
/// | --- | --- | --- |
186
/// | list [i64] | i32 | i32 |
187
/// +=============+=====+=====+
188
/// | "[1, 2, 3]" | 1 | 1 |
189
/// +-------------+-----+-----+
190
/// | "[1, 1, 1]" | 2 | 1 |
191
/// +-------------+-----+-----+
192
/// | "[2, 2, 2]" | 3 | 1 |
193
/// +-------------+-----+-----+
194
///
195
/// +-----+-----+-----+
196
/// | foo | B | C |
197
/// | --- | --- | --- |
198
/// | i64 | i32 | i32 |
199
/// +=====+=====+=====+
200
/// | 1 | 1 | 1 |
201
/// +-----+-----+-----+
202
/// | 2 | 1 | 1 |
203
/// +-----+-----+-----+
204
/// | 3 | 1 | 1 |
205
/// +-----+-----+-----+
206
/// | 1 | 2 | 1 |
207
/// +-----+-----+-----+
208
/// | 1 | 2 | 1 |
209
/// +-----+-----+-----+
210
/// | 1 | 2 | 1 |
211
/// +-----+-----+-----+
212
/// | 2 | 3 | 1 |
213
/// +-----+-----+-----+
214
/// | 2 | 3 | 1 |
215
/// +-----+-----+-----+
216
/// | 2 | 3 | 1 |
217
/// +-----+-----+-----+
218
/// ```
219
pub fn explode<I, S>(&self, columns: I, options: ExplodeOptions) -> PolarsResult<DataFrame>
220
where
221
I: IntoIterator<Item = S>,
222
S: AsRef<str>,
223
{
224
// We need to sort the column by order of original occurrence. Otherwise the insert by index
225
// below will panic
226
let columns = self.select_to_vec(columns)?;
227
self.explode_impl(columns, options)
228
}
229
}
230
231
#[cfg(test)]
mod test {
    use crate::prelude::*;

    /// Explode options shared by every test in this module.
    fn explode_options() -> ExplodeOptions {
        ExplodeOptions {
            empty_as_null: true,
            keep_nulls: true,
        }
    }

    /// Shorthand for building a column/series name from a static string.
    fn name(s: &'static str) -> PlSmallStr {
        PlSmallStr::from_static(s)
    }

    /// Exploding a list column repeats the other columns and keeps the frame aligned.
    #[test]
    #[cfg(feature = "dtype-i8")]
    #[cfg_attr(miri, ignore)]
    fn test_explode() {
        let a = Series::new(name("a"), &[1i8, 2, 3]);
        let b = Series::new(name("b"), &[1i8, 1, 1]);
        let c = Series::new(name("c"), &[2i8, 2, 2]);
        let foo = Column::new(name("foo"), &[a, b, c]);

        let col_b = Column::new(name("B"), [1, 2, 3]);
        let col_c = Column::new(name("C"), [1, 1, 1]);
        let df = DataFrame::new_infer_height(vec![foo, col_b, col_c]).unwrap();

        let exploded = df.explode(["foo"], explode_options()).unwrap();
        assert_eq!(exploded.shape(), (9, 3));

        // Check the last row of every column.
        let last_c = exploded.column("C").unwrap().as_materialized_series();
        assert_eq!(last_c.i32().unwrap().get(8), Some(1));

        let last_b = exploded.column("B").unwrap().as_materialized_series();
        assert_eq!(last_b.i32().unwrap().get(8), Some(3));

        let last_foo = exploded.column("foo").unwrap().as_materialized_series();
        assert_eq!(last_foo.i8().unwrap().get(8), Some(2));
    }

    /// An empty list explodes to a single null row, both at the end and in the middle.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_explode_df_empty_list() -> PolarsResult<()> {
        let a = Series::new(name("a"), &[1, 2, 3]);
        let b = Series::new(name("b"), &[1, 1, 1]);
        let foo = Column::new(name("foo"), &[a, b.clone(), b.clear()]);
        let col_b = Column::new(name("B"), [1, 2, 3]);
        let col_c = Column::new(name("C"), [1, 1, 1]);
        let df = DataFrame::new_infer_height(vec![foo, col_b.clone(), col_c.clone()])?;

        // Empty list in the last row.
        let actual = df.explode(["foo"], explode_options())?;
        let expected = df![
            "foo" => [Some(1), Some(2), Some(3), Some(1), Some(1), Some(1), None],
            "B" => [1, 1, 1, 2, 2, 2, 3],
            "C" => [1, 1, 1, 1, 1, 1, 1],
        ]?;
        assert!(actual.equals_missing(&expected));

        // Empty list in the middle row.
        let foo = Column::new(
            name("foo"),
            [
                col_b.as_materialized_series().clone(),
                col_c.as_materialized_series().clear(),
                col_c.as_materialized_series().clone(),
            ],
        );
        let df = DataFrame::new_infer_height(vec![foo, col_b, col_c])?;
        let actual = df.explode(["foo"], explode_options())?;
        let expected = df![
            "foo" => [Some(1), Some(2), Some(3), None, Some(1), Some(1), Some(1)],
            "B" => [1, 1, 1, 2, 3, 3, 3],
            "C" => [1, 1, 1, 1, 1, 1, 1],
        ]?;
        assert!(actual.equals_missing(&expected));

        Ok(())
    }

    /// A frame whose only column is the exploded one takes the length of that column.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_explode_single_col() -> PolarsResult<()> {
        let a = Series::new(name("a"), &[1i32, 2, 3]);
        let b = Series::new(name("b"), &[1i32, 1, 1]);
        let foo = Column::new(name("foo"), &[a, b]);
        let df = DataFrame::new_infer_height(vec![foo])?;

        let exploded = df.explode(["foo"], explode_options())?;
        let values = exploded
            .column("foo")?
            .as_materialized_series()
            .i32()?
            .into_no_null_iter()
            .collect::<Vec<_>>();
        assert_eq!(values, &[1i32, 2, 3, 1, 1, 1]);

        Ok(())
    }
}
369
370