Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks/metrics.rs
6939 views
1
use arrow::array::builder::ShareStrategy;
2
use polars_core::frame::DataFrame;
3
use polars_core::prelude::{
4
AnyValue, ChunkedBuilder, DataType, IntoColumn, PrimitiveChunkedBuilder, StringChunkedBuilder,
5
StructChunked, UInt64Type,
6
};
7
use polars_core::schema::Schema;
8
use polars_core::series::builder::SeriesBuilder;
9
use polars_core::series::{IntoSeries, Series};
10
use polars_error::PolarsResult;
11
use polars_expr::reduce::{GroupedReduction, new_max_reduction, new_min_reduction};
12
use polars_utils::format_pl_smallstr;
13
use polars_utils::pl_str::PlSmallStr;
14
15
/// Metrics that relate to a written file.
16
pub struct WriteMetrics {
17
/// Stringified path to the file.
18
pub path: String,
19
/// Number of rows in the file.
20
pub num_rows: u64,
21
/// Size of written file in bytes.
22
pub file_size: u64,
23
/// Keys of the partition.
24
pub keys: Option<Vec<AnyValue<'static>>>,
25
/// Metrics for each column.
26
pub columns: Vec<WriteMetricsColumn>,
27
}
28
29
/// Metrics in a written file for a specific column.
30
pub struct WriteMetricsColumn {
31
/// Number of missing values in the column.
32
pub null_count: u64,
33
/// Number of NaN values in the column.
34
pub nan_count: u64,
35
/// The minimum value in the column.
36
///
37
/// `NaN`s are always ignored and `None` is the default value.
38
pub lower_bound: Option<Box<dyn GroupedReduction>>,
39
/// The maximum value in the column.
40
///
41
/// `NaN`s are always ignored and `None` is the default value.
42
pub upper_bound: Option<Box<dyn GroupedReduction>>,
43
}
44
45
impl WriteMetrics {
46
pub fn new(path: String, schema: &Schema) -> Self {
47
Self {
48
path,
49
file_size: 0,
50
num_rows: 0,
51
keys: None,
52
columns: schema
53
.iter_values()
54
.cloned()
55
.map(WriteMetricsColumn::new)
56
.collect(),
57
}
58
}
59
60
pub fn append(&mut self, df: &DataFrame) -> PolarsResult<()> {
61
assert_eq!(self.columns.len(), df.width());
62
self.num_rows += df.height() as u64;
63
for (w, c) in self.columns.iter_mut().zip(df.get_columns()) {
64
let null_count = c.null_count();
65
w.null_count += c.null_count() as u64;
66
67
let mut has_non_null_non_nan_values = df.height() != null_count;
68
if c.dtype().is_float() {
69
let nan_count = c.is_nan()?.sum().unwrap_or_default();
70
has_non_null_non_nan_values = nan_count as usize + null_count < df.height();
71
#[allow(clippy::useless_conversion)]
72
{
73
w.nan_count += u64::from(nan_count);
74
}
75
}
76
77
if has_non_null_non_nan_values {
78
if let Some(lb) = &mut w.lower_bound {
79
lb.update_group(c, 0, 0)?;
80
}
81
if let Some(ub) = &mut w.upper_bound {
82
ub.update_group(c, 0, 0)?;
83
}
84
}
85
}
86
Ok(())
87
}
88
89
pub fn collapse_to_df(
90
metrics: Vec<Self>,
91
input_schema: &Schema,
92
key_schema: Option<&Schema>,
93
) -> DataFrame {
94
let num_metrics = metrics.len();
95
96
let mut path = StringChunkedBuilder::new(PlSmallStr::from_static("path"), num_metrics);
97
let mut num_rows = PrimitiveChunkedBuilder::<UInt64Type>::new(
98
PlSmallStr::from_static("num_rows"),
99
num_metrics,
100
);
101
let mut file_size = PrimitiveChunkedBuilder::<UInt64Type>::new(
102
PlSmallStr::from_static("file_size"),
103
num_metrics,
104
);
105
let mut keys = key_schema.map(|s| {
106
(0..s.len())
107
.map(|_| Vec::with_capacity(metrics.len()))
108
.collect::<Vec<_>>()
109
});
110
let mut columns = input_schema
111
.iter_values()
112
.map(|dtype| {
113
let null_count = PrimitiveChunkedBuilder::<UInt64Type>::new(
114
PlSmallStr::from_static("null_count"),
115
num_metrics,
116
);
117
let nan_count = PrimitiveChunkedBuilder::<UInt64Type>::new(
118
PlSmallStr::from_static("nan_count"),
119
num_metrics,
120
);
121
let mut lower_bound = SeriesBuilder::new(dtype.clone());
122
let mut upper_bound = SeriesBuilder::new(dtype.clone());
123
lower_bound.reserve(num_metrics);
124
upper_bound.reserve(num_metrics);
125
126
(null_count, nan_count, lower_bound, upper_bound)
127
})
128
.collect::<Vec<_>>();
129
130
for m in metrics {
131
path.append_value(m.path);
132
num_rows.append_value(m.num_rows);
133
file_size.append_value(m.file_size);
134
match (&mut keys, m.keys) {
135
(None, None) => {},
136
(Some(keys), Some(m_keys)) => {
137
for (key, m_key) in keys.iter_mut().zip(m_keys) {
138
key.push(m_key);
139
}
140
},
141
_ => unreachable!(),
142
}
143
144
for (mut w, c) in m.columns.into_iter().zip(columns.iter_mut()) {
145
c.0.append_value(w.null_count);
146
c.1.append_value(w.nan_count);
147
match &mut w.lower_bound {
148
None => c.2.extend_nulls(1),
149
Some(lb) => c.2.extend(&lb.finalize().unwrap(), ShareStrategy::Always),
150
}
151
match &mut w.upper_bound {
152
None => c.3.extend_nulls(1),
153
Some(ub) => c.3.extend(&ub.finalize().unwrap(), ShareStrategy::Always),
154
}
155
}
156
}
157
158
let mut df_columns = Vec::with_capacity(4 + input_schema.len());
159
df_columns.push(path.finish().into_column());
160
df_columns.push(num_rows.finish().into_column());
161
df_columns.push(file_size.finish().into_column());
162
match (keys, key_schema) {
163
(None, None) => df_columns.push(
164
StructChunked::from_series(
165
PlSmallStr::from_static("keys"),
166
num_metrics,
167
[].into_iter(),
168
)
169
.unwrap()
170
.into_column(),
171
),
172
(Some(keys), Some(key_schema)) => {
173
let keys = keys
174
.into_iter()
175
.zip(key_schema.iter())
176
.map(|(key, (name, dtype))| {
177
Series::from_any_values_and_dtype(name.clone(), key.as_slice(), dtype, true)
178
.unwrap()
179
})
180
.collect::<Vec<Series>>();
181
df_columns.push(
182
StructChunked::from_series(
183
PlSmallStr::from_static("keys"),
184
num_metrics,
185
keys.iter(),
186
)
187
.unwrap()
188
.into_column(),
189
);
190
},
191
_ => unreachable!(),
192
}
193
for (name, column) in input_schema.iter_names().zip(columns) {
194
let struct_ca = StructChunked::from_series(
195
format_pl_smallstr!("{name}_stats"),
196
num_metrics,
197
[
198
column.0.finish().into_series(),
199
column.1.finish().into_series(),
200
column.2.freeze(PlSmallStr::from_static("lower_bound")),
201
column.3.freeze(PlSmallStr::from_static("upper_bound")),
202
]
203
.iter(),
204
)
205
.unwrap();
206
df_columns.push(struct_ca.into_column());
207
}
208
209
DataFrame::new_with_height(num_metrics, df_columns).unwrap()
210
}
211
}
212
213
impl WriteMetricsColumn {
214
pub fn new(dtype: DataType) -> Self {
215
let (lower_bound, upper_bound) = if dtype.is_nested() {
216
(None, None)
217
} else {
218
let mut lower_bound = new_min_reduction(dtype.clone(), false);
219
let mut upper_bound = new_max_reduction(dtype, false);
220
221
lower_bound.resize(1);
222
upper_bound.resize(1);
223
224
(Some(lower_bound), Some(upper_bound))
225
};
226
227
Self {
228
null_count: 0,
229
nan_count: 0,
230
lower_bound,
231
upper_bound,
232
}
233
}
234
}
235
236