GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-core/src/frame/chunks.rs
use arrow::record_batch::RecordBatch;
use rayon::prelude::*;

use crate::POOL;
use crate::prelude::*;
use crate::utils::{_split_offsets, accumulate_dataframes_vertical_unchecked, split_df_as_ref};

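// A hedged usage sketch (not part of this file): a `RecordBatch` produced by an
// Arrow-based reader can be converted directly; the conversion relies on the
// invariants the batch was constructed with instead of re-validating dtypes:
//
//     let df = DataFrame::from(record_batch);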
impl From<RecordBatch> for DataFrame {
    fn from(rb: RecordBatch) -> DataFrame {
        let height = rb.height();
        let (schema, arrays) = rb.into_schema_and_arrays();

        let columns: Vec<Column> = arrays
            .into_iter()
            .zip(schema.iter())
            .map(|(arr, (name, field))| {
                // SAFETY: a RecordBatch upholds the invariant that the schema dtypes
                // match the columns.
                unsafe {
                    Series::_try_from_arrow_unchecked_with_md(
                        name.clone(),
                        vec![arr],
                        field.dtype(),
                        field.metadata.as_deref(),
                    )
                }
                .unwrap()
                .into_column()
            })
            .collect();

        // SAFETY: a RecordBatch upholds the same invariants for names and heights as
        // DataFrame.
        unsafe { DataFrame::new_unchecked(height, columns) }
    }
}

impl DataFrame {
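    /// Split this [DataFrame] into an iterator of single-chunk [DataFrame]s, one per
    /// aligned chunk.
    ///
    /// A hedged sketch of the intended use (the `df!` input is hypothetical):
    ///
    /// ```ignore
    /// let mut df = df!("a" => [1i32, 2, 3])?;
    /// for chunk in df.split_chunks() {
    ///     // Every yielded frame is backed by a single chunk per column.
    ///     assert_eq!(chunk.first_col_n_chunks(), 1);
    /// }
    /// ```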
    pub fn split_chunks(&mut self) -> impl Iterator<Item = DataFrame> + '_ {
        self.align_chunks_par();

        let first_series_col_idx = self
            .columns()
            .iter()
            .position(|col| col.as_series().is_some());
        let df_height = self.height();
        let mut prev_height = 0;
        (0..self.first_col_n_chunks()).map(move |i| unsafe {
            // There might still be scalar/partitioned columns after aligning,
            // so we follow the size of the chunked column, if any.
            let chunk_size = first_series_col_idx
                .map(|c| self.columns()[c].as_series().unwrap().chunks()[i].len())
                .unwrap_or(df_height);
            let columns = self
                .columns()
                .iter()
                .map(|col| match col {
                    Column::Series(s) => Column::from(s.select_chunk(i)),
                    Column::Partitioned(_) | Column::Scalar(_) => {
                        col.slice(prev_height as i64, chunk_size)
                    },
                })
                .collect::<Vec<_>>();

            prev_height += chunk_size;

            DataFrame::new_unchecked(chunk_size, columns)
        })
    }

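    /// Split this [DataFrame] into `n` roughly equal-height slices, optionally in
    /// parallel on the thread pool.
    ///
    /// A hedged sketch:
    ///
    /// ```ignore
    /// let df = df!("a" => [1i32, 2, 3, 4])?;
    /// let parts = df.split_chunks_by_n(2, false);
    /// assert_eq!(parts.len(), 2);
    /// ```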
    pub fn split_chunks_by_n(self, n: usize, parallel: bool) -> Vec<DataFrame> {
        let split = _split_offsets(self.height(), n);

        let split_fn = |(offset, len)| self.slice(offset as i64, len);

        if parallel {
            // Parallel so that null_counts run in parallel
            POOL.install(|| split.into_par_iter().map(split_fn).collect())
        } else {
            split.into_iter().map(split_fn).collect()
        }
    }

    /// Convert the columns of this [DataFrame] to arrow arrays.
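    ///
    /// A hedged sketch (`CompatLevel::newest()` selects the most recent Arrow layout):
    ///
    /// ```ignore
    /// let arrays = df.rechunk_to_arrow(CompatLevel::newest());
    /// assert_eq!(arrays.len(), df.width());
    /// ```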
    pub fn rechunk_to_arrow(&self, compat_level: CompatLevel) -> Vec<ArrayRef> {
        self.columns()
            .iter()
            .map(|c| c.clone().rechunk_to_arrow(compat_level))
            .collect()
    }

    /// Convert the columns of this [DataFrame] to arrow arrays, consuming the frame.
    pub fn rechunk_into_arrow(self, compat_level: CompatLevel) -> Vec<ArrayRef> {
        self.into_columns()
            .into_iter()
            .map(|c| c.rechunk_to_arrow(compat_level))
            .collect()
    }
}

/// Split a [DataFrame] into chunks in preparation for writing. The chunks have a
/// maximum number of rows per chunk to ensure reasonable memory efficiency when
/// reading the resulting file, and a minimum size per chunk to ensure
/// reasonable performance when writing.
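///
/// A hedged sketch with a hypothetical row-group size:
///
/// ```ignore
/// let chunked = chunk_df_for_writing(&mut df, 512 * 512)?;
/// // `Cow::Borrowed` when no re-chunking was needed, `Cow::Owned` otherwise.
/// ```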
pub fn chunk_df_for_writing(
    df: &mut DataFrame,
    row_group_size: usize,
) -> PolarsResult<std::borrow::Cow<'_, DataFrame>> {
    // Ensure all chunks are aligned.
    df.align_chunks_par();

    // Accumulate many small chunks to the row group size.
    // See: #16403
    if !df.columns().is_empty()
        && df.columns()[0]
            .as_materialized_series()
            .chunk_lengths()
            .take(5)
            .all(|len| len < row_group_size)
    {
        fn finish(scratch: &mut Vec<DataFrame>, new_chunks: &mut Vec<DataFrame>) {
            let mut new = accumulate_dataframes_vertical_unchecked(scratch.drain(..));
            new.rechunk_mut_par();
            new_chunks.push(new);
        }

        let mut new_chunks = Vec::with_capacity(df.first_col_n_chunks()); // upper limit
        let mut scratch = vec![];
        let mut remaining = row_group_size;

        for df in df.split_chunks() {
            remaining = remaining.saturating_sub(df.height());
            scratch.push(df);

            if remaining == 0 {
                remaining = row_group_size;
                finish(&mut scratch, &mut new_chunks);
            }
        }
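
        // E.g. with row_group_size = 250 and chunk heights [100, 100, 100], `remaining`
        // only reaches 0 at the third chunk, so a single 300-row group is emitted:
        // groups may overshoot the target, and only the trailing group can be smaller.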
        if !scratch.is_empty() {
            finish(&mut scratch, &mut new_chunks);
        }

        return Ok(std::borrow::Cow::Owned(
            accumulate_dataframes_vertical_unchecked(new_chunks),
        ));
    }

    let n_splits = df.height() / row_group_size;
    let result = if n_splits > 0 {
        let mut splits = split_df_as_ref(df, n_splits, false);

        for df in splits.iter_mut() {
            // If the chunks are small enough, writing many small chunks
            // leads to slow writing performance, so in that case we
            // merge them.
            let n_chunks = df.first_col_n_chunks();
            if n_chunks > 1 && (df.estimated_size() / n_chunks < 128 * 1024) {
                df.rechunk_mut_par();
            }
        }

        std::borrow::Cow::Owned(accumulate_dataframes_vertical_unchecked(splits))
    } else {
        std::borrow::Cow::Borrowed(df)
    };
    Ok(result)
}