Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-core/src/frame/chunks.rs
6940 views
1
use arrow::record_batch::RecordBatch;
2
use rayon::prelude::*;
3
4
use crate::POOL;
5
use crate::prelude::*;
6
use crate::utils::{_split_offsets, accumulate_dataframes_vertical_unchecked, split_df_as_ref};
7
8
impl From<RecordBatch> for DataFrame {
9
fn from(rb: RecordBatch) -> DataFrame {
10
let height = rb.height();
11
let (schema, arrays) = rb.into_schema_and_arrays();
12
13
let columns: Vec<Column> = arrays
14
.into_iter()
15
.zip(schema.iter())
16
.map(|(arr, (name, field))| {
17
// SAFETY: Record Batch has the invariant that the schema datatype matches the
18
// columns.
19
unsafe {
20
Series::_try_from_arrow_unchecked_with_md(
21
name.clone(),
22
vec![arr],
23
field.dtype(),
24
field.metadata.as_deref(),
25
)
26
}
27
.unwrap()
28
.into_column()
29
})
30
.collect();
31
32
// SAFETY: RecordBatch has the same invariants for names and heights as DataFrame.
33
unsafe { DataFrame::new_no_checks(height, columns) }
34
}
35
}
36
37
impl DataFrame {
38
pub fn split_chunks(&mut self) -> impl Iterator<Item = DataFrame> + '_ {
39
self.align_chunks_par();
40
41
let first_series_col_idx = self
42
.columns
43
.iter()
44
.position(|col| col.as_series().is_some());
45
let df_height = self.height();
46
let mut prev_height = 0;
47
(0..self.first_col_n_chunks()).map(move |i| unsafe {
48
// There might still be scalar/partitioned columns after aligning,
49
// so we follow the size of the chunked column, if any.
50
let chunk_size = first_series_col_idx
51
.map(|c| self.get_columns()[c].as_series().unwrap().chunks()[i].len())
52
.unwrap_or(df_height);
53
let columns = self
54
.get_columns()
55
.iter()
56
.map(|col| match col {
57
Column::Series(s) => Column::from(s.select_chunk(i)),
58
Column::Partitioned(_) | Column::Scalar(_) => {
59
col.slice(prev_height as i64, chunk_size)
60
},
61
})
62
.collect::<Vec<_>>();
63
64
prev_height += chunk_size;
65
66
DataFrame::new_no_checks(chunk_size, columns)
67
})
68
}
69
70
pub fn split_chunks_by_n(self, n: usize, parallel: bool) -> Vec<DataFrame> {
71
let split = _split_offsets(self.height(), n);
72
73
let split_fn = |(offset, len)| self.slice(offset as i64, len);
74
75
if parallel {
76
// Parallel so that null_counts run in parallel
77
POOL.install(|| split.into_par_iter().map(split_fn).collect())
78
} else {
79
split.into_iter().map(split_fn).collect()
80
}
81
}
82
}
83
84
/// Split DataFrame into chunks in preparation for writing. The chunks have a
85
/// maximum number of rows per chunk to ensure reasonable memory efficiency when
86
/// reading the resulting file, and a minimum size per chunk to ensure
87
/// reasonable performance when writing.
88
pub fn chunk_df_for_writing(
89
df: &mut DataFrame,
90
row_group_size: usize,
91
) -> PolarsResult<std::borrow::Cow<'_, DataFrame>> {
92
// ensures all chunks are aligned.
93
df.align_chunks_par();
94
95
// Accumulate many small chunks to the row group size.
96
// See: #16403
97
if !df.get_columns().is_empty()
98
&& df.get_columns()[0]
99
.as_materialized_series()
100
.chunk_lengths()
101
.take(5)
102
.all(|len| len < row_group_size)
103
{
104
fn finish(scratch: &mut Vec<DataFrame>, new_chunks: &mut Vec<DataFrame>) {
105
let mut new = accumulate_dataframes_vertical_unchecked(scratch.drain(..));
106
new.as_single_chunk_par();
107
new_chunks.push(new);
108
}
109
110
let mut new_chunks = Vec::with_capacity(df.first_col_n_chunks()); // upper limit;
111
let mut scratch = vec![];
112
let mut remaining = row_group_size;
113
114
for df in df.split_chunks() {
115
remaining = remaining.saturating_sub(df.height());
116
scratch.push(df);
117
118
if remaining == 0 {
119
remaining = row_group_size;
120
finish(&mut scratch, &mut new_chunks);
121
}
122
}
123
if !scratch.is_empty() {
124
finish(&mut scratch, &mut new_chunks);
125
}
126
return Ok(std::borrow::Cow::Owned(
127
accumulate_dataframes_vertical_unchecked(new_chunks),
128
));
129
}
130
131
let n_splits = df.height() / row_group_size;
132
let result = if n_splits > 0 {
133
let mut splits = split_df_as_ref(df, n_splits, false);
134
135
for df in splits.iter_mut() {
136
// If the chunks are small enough, writing many small chunks
137
// leads to slow writing performance, so in that case we
138
// merge them.
139
let n_chunks = df.first_col_n_chunks();
140
if n_chunks > 1 && (df.estimated_size() / n_chunks < 128 * 1024) {
141
df.as_single_chunk_par();
142
}
143
}
144
145
std::borrow::Cow::Owned(accumulate_dataframes_vertical_unchecked(splits))
146
} else {
147
std::borrow::Cow::Borrowed(df)
148
};
149
Ok(result)
150
}
151
152