Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-core/src/utils/flatten.rs
6940 views
1
use arrow::bitmap::MutableBitmap;
2
use polars_utils::sync::SyncPtr;
3
4
use super::*;
5
6
pub fn flatten_df_iter(df: &DataFrame) -> impl Iterator<Item = DataFrame> + '_ {
7
df.iter_chunks_physical().flat_map(|chunk| {
8
let columns = df
9
.iter()
10
.zip(chunk.into_arrays())
11
.map(|(s, arr)| {
12
// SAFETY:
13
// datatypes are correct
14
let mut out = unsafe {
15
Series::from_chunks_and_dtype_unchecked(s.name().clone(), vec![arr], s.dtype())
16
};
17
out.set_sorted_flag(s.is_sorted_flag());
18
Column::from(out)
19
})
20
.collect::<Vec<_>>();
21
22
let height = DataFrame::infer_height(&columns);
23
let df = unsafe { DataFrame::new_no_checks(height, columns) };
24
if df.is_empty() { None } else { Some(df) }
25
})
26
}
27
28
pub fn flatten_series(s: &Series) -> Vec<Series> {
29
let name = s.name();
30
let dtype = s.dtype();
31
unsafe {
32
s.chunks()
33
.iter()
34
.map(|arr| {
35
Series::from_chunks_and_dtype_unchecked(name.clone(), vec![arr.clone()], dtype)
36
})
37
.collect()
38
}
39
}
40
41
/// Computes the total number of elements across all vectors in `v` together
/// with the start offset of each vector in the flattened layout.
///
/// Returns `(capacity, offsets)` where `offsets[i]` is the sum of the lengths
/// of `v[..i]` and `capacity` is the sum of all lengths.
pub fn cap_and_offsets<I>(v: &[Vec<I>]) -> (usize, Vec<usize>) {
    let mut offsets = Vec::with_capacity(v.len());
    let mut running_total = 0_usize;

    for inner in v {
        // The offset of a vector is the number of elements that precede it.
        offsets.push(running_total);
        running_total += inner.len();
    }

    (running_total, offsets)
}
53
54
pub fn flatten_par<T: Send + Sync + Copy, S: AsRef<[T]>>(bufs: &[S]) -> Vec<T> {
55
let mut len = 0;
56
let mut offsets = Vec::with_capacity(bufs.len());
57
let bufs = bufs
58
.iter()
59
.map(|s| {
60
offsets.push(len);
61
let slice = s.as_ref();
62
len += slice.len();
63
slice
64
})
65
.collect::<Vec<_>>();
66
flatten_par_impl(&bufs, len, offsets)
67
}
68
69
fn flatten_par_impl<T: Send + Sync + Copy>(
70
bufs: &[&[T]],
71
len: usize,
72
offsets: Vec<usize>,
73
) -> Vec<T> {
74
let mut out = Vec::with_capacity(len);
75
let out_ptr = unsafe { SyncPtr::new(out.as_mut_ptr()) };
76
77
POOL.install(|| {
78
offsets.into_par_iter().enumerate().for_each(|(i, offset)| {
79
let buf = bufs[i];
80
let ptr: *mut T = out_ptr.get();
81
unsafe {
82
let dst = ptr.add(offset);
83
let src = buf.as_ptr();
84
std::ptr::copy_nonoverlapping(src, dst, buf.len())
85
}
86
})
87
});
88
unsafe {
89
out.set_len(len);
90
}
91
out
92
}
93
94
pub fn flatten_nullable<S: AsRef<[NullableIdxSize]> + Send + Sync>(
95
bufs: &[S],
96
) -> PrimitiveArray<IdxSize> {
97
let a = || flatten_par(bufs);
98
let b = || {
99
let cap = bufs.iter().map(|s| s.as_ref().len()).sum::<usize>();
100
let mut validity = MutableBitmap::with_capacity(cap);
101
validity.extend_constant(cap, true);
102
103
let mut count = 0usize;
104
for s in bufs {
105
let s = s.as_ref();
106
107
for id in s {
108
if id.is_null_idx() {
109
unsafe { validity.set_unchecked(count, false) };
110
}
111
112
count += 1;
113
}
114
}
115
validity.freeze()
116
};
117
118
let (a, b) = POOL.join(a, b);
119
PrimitiveArray::from_vec(bytemuck::cast_vec::<_, IdxSize>(a)).with_validity(Some(b))
120
}
121
122