Path: blob/main/crates/polars-stream/src/nodes/io_sinks/metrics.rs
6939 views
use arrow::array::builder::ShareStrategy;1use polars_core::frame::DataFrame;2use polars_core::prelude::{3AnyValue, ChunkedBuilder, DataType, IntoColumn, PrimitiveChunkedBuilder, StringChunkedBuilder,4StructChunked, UInt64Type,5};6use polars_core::schema::Schema;7use polars_core::series::builder::SeriesBuilder;8use polars_core::series::{IntoSeries, Series};9use polars_error::PolarsResult;10use polars_expr::reduce::{GroupedReduction, new_max_reduction, new_min_reduction};11use polars_utils::format_pl_smallstr;12use polars_utils::pl_str::PlSmallStr;1314/// Metrics that relate to a written file.15pub struct WriteMetrics {16/// Stringified path to the file.17pub path: String,18/// Number of rows in the file.19pub num_rows: u64,20/// Size of written file in bytes.21pub file_size: u64,22/// Keys of the partition.23pub keys: Option<Vec<AnyValue<'static>>>,24/// Metrics for each column.25pub columns: Vec<WriteMetricsColumn>,26}2728/// Metrics in a written file for a specific column.29pub struct WriteMetricsColumn {30/// Number of missing values in the column.31pub null_count: u64,32/// Number of NaN values in the column.33pub nan_count: u64,34/// The minimum value in the column.35///36/// `NaN`s are always ignored and `None` is the default value.37pub lower_bound: Option<Box<dyn GroupedReduction>>,38/// The maximum value in the column.39///40/// `NaN`s are always ignored and `None` is the default value.41pub upper_bound: Option<Box<dyn GroupedReduction>>,42}4344impl WriteMetrics {45pub fn new(path: String, schema: &Schema) -> Self {46Self {47path,48file_size: 0,49num_rows: 0,50keys: None,51columns: schema52.iter_values()53.cloned()54.map(WriteMetricsColumn::new)55.collect(),56}57}5859pub fn append(&mut self, df: &DataFrame) -> PolarsResult<()> {60assert_eq!(self.columns.len(), df.width());61self.num_rows += df.height() as u64;62for (w, c) in self.columns.iter_mut().zip(df.get_columns()) {63let null_count = c.null_count();64w.null_count += c.null_count() as u64;6566let mut has_non_null_non_nan_values = df.height() != null_count;67if c.dtype().is_float() {68let nan_count = c.is_nan()?.sum().unwrap_or_default();69has_non_null_non_nan_values = nan_count as usize + null_count < df.height();70#[allow(clippy::useless_conversion)]71{72w.nan_count += u64::from(nan_count);73}74}7576if has_non_null_non_nan_values {77if let Some(lb) = &mut w.lower_bound {78lb.update_group(c, 0, 0)?;79}80if let Some(ub) = &mut w.upper_bound {81ub.update_group(c, 0, 0)?;82}83}84}85Ok(())86}8788pub fn collapse_to_df(89metrics: Vec<Self>,90input_schema: &Schema,91key_schema: Option<&Schema>,92) -> DataFrame {93let num_metrics = metrics.len();9495let mut path = StringChunkedBuilder::new(PlSmallStr::from_static("path"), num_metrics);96let mut num_rows = PrimitiveChunkedBuilder::<UInt64Type>::new(97PlSmallStr::from_static("num_rows"),98num_metrics,99);100let mut file_size = PrimitiveChunkedBuilder::<UInt64Type>::new(101PlSmallStr::from_static("file_size"),102num_metrics,103);104let mut keys = key_schema.map(|s| {105(0..s.len())106.map(|_| Vec::with_capacity(metrics.len()))107.collect::<Vec<_>>()108});109let mut columns = input_schema110.iter_values()111.map(|dtype| {112let null_count = PrimitiveChunkedBuilder::<UInt64Type>::new(113PlSmallStr::from_static("null_count"),114num_metrics,115);116let nan_count = PrimitiveChunkedBuilder::<UInt64Type>::new(117PlSmallStr::from_static("nan_count"),118num_metrics,119);120let mut lower_bound = SeriesBuilder::new(dtype.clone());121let mut upper_bound = SeriesBuilder::new(dtype.clone());122lower_bound.reserve(num_metrics);123upper_bound.reserve(num_metrics);124125(null_count, nan_count, lower_bound, upper_bound)126})127.collect::<Vec<_>>();128129for m in metrics {130path.append_value(m.path);131num_rows.append_value(m.num_rows);132file_size.append_value(m.file_size);133match (&mut keys, m.keys) {134(None, None) => {},135(Some(keys), Some(m_keys)) => {136for (key, m_key) in keys.iter_mut().zip(m_keys) {137key.push(m_key);138}139},140_ => unreachable!(),141}142143for (mut w, c) in m.columns.into_iter().zip(columns.iter_mut()) {144c.0.append_value(w.null_count);145c.1.append_value(w.nan_count);146match &mut w.lower_bound {147None => c.2.extend_nulls(1),148Some(lb) => c.2.extend(&lb.finalize().unwrap(), ShareStrategy::Always),149}150match &mut w.upper_bound {151None => c.3.extend_nulls(1),152Some(ub) => c.3.extend(&ub.finalize().unwrap(), ShareStrategy::Always),153}154}155}156157let mut df_columns = Vec::with_capacity(4 + input_schema.len());158df_columns.push(path.finish().into_column());159df_columns.push(num_rows.finish().into_column());160df_columns.push(file_size.finish().into_column());161match (keys, key_schema) {162(None, None) => df_columns.push(163StructChunked::from_series(164PlSmallStr::from_static("keys"),165num_metrics,166[].into_iter(),167)168.unwrap()169.into_column(),170),171(Some(keys), Some(key_schema)) => {172let keys = keys173.into_iter()174.zip(key_schema.iter())175.map(|(key, (name, dtype))| {176Series::from_any_values_and_dtype(name.clone(), key.as_slice(), dtype, true)177.unwrap()178})179.collect::<Vec<Series>>();180df_columns.push(181StructChunked::from_series(182PlSmallStr::from_static("keys"),183num_metrics,184keys.iter(),185)186.unwrap()187.into_column(),188);189},190_ => unreachable!(),191}192for (name, column) in input_schema.iter_names().zip(columns) {193let struct_ca = StructChunked::from_series(194format_pl_smallstr!("{name}_stats"),195num_metrics,196[197column.0.finish().into_series(),198column.1.finish().into_series(),199column.2.freeze(PlSmallStr::from_static("lower_bound")),200column.3.freeze(PlSmallStr::from_static("upper_bound")),201]202.iter(),203)204.unwrap();205df_columns.push(struct_ca.into_column());206}207208DataFrame::new_with_height(num_metrics, df_columns).unwrap()209}210}211212impl WriteMetricsColumn {213pub fn new(dtype: DataType) -> Self {214let (lower_bound, upper_bound) = if dtype.is_nested() {215(None, None)216} else {217let mut lower_bound = new_min_reduction(dtype.clone(), false);218let mut upper_bound = new_max_reduction(dtype, false);219220lower_bound.resize(1);221upper_bound.resize(1);222223(Some(lower_bound), Some(upper_bound))224};225226Self {227null_count: 0,228nan_count: 0,229lower_bound,230upper_bound,231}232}233}234235236