Path: blob/main/crates/polars-stream/src/utils/in_memory_linearize.rs
6939 views
use std::cmp::Reverse;1use std::collections::BinaryHeap;23use polars_core::POOL;4use polars_core::frame::DataFrame;5use polars_utils::priority::Priority;6use polars_utils::sync::SyncPtr;78use crate::morsel::MorselSeq;910/// Amount of morsels we need to consider spawning a thread during linearization.11const MORSELS_PER_THREAD: usize = 256;1213/// Given a Vec<Morsel> for each pipe, it will output a vec of the contained dataframes.14/// If the morsels are ordered by their sequence ids within each vec, and no15/// sequence ID occurs in multiple vecs, the output will follow the same order globally.16pub fn linearize(mut morsels_per_pipe: Vec<Vec<(MorselSeq, DataFrame)>>) -> Vec<DataFrame> {17let num_morsels: usize = morsels_per_pipe.iter().map(|p| p.len()).sum();18if num_morsels == 0 {19return vec![];20}2122let n_threads = num_morsels23.div_ceil(MORSELS_PER_THREAD)24.min(POOL.current_num_threads()) as u64;2526// Partitioning based on sequence number.27let max_seq = morsels_per_pipe28.iter()29.flat_map(|p| p.iter().map(|m| m.0.to_u64()))30.max()31.unwrap();32let seqs_per_thread = (max_seq + 1).div_ceil(n_threads);3334let morsels_per_p = &morsels_per_pipe;35let mut dataframes: Vec<DataFrame> = Vec::with_capacity(num_morsels);36let dataframes_ptr = unsafe { SyncPtr::new(dataframes.as_mut_ptr()) };37POOL.scope(|s| {38let mut out_offset = 0;39let mut stop_idx_per_pipe = vec![0; morsels_per_p.len()];40for t in 0..n_threads {41// This thread will handle all morsels with sequence id42// [t * seqs_per_thread, (t + 1) * seqs_per_threads).43// Compute per pipe the slice that covers this range, re-using44// the stop indices from the previous thread as our starting indices.45let this_thread_out_offset = out_offset;46let partition_max_seq = (t + 1) * seqs_per_thread;47let cur_idx_per_pipe = stop_idx_per_pipe;48stop_idx_per_pipe = Vec::with_capacity(morsels_per_p.len());49for p in 0..morsels_per_p.len() {50let stop_idx =51morsels_per_p[p].partition_point(|m| m.0.to_u64() < partition_max_seq);52assert!(stop_idx >= cur_idx_per_pipe[p]);53out_offset += stop_idx - cur_idx_per_pipe[p];54stop_idx_per_pipe.push(stop_idx);55}5657{58let stop_idx_per_pipe = stop_idx_per_pipe.clone();59s.spawn(move |_| unsafe {60fill_partition(61morsels_per_p,62cur_idx_per_pipe,63&stop_idx_per_pipe,64dataframes_ptr.get().add(this_thread_out_offset),65)66});67}68}69});7071// SAFETY: all partitions were handled, so dataframes is full filled and72// morsels_per_pipe fully consumed.73unsafe {74for morsels in morsels_per_pipe.iter_mut() {75morsels.set_len(0);76}77dataframes.set_len(num_morsels);78}79dataframes80}8182unsafe fn fill_partition(83morsels_per_pipe: &[Vec<(MorselSeq, DataFrame)>],84mut cur_idx_per_pipe: Vec<usize>,85stop_idx_per_pipe: &[usize],86mut out_ptr: *mut DataFrame,87) {88// K-way merge, initialize priority queue with one element per pipe.89let mut kmerge = BinaryHeap::with_capacity(morsels_per_pipe.len());90for (p, morsels) in morsels_per_pipe.iter().enumerate() {91if cur_idx_per_pipe[p] != stop_idx_per_pipe[p] {92let seq = morsels[cur_idx_per_pipe[p]].0;93kmerge.push(Priority(Reverse(seq), p));94}95}9697// While the merge queue isn't empty, keep copying elements into the output.98unsafe {99while let Some(Priority(Reverse(mut seq), p)) = kmerge.pop() {100// Write the next morsel from this pipe to the output.101let morsels = &morsels_per_pipe[p];102let cur_idx = &mut cur_idx_per_pipe[p];103core::ptr::copy_nonoverlapping(&morsels[*cur_idx].1, out_ptr, 1);104out_ptr = out_ptr.add(1);105*cur_idx += 1;106107// Handle next element from this pipe.108while *cur_idx != stop_idx_per_pipe[p] {109let new_seq = morsels[*cur_idx].0;110if new_seq <= seq.successor() {111// New sequence number is the same, or a direct successor, can output immediately.112core::ptr::copy_nonoverlapping(&morsels[*cur_idx].1, out_ptr, 1);113out_ptr = out_ptr.add(1);114*cur_idx += 1;115seq = new_seq;116} else {117kmerge.push(Priority(Reverse(new_seq), p));118break;119}120}121}122}123}124125126