Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks/components/partitioner.rs
8480 views
1
use std::borrow::Cow;
2
3
use polars_core::frame::DataFrame;
4
use polars_core::prelude::row_encode::_get_rows_encoded_ca_unordered;
5
use polars_core::prelude::{BinaryOffsetChunked, Column, IntoGroupsType};
6
use polars_error::PolarsResult;
7
use polars_expr::hash_keys::{HashKeysVariant, hash_keys_variant_for_dtype};
8
use polars_expr::state::ExecutionState;
9
use polars_utils::pl_str::PlSmallStr;
10
11
use crate::async_primitives::wait_group::WaitToken;
12
use crate::expression::StreamExpr;
13
use crate::morsel::Morsel;
14
use crate::nodes::io_sinks::components::exclude_keys_projection::ExcludeKeysProjection;
15
use crate::nodes::io_sinks::components::partition_key::{PartitionKey, PreComputedKeys};
16
use crate::nodes::io_sinks::components::size::RowCountAndSize;
17
18
/// Result of [`Partitioner::partition_morsel`]: one input morsel's DataFrame
/// split into per-partition frames, plus bookkeeping about the original input.
pub struct PartitionedDataFrames {
    /// One entry per distinct partition key found in the input.
    pub partitions_vec: Vec<Partition>,
    /// Row count / size of the original (pre-split) input DataFrame.
    pub input_size: RowCountAndSize,
    /// Wait token propagated unchanged from the input morsel (if it carried one).
    pub input_wait_token: Option<WaitToken>,
}
23
24
/// The rows belonging to a single partition key.
pub struct Partition {
    /// Key identifying this partition.
    pub key: PartitionKey,
    /// 1-row df with keys.
    pub keys_df: DataFrame,
    /// The partition's data rows. Does not include columns in `keys_df`
    /// (key columns may be projected out via `ExcludeKeysProjection`).
    pub df: DataFrame,
}
31
32
/// Strategy for splitting an incoming morsel into output partitions.
pub enum Partitioner {
    /// All rows to a single partition
    FileSize,
    /// Rows grouped by the values of one or more key expressions.
    Keyed(KeyedPartitioner),
}
37
38
impl Partitioner {
39
pub async fn partition_morsel(
40
&self,
41
morsel: Morsel,
42
in_memory_exec_state: &ExecutionState,
43
) -> PolarsResult<PartitionedDataFrames> {
44
let (df, _, _, input_wait_token) = morsel.into_inner();
45
let input_size = RowCountAndSize::new_from_df(&df);
46
let partitions_vec = match self {
47
Self::FileSize => vec![Partition {
48
key: PartitionKey::NULL,
49
keys_df: DataFrame::empty_with_height(1),
50
df,
51
}],
52
Self::Keyed(v) => v.partition_df(df, in_memory_exec_state).await?,
53
};
54
55
let out = PartitionedDataFrames {
56
partitions_vec,
57
input_size,
58
input_wait_token,
59
};
60
61
Ok(out)
62
}
63
64
pub fn verbose_display(&self) -> impl std::fmt::Display + '_ {
65
struct DisplayWrap<'a>(&'a Partitioner);
66
67
impl std::fmt::Display for DisplayWrap<'_> {
68
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69
match self.0 {
70
Partitioner::FileSize => f.write_str("FileSize"),
71
Partitioner::Keyed(kp) => write!(
72
f,
73
"Keyed({} key{})",
74
kp.key_exprs.len(),
75
if kp.key_exprs.len() == 1 { "" } else { "s" }
76
),
77
}
78
}
79
}
80
81
DisplayWrap(self)
82
}
83
}
84
85
/// Partitioner that groups rows by the values of key expressions.
pub struct KeyedPartitioner {
    /// Expressions evaluated against the input to produce the key columns.
    /// Must be non-empty
    pub key_exprs: Vec<StreamExpr>,
    /// Exclude key columns from full gather. Can be `None` if all key exprs output
    /// names do not overlap with existing names.
    pub exclude_keys_projection: Option<ExcludeKeysProjection>,
}
92
93
impl KeyedPartitioner {
    /// Splits `df` into one [`Partition`] per distinct combination of key values.
    ///
    /// Key expressions are evaluated against `df`, rows are grouped on the
    /// resulting key columns, and each group is gathered into its own frame.
    /// Key columns may be excluded from the gathered frames via
    /// `exclude_keys_projection`.
    async fn partition_df(
        &self,
        df: DataFrame,
        in_memory_exec_state: &ExecutionState,
    ) -> PolarsResult<Vec<Partition>> {
        // Invariant promised by the struct docs: at least one key expression.
        assert!(!self.key_exprs.is_empty());

        // Evaluate every key expression; each result is length-preserving
        // (broadcast to df's height), so all key columns align row-wise with df.
        let mut key_columns = Vec::with_capacity(self.key_exprs.len());

        for e in self.key_exprs.as_slice() {
            key_columns.push(
                e.evaluate_preserve_len_broadcast(&df, in_memory_exec_state)
                    .await?,
            );
        }

        let mut pre_computed_keys: Option<PreComputedKeys> = None;
        // Fast path check: with exactly one key column whose dtype does not
        // require row-encoding for hashing, we can group on the column directly
        // and skip building the row-encoded key representation.
        let single_non_encode = match key_columns.as_slice() {
            [c] => {
                // May still be None if this dtype has no non-encoded key form.
                pre_computed_keys = PreComputedKeys::opt_new_non_encoded(c);
                hash_keys_variant_for_dtype(c.dtype()) != HashKeysVariant::RowEncoded
            },
            _ => false,
        };

        // Row-encodes the given columns into a single binary column.
        // NOTE(review): `.unwrap()` assumes row-encoding these key dtypes
        // cannot fail — confirm against `_get_rows_encoded_ca_unordered`.
        let row_encode = |columns: &[Column]| -> BinaryOffsetChunked {
            _get_rows_encoded_ca_unordered(PlSmallStr::EMPTY, columns)
                .unwrap()
                .rechunk()
                .into_owned()
        };

        // Eagerly row-encode unless the single-column fast path applies.
        let mut keys_encoded_ca: Option<BinaryOffsetChunked> =
            (!single_non_encode).then(|| row_encode(&key_columns));

        // Group rows by key: either directly on the single key column (fast
        // path) or on the row-encoded keys. Both `false` flags request
        // non-parallel, unsorted grouping.
        let groups = if single_non_encode {
            key_columns[0]
                .as_materialized_series()
                .group_tuples(false, false)
        } else {
            keys_encoded_ca.as_ref().unwrap().group_tuples(false, false)
        }
        .unwrap();

        if pre_computed_keys.is_none() {
            // Heuristic: if there are many groups relative to the row count
            // (> 1/4), pre-computing row-encoded keys is worthwhile even on the
            // fast path, so per-partition keys can be looked up by row index
            // instead of re-encoding each group's keys individually.
            if keys_encoded_ca.is_none() && groups.len() > (df.height() / 4) {
                keys_encoded_ca = Some(row_encode(&key_columns));
            }

            pre_computed_keys = keys_encoded_ca
                .as_ref()
                .map(|x| PreComputedKeys::RowEncoded(x.downcast_as_array().clone()));
        }

        // Frame to gather partition rows from: `df` minus key columns when an
        // exclusion projection is configured, otherwise `df` itself (borrowed).
        let gather_source_df: Cow<DataFrame> =
            if let Some(projection) = self.exclude_keys_projection.as_ref() {
                let columns = df.columns();

                // SAFETY: every projected column is cloned from `df`, so all
                // columns share `df.height()` and names stay unique.
                Cow::Owned(unsafe {
                    DataFrame::new_unchecked(
                        df.height(),
                        projection
                            .iter_indices()
                            .map(|i| columns[i].clone())
                            .collect(),
                    )
                })
            } else {
                Cow::Borrowed(&df)
            };

        // Materialize one Partition per group.
        let partitions_vec: Vec<Partition> = groups
            .iter()
            .map(|groups_indicator| {
                // Any row of the group works for extracting the key values;
                // use the first.
                let first_idx = groups_indicator.first();
                // SAFETY: group indices were computed from key columns that are
                // row-aligned with `gather_source_df`, so they are in-bounds.
                let df = unsafe { gather_source_df.gather_group_unchecked(&groups_indicator) };

                // Ensure 0-width is handled properly.
                assert_eq!(df.height(), groups_indicator.len());

                // Build the 1-row keys frame from the group's first row.
                // SAFETY: all columns are 1-row slices taken at `first_idx`,
                // which is in-bounds for the row-aligned key columns.
                let keys_df: DataFrame = unsafe {
                    DataFrame::new_unchecked(
                        1,
                        key_columns
                            .iter()
                            .map(|c| c.take_slice_unchecked(&[first_idx]))
                            .collect(),
                    )
                };

                // Prefer a pre-computed key lookup by row index; fall back to
                // row-encoding just this group's 1-row keys frame.
                let key: PartitionKey = pre_computed_keys.as_ref().map_or_else(
                    || PartitionKey::from_slice(row_encode(keys_df.columns()).get(0).unwrap()),
                    |keys| keys.get_key(first_idx.try_into().unwrap()),
                );

                Partition { key, keys_df, df }
            })
            .collect();

        Ok(partitions_vec)
    }
}
196
197