Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/firecracker
Path: blob/main/tests/framework/stats/consumer.py
1956 views
1
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
# SPDX-License-Identifier: Apache-2.0
3
4
"""Module for multiple statistics consumers."""
5
6
from abc import ABC, abstractmethod
7
from numbers import Number
8
from typing import Any, Callable
9
from collections import defaultdict
10
from framework.utils import ExceptionAggregator
11
12
from .criteria import CriteriaException
13
from .metadata import Provider as MetadataProvider
14
from .types import MeasurementDef
15
16
17
class ProcessingException(ExceptionAggregator):
    """Aggregated failure raised when statistics processing fails.

    Besides the rows collected by the aggregator base class, the exception
    can carry the statistics and custom data gathered up to the failure.
    """

    def __init__(self, stats=None, custom=None):
        """Create the exception, optionally attaching stats and custom data."""
        super().__init__()
        # Payloads default to None and may be filled in later by the raiser.
        self.custom = custom
        self.stats = stats
25
26
27
class Consumer(ABC):
    """Base class for statistics aggregation class.

    A consumer collects raw data points and/or precomputed statistics per
    measurement name, plus free-form custom values per iteration, and turns
    them into a final statistics dictionary in `process`.
    """

    UNIT_KEY = "_unit"
    DATA_KEY = "_data"

    # pylint: disable=W0102
    def __init__(self,
                 metadata_provider: MetadataProvider = None,
                 custom=None):
        """Initialize a consumer.

        If a metadata provider is given, its `measurements` mapping is used
        as the measurement definitions; otherwise definitions start empty
        and can be added through `set_measurement_def`.
        """
        self._iteration = 0
        # No default factory is given, so this behaves like a plain dict.
        self._results = defaultdict()  # Aggregated results.
        self._custom = dict() if not custom else custom
        self._metadata_provider = metadata_provider

        self._measurements_defs = dict()
        if metadata_provider:
            self._measurements_defs = metadata_provider.measurements

        # Final statistics.
        self._statistics = dict()

        self._failure_aggregator = ProcessingException()

    @abstractmethod
    def ingest(self, iteration: int, raw_data: Any):
        """Abstract method for ingesting the raw result."""

    def consume_data(self, ms_name: str, value: Number):
        """Aggregate measurement.

        Appends `value` to the list of data points kept under `DATA_KEY`
        for the `ms_name` measurement.
        """
        # The measurement entry may already exist without a data list (when
        # `consume_stat` was called first), so create each level on demand.
        ms_results = self._results.setdefault(ms_name, {})
        ms_results.setdefault(self.DATA_KEY, []).append(value)

    def consume_stat(self, st_name: str, ms_name: str, value: Number):
        """Aggregate statistics.

        Stores `value` as the `st_name` statistic of the `ms_name`
        measurement, overwriting any previous value.
        """
        self._results.setdefault(ms_name, {})[st_name] = value

    def consume_custom(self, name: str, value: Any):
        """Aggregate custom information.

        Appends `value` to the list stored under `name` for the current
        iteration.
        """
        if not self._custom.get(self._iteration):
            self._custom[self._iteration] = dict()

        if not self._custom[self._iteration].get(name):
            self._custom[self._iteration][name] = list()

        self._custom[self._iteration][name].append(value)

    def set_measurement_def(self, value: MeasurementDef):
        """Set measurement definition."""
        self._measurements_defs[value.name] = value

    def _validate(self):
        """Verify that every aggregated measurement has a definition.

        A failure row is recorded for each measurement name present in the
        results but missing from the measurement definitions; the failure
        aggregator is raised if it holds any rows.
        """
        for ms_name in self._results:
            if ms_name not in self._measurements_defs:
                self._failure_aggregator.add_row(
                    f"'{ms_name}' measurement does not have a "
                    "corresponding measurement definition.")

        if self._failure_aggregator.has_any():
            raise self._failure_aggregator

    def _reset(self):
        """Reset the results of this consumer, used in a previous exercise."""
        self._results = defaultdict()

    def process(self, fail_fast=False) -> (dict, dict):
        """Generate statistics as a dictionary.

        For each aggregated measurement, every defined statistic is either
        taken from the value given via `consume_stat` or computed by applying
        the statistic's function to the collected data points. When the
        statistic has pass criteria, they are checked and the outcome is
        recorded as "PASSED" or "FAILED".

        Returns the `(statistics, custom)` pair. Raises the failure
        aggregator if any failure was recorded; with `fail_fast` set it is
        raised on the first criteria failure, before the results are reset
        and before the statistics and custom data are attached to it.
        """
        self._validate()
        for ms_name in self._results:
            ms_results = self._results[ms_name]
            ms_def = self._measurements_defs[ms_name]
            ms_stats = self._statistics.setdefault(ms_name, {})
            ms_stats[self.UNIT_KEY] = ms_def.unit
            has_data = Consumer.DATA_KEY in ms_results
            for st_def in ms_def.statistics:
                if st_def.name not in ms_results:
                    if not has_data:
                        self._failure_aggregator.add_row(
                            f"Processing '{st_def.name}' statistic failed due "
                            f"to lack of data points for '{ms_name}' "
                            "measurement.")
                        continue
                    # No precomputed value: derive it from the data points.
                    value = st_def.func(ms_results[self.DATA_KEY])
                else:
                    value = ms_results[st_def.name]
                stat = {"value": value}
                ms_stats[st_def.name] = stat

                pass_criteria = st_def.pass_criteria
                if pass_criteria:
                    stat["pass_criteria"] = {
                        pass_criteria.name: pass_criteria.baseline
                    }
                    try:
                        pass_criteria.check(value)
                        stat["outcome"] = "PASSED"
                    except CriteriaException as err:
                        # pylint: disable=W0707
                        stat["outcome"] = "FAILED"
                        fail_msg = f"'{ms_name}/{st_def.name}': {err}"
                        self._failure_aggregator.add_row(fail_msg)
                        if fail_fast:
                            raise self._failure_aggregator

        self._reset()

        if self._failure_aggregator.has_any():
            self._failure_aggregator.stats = self._statistics
            self._failure_aggregator.custom = self._custom
            raise self._failure_aggregator

        return self._statistics, self._custom
157
158
159
class LambdaConsumer(Consumer):
    """Consumer that delegates ingestion to a user-supplied function.

    The function called in the ingestion step must have the following
    signature: `def func_name(cons: Consumer, raw_output: Any, **kw_args)`.
    """

    def __init__(self,
                 func: Callable,
                 func_kwargs=None,
                 metadata_provider: MetadataProvider = None):
        """Store the ingestion function and its optional keyword arguments."""
        super().__init__(metadata_provider)
        self._func_kwargs = func_kwargs
        self._func = func

    def ingest(self, iteration, raw_data):
        """Record the iteration and run the function on the raw data."""
        self._iteration = iteration
        # Missing or empty kwargs expand to nothing, i.e. a plain call.
        extra_kwargs = self._func_kwargs or {}
        self._func(self, raw_data, **extra_kwargs)
182
183