Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-core/src/utils/flatten.rs
8416 views
1
use arrow::bitmap::MutableBitmap;
2
use polars_utils::sync::SyncPtr;
3
4
use super::*;
5
6
pub fn flatten_df_iter(df: &DataFrame) -> impl Iterator<Item = DataFrame> + '_ {
7
df.iter_chunks_physical()
8
.filter(|chunk| !chunk.is_empty())
9
.map(|chunk| {
10
let height = chunk.len();
11
let columns = df
12
.columns()
13
.iter()
14
.zip(chunk.into_arrays())
15
.map(|(s, arr)| {
16
// SAFETY:
17
// datatypes are correct
18
let mut out = unsafe {
19
Series::from_chunks_and_dtype_unchecked(
20
s.name().clone(),
21
vec![arr],
22
s.dtype(),
23
)
24
};
25
out.set_sorted_flag(s.is_sorted_flag());
26
Column::from(out)
27
})
28
.collect::<Vec<_>>();
29
30
unsafe { DataFrame::new_unchecked(height, columns) }
31
})
32
}
33
34
pub fn flatten_series(s: &Series) -> Vec<Series> {
35
let name = s.name();
36
let dtype = s.dtype();
37
unsafe {
38
s.chunks()
39
.iter()
40
.map(|arr| {
41
Series::from_chunks_and_dtype_unchecked(name.clone(), vec![arr.clone()], dtype)
42
})
43
.collect()
44
}
45
}
46
47
/// Returns the total number of elements across `v` and the starting offset of
/// each inner vector within the concatenation of all of them.
///
/// `offsets[i]` is the sum of the lengths of `v[..i]`. The returned capacity is
/// the sum of all lengths.
pub fn cap_and_offsets<I>(v: &[Vec<I>]) -> (usize, Vec<usize>) {
    let mut offsets = Vec::with_capacity(v.len());
    let mut total = 0usize;
    for inner in v {
        offsets.push(total);
        total += inner.len();
    }
    (total, offsets)
}
59
60
pub fn flatten_par<T: Send + Sync + Copy, S: AsRef<[T]>>(bufs: &[S]) -> Vec<T> {
61
let mut len = 0;
62
let mut offsets = Vec::with_capacity(bufs.len());
63
let bufs = bufs
64
.iter()
65
.map(|s| {
66
offsets.push(len);
67
let slice = s.as_ref();
68
len += slice.len();
69
slice
70
})
71
.collect::<Vec<_>>();
72
flatten_par_impl(&bufs, len, offsets)
73
}
74
75
fn flatten_par_impl<T: Send + Sync + Copy>(
76
bufs: &[&[T]],
77
len: usize,
78
offsets: Vec<usize>,
79
) -> Vec<T> {
80
let mut out = Vec::with_capacity(len);
81
let out_ptr = unsafe { SyncPtr::new(out.as_mut_ptr()) };
82
83
POOL.install(|| {
84
offsets.into_par_iter().enumerate().for_each(|(i, offset)| {
85
let buf = bufs[i];
86
let ptr: *mut T = out_ptr.get();
87
unsafe {
88
let dst = ptr.add(offset);
89
let src = buf.as_ptr();
90
std::ptr::copy_nonoverlapping(src, dst, buf.len())
91
}
92
})
93
});
94
unsafe {
95
out.set_len(len);
96
}
97
out
98
}
99
100
pub fn flatten_nullable<S: AsRef<[NullableIdxSize]> + Send + Sync>(
101
bufs: &[S],
102
) -> PrimitiveArray<IdxSize> {
103
let a = || flatten_par(bufs);
104
let b = || {
105
let cap = bufs.iter().map(|s| s.as_ref().len()).sum::<usize>();
106
let mut validity = MutableBitmap::with_capacity(cap);
107
validity.extend_constant(cap, true);
108
109
let mut count = 0usize;
110
for s in bufs {
111
let s = s.as_ref();
112
113
for id in s {
114
if id.is_null_idx() {
115
unsafe { validity.set_unchecked(count, false) };
116
}
117
118
count += 1;
119
}
120
}
121
validity.freeze()
122
};
123
124
let (a, b) = POOL.join(a, b);
125
PrimitiveArray::from_vec(bytemuck::cast_vec::<_, IdxSize>(a)).with_validity(Some(b))
126
}
127
128