Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks2/components/partitioner.rs
7884 views
1
use std::borrow::Cow;
2
3
use arrow::array::{BinaryViewArray, FixedSizeBinaryArray, PrimitiveArray};
4
use arrow::buffer::Buffer;
5
use arrow::datatypes::ArrowDataType;
6
use polars_core::frame::DataFrame;
7
use polars_core::prelude::row_encode::_get_rows_encoded_ca_unordered;
8
use polars_core::prelude::{
9
BinaryOffsetChunked, Column, DataType, GroupsIndicator, IntoGroupsType, LargeBinaryArray,
10
};
11
use polars_core::{with_match_physical_integer_type, with_match_physical_numeric_type};
12
use polars_error::PolarsResult;
13
use polars_expr::hash_keys::{HashKeysVariant, hash_keys_variant_for_dtype};
14
use polars_expr::state::ExecutionState;
15
use polars_utils::IdxSize;
16
use polars_utils::pl_str::PlSmallStr;
17
18
use crate::async_primitives::wait_group::WaitToken;
19
use crate::expression::StreamExpr;
20
use crate::morsel::Morsel;
21
use crate::nodes::io_sinks2::components::exclude_keys_projection::ExcludeKeysProjection;
22
use crate::nodes::io_sinks2::components::partition_key::{PartitionKey, PreComputedKeys};
23
use crate::nodes::io_sinks2::components::size::RowCountAndSize;
24
25
/// Output of [`Partitioner::partition_morsel`]: the input morsel's DataFrame split
/// into per-partition frames, plus bookkeeping about the original input.
pub struct PartitionedDataFrames {
    /// One entry per output partition.
    pub partitions_vec: Vec<Partition>,
    /// Row count and size measured on the *input* DataFrame before partitioning.
    pub input_size: RowCountAndSize,
    /// Wait token carried over from the source morsel (backpressure handle).
    pub input_wait_token: Option<WaitToken>,
}
30
31
/// A single output partition: the rows belonging to one partition key.
pub struct Partition {
    /// Hashable identifier for this partition (row-encoded key bytes, or a
    /// non-encoded fast path — see `PreComputedKeys`).
    pub key: PartitionKey,
    /// 1-row df with keys.
    pub keys_df: DataFrame,
    /// Rows of this partition. Does not include columns in `keys_df`
    /// (when an `ExcludeKeysProjection` was configured).
    pub df: DataFrame,
}
38
39
/// Strategy used to split incoming morsels into output partitions.
pub enum Partitioner {
    /// All rows to a single partition
    FileSize,
    /// Group rows by the values of one or more key expressions.
    Keyed(KeyedPartitioner),
}
44
45
impl Partitioner {
46
pub async fn partition_morsel(
47
&self,
48
morsel: Morsel,
49
in_memory_exec_state: &ExecutionState,
50
) -> PolarsResult<PartitionedDataFrames> {
51
let (df, _, _, input_wait_token) = morsel.into_inner();
52
let input_size = RowCountAndSize::new_from_df(&df);
53
let partitions_vec = match self {
54
Self::FileSize => vec![Partition {
55
key: PartitionKey::NULL,
56
keys_df: DataFrame::empty_with_height(1),
57
df,
58
}],
59
Self::Keyed(v) => v.partition_df(df, in_memory_exec_state).await?,
60
};
61
62
let out = PartitionedDataFrames {
63
partitions_vec,
64
input_size,
65
input_wait_token,
66
};
67
68
Ok(out)
69
}
70
71
pub fn verbose_display(&self) -> impl std::fmt::Display + '_ {
72
struct DisplayWrap<'a>(&'a Partitioner);
73
74
impl std::fmt::Display for DisplayWrap<'_> {
75
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76
match self.0 {
77
Partitioner::FileSize => f.write_str("FileSize"),
78
Partitioner::Keyed(kp) => write!(
79
f,
80
"Keyed({} key{})",
81
kp.key_exprs.len(),
82
if kp.key_exprs.len() == 1 { "" } else { "s" }
83
),
84
}
85
}
86
}
87
88
DisplayWrap(self)
89
}
90
}
91
92
/// Partitioner that groups rows by the values of key expressions.
pub struct KeyedPartitioner {
    /// Expressions evaluated against each incoming DataFrame to produce the key
    /// columns. Must be non-empty (asserted in `partition_df`).
    pub key_exprs: Vec<StreamExpr>,
    /// Exclude key columns from full gather. Can be `None` if all key exprs output
    /// names do not overlap with existing names.
    pub exclude_keys_projection: Option<ExcludeKeysProjection>,
}
99
100
impl KeyedPartitioner {
    /// Split `df` into one [`Partition`] per distinct combination of key values.
    ///
    /// High-level flow:
    /// 1. Evaluate every key expression against `df`.
    /// 2. Group rows by key value (either on the single key column directly, or
    ///    on a row-encoded combination of all key columns).
    /// 3. For each group, gather its rows (optionally excluding key columns) and
    ///    derive a `PartitionKey`.
    async fn partition_df(
        &self,
        df: DataFrame,
        in_memory_exec_state: &ExecutionState,
    ) -> PolarsResult<Vec<Partition>> {
        // Invariant of `KeyedPartitioner`: at least one key expression.
        assert!(!self.key_exprs.is_empty());

        let mut key_columns = Vec::with_capacity(self.key_exprs.len());

        for e in self.key_exprs.as_slice() {
            key_columns.push(e.evaluate(&df, in_memory_exec_state).await?);
        }

        let mut pre_computed_keys: Option<PreComputedKeys> = None;
        // `single_non_encode`: exactly one key column whose dtype hashes without
        // row encoding — in that case we can group on the column directly and
        // possibly derive partition keys without encoding at all.
        let single_non_encode = match key_columns.as_slice() {
            [c] => {
                // May be `None` if this dtype has no non-encoded key representation.
                pre_computed_keys = PreComputedKeys::opt_new_non_encoded(c);
                hash_keys_variant_for_dtype(c.dtype()) != HashKeysVariant::RowEncoded
            },
            _ => false,
        };

        // Row-encode the given columns into one binary value per row (unordered
        // encoding — only equality matters, not sort order).
        // NOTE(review): the `.unwrap()` presumably relies on encoding being
        // infallible for already-evaluated key columns — confirm upstream.
        let row_encode = |columns: &[Column]| -> BinaryOffsetChunked {
            _get_rows_encoded_ca_unordered(PlSmallStr::EMPTY, columns)
                .unwrap()
                .rechunk()
                .into_owned()
        };

        // Eagerly encode when grouping requires the encoded form (multiple keys,
        // or a dtype whose hash variant is `RowEncoded`).
        let mut keys_encoded_ca: Option<BinaryOffsetChunked> =
            (!single_non_encode).then(|| row_encode(&key_columns));

        let groups = if single_non_encode {
            key_columns[0]
                .as_materialized_series()
                .group_tuples(false, false)
        } else {
            keys_encoded_ca.as_ref().unwrap().group_tuples(false, false)
        }
        .unwrap();

        if pre_computed_keys.is_none() {
            // Heuristic: with many groups (> 1/4 of the rows), one bulk row
            // encoding is presumably cheaper than per-group encoding in the
            // fallback path below — TODO confirm the threshold rationale.
            if keys_encoded_ca.is_none() && groups.len() > (df.height() / 4) {
                keys_encoded_ca = Some(row_encode(&key_columns));
            }

            pre_computed_keys = keys_encoded_ca
                .as_ref()
                .map(|x| PreComputedKeys::RowEncoded(x.downcast_as_array().clone()));
        }

        // Source frame for the per-group row gathers. If a projection was
        // configured, key columns are dropped here so they don't appear in
        // `Partition::df` (they are carried in `keys_df` instead).
        let gather_source_df: Cow<DataFrame> =
            if let Some(projection) = self.exclude_keys_projection.as_ref() {
                let columns = df.get_columns();

                Cow::Owned(if projection.len() == 0 {
                    // 0-width projection: keep the height so per-group row
                    // counts remain meaningful.
                    DataFrame::empty_with_height(df.height())
                } else {
                    projection
                        .iter_indices()
                        .map(|i| columns[i].clone())
                        .collect()
                })
            } else {
                Cow::Borrowed(&df)
            };

        let partitions_vec: Vec<Partition> = groups
            .iter()
            .map(|groups_indicator| {
                let first_idx = groups_indicator.first();
                // SAFETY: indices come from `group_tuples` over columns of the
                // same height as `gather_source_df`, so they are in-bounds.
                let df = unsafe { gather_source_df.gather_group_unchecked(&groups_indicator) };

                // Ensure 0-width is handled properly.
                assert_eq!(df.height(), groups_indicator.len());

                // 1-row frame holding this group's key values (all rows in the
                // group share them, so the first index suffices).
                let keys_df: DataFrame = key_columns
                    .iter()
                    .map(|c| unsafe { c.take_slice_unchecked(&[first_idx]) })
                    .collect();

                // Fast path: look the key up in the pre-computed representation.
                // Fallback: row-encode just this group's 1-row keys frame.
                let key: PartitionKey = pre_computed_keys.as_ref().map_or_else(
                    || PartitionKey::from_slice(row_encode(keys_df.get_columns()).get(0).unwrap()),
                    |keys| keys.get_key(first_idx.try_into().unwrap()),
                );

                Partition { key, keys_df, df }
            })
            .collect();

        Ok(partitions_vec)
    }
}
194
195