Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/utils/in_memory_linearize.rs
6939 views
1
use std::cmp::Reverse;
2
use std::collections::BinaryHeap;
3
4
use polars_core::POOL;
5
use polars_core::frame::DataFrame;
6
use polars_utils::priority::Priority;
7
use polars_utils::sync::SyncPtr;
8
9
use crate::morsel::MorselSeq;
10
11
/// Number of morsels each linearization thread should handle: one thread is
/// used per `MORSELS_PER_THREAD` morsels (rounded up), capped by the pool size.
const MORSELS_PER_THREAD: usize = 256;
13
14
/// Given a Vec<Morsel> for each pipe, it will output a vec of the contained dataframes.
15
/// If the morsels are ordered by their sequence ids within each vec, and no
16
/// sequence ID occurs in multiple vecs, the output will follow the same order globally.
17
pub fn linearize(mut morsels_per_pipe: Vec<Vec<(MorselSeq, DataFrame)>>) -> Vec<DataFrame> {
18
let num_morsels: usize = morsels_per_pipe.iter().map(|p| p.len()).sum();
19
if num_morsels == 0 {
20
return vec![];
21
}
22
23
let n_threads = num_morsels
24
.div_ceil(MORSELS_PER_THREAD)
25
.min(POOL.current_num_threads()) as u64;
26
27
// Partitioning based on sequence number.
28
let max_seq = morsels_per_pipe
29
.iter()
30
.flat_map(|p| p.iter().map(|m| m.0.to_u64()))
31
.max()
32
.unwrap();
33
let seqs_per_thread = (max_seq + 1).div_ceil(n_threads);
34
35
let morsels_per_p = &morsels_per_pipe;
36
let mut dataframes: Vec<DataFrame> = Vec::with_capacity(num_morsels);
37
let dataframes_ptr = unsafe { SyncPtr::new(dataframes.as_mut_ptr()) };
38
POOL.scope(|s| {
39
let mut out_offset = 0;
40
let mut stop_idx_per_pipe = vec![0; morsels_per_p.len()];
41
for t in 0..n_threads {
42
// This thread will handle all morsels with sequence id
43
// [t * seqs_per_thread, (t + 1) * seqs_per_threads).
44
// Compute per pipe the slice that covers this range, re-using
45
// the stop indices from the previous thread as our starting indices.
46
let this_thread_out_offset = out_offset;
47
let partition_max_seq = (t + 1) * seqs_per_thread;
48
let cur_idx_per_pipe = stop_idx_per_pipe;
49
stop_idx_per_pipe = Vec::with_capacity(morsels_per_p.len());
50
for p in 0..morsels_per_p.len() {
51
let stop_idx =
52
morsels_per_p[p].partition_point(|m| m.0.to_u64() < partition_max_seq);
53
assert!(stop_idx >= cur_idx_per_pipe[p]);
54
out_offset += stop_idx - cur_idx_per_pipe[p];
55
stop_idx_per_pipe.push(stop_idx);
56
}
57
58
{
59
let stop_idx_per_pipe = stop_idx_per_pipe.clone();
60
s.spawn(move |_| unsafe {
61
fill_partition(
62
morsels_per_p,
63
cur_idx_per_pipe,
64
&stop_idx_per_pipe,
65
dataframes_ptr.get().add(this_thread_out_offset),
66
)
67
});
68
}
69
}
70
});
71
72
// SAFETY: all partitions were handled, so dataframes is full filled and
73
// morsels_per_pipe fully consumed.
74
unsafe {
75
for morsels in morsels_per_pipe.iter_mut() {
76
morsels.set_len(0);
77
}
78
dataframes.set_len(num_morsels);
79
}
80
dataframes
81
}
82
83
unsafe fn fill_partition(
84
morsels_per_pipe: &[Vec<(MorselSeq, DataFrame)>],
85
mut cur_idx_per_pipe: Vec<usize>,
86
stop_idx_per_pipe: &[usize],
87
mut out_ptr: *mut DataFrame,
88
) {
89
// K-way merge, initialize priority queue with one element per pipe.
90
let mut kmerge = BinaryHeap::with_capacity(morsels_per_pipe.len());
91
for (p, morsels) in morsels_per_pipe.iter().enumerate() {
92
if cur_idx_per_pipe[p] != stop_idx_per_pipe[p] {
93
let seq = morsels[cur_idx_per_pipe[p]].0;
94
kmerge.push(Priority(Reverse(seq), p));
95
}
96
}
97
98
// While the merge queue isn't empty, keep copying elements into the output.
99
unsafe {
100
while let Some(Priority(Reverse(mut seq), p)) = kmerge.pop() {
101
// Write the next morsel from this pipe to the output.
102
let morsels = &morsels_per_pipe[p];
103
let cur_idx = &mut cur_idx_per_pipe[p];
104
core::ptr::copy_nonoverlapping(&morsels[*cur_idx].1, out_ptr, 1);
105
out_ptr = out_ptr.add(1);
106
*cur_idx += 1;
107
108
// Handle next element from this pipe.
109
while *cur_idx != stop_idx_per_pipe[p] {
110
let new_seq = morsels[*cur_idx].0;
111
if new_seq <= seq.successor() {
112
// New sequence number is the same, or a direct successor, can output immediately.
113
core::ptr::copy_nonoverlapping(&morsels[*cur_idx].1, out_ptr, 1);
114
out_ptr = out_ptr.add(1);
115
*cur_idx += 1;
116
seq = new_seq;
117
} else {
118
kmerge.push(Priority(Reverse(new_seq), p));
119
break;
120
}
121
}
122
}
123
}
124
}
125
126