Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/firecracker
Path: blob/main/tests/framework/stats/core.py
1956 views
1
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
# SPDX-License-Identifier: Apache-2.0
3
4
"""Core module for statistics component management."""
5
6
7
from datetime import datetime
8
from collections import namedtuple, defaultdict
9
import types
10
from typing_extensions import TypedDict
11
from framework.utils import ExceptionAggregator
12
13
from .producer import Producer
14
from .consumer import Consumer, ProcessingException
15
16
17
class CoreException(ExceptionAggregator):
18
"""Exception used to return core messages.
19
20
The caller should handle the exception accordingly.
21
"""
22
23
def __init__(self, result=None):
24
"""Initialize the exception."""
25
super().__init__()
26
self.result = result
27
28
29
class Result(TypedDict):
30
"""Data class for aggregated statistic results."""
31
32
name: str
33
iterations: int
34
results: dict
35
custom: dict
36
37
38
Pipe = namedtuple("Pipe", "producer consumer")
39
40
41
class Core:
42
"""Base class for statistics core driver."""
43
44
# pylint: disable=W0102
45
def __init__(self, name, iterations, custom={}):
46
"""Core constructor."""
47
self._pipes = defaultdict(Pipe)
48
self._result = Result(name=name,
49
iterations=iterations,
50
results={},
51
custom=custom)
52
self._failure_aggregator = CoreException()
53
54
def add_pipe(self, producer: Producer, consumer: Consumer, tag=None):
55
"""Add a new producer-consumer pipe."""
56
if tag is None:
57
tag = self._result['name'] + "_" + \
58
str(datetime.timestamp(datetime.now()))
59
self._pipes[tag] = Pipe(producer, consumer)
60
61
def run_exercise(self, fail_fast=False) -> Result:
62
"""Drive the statistics producers until completion."""
63
iterations = self._result['iterations']
64
# This is used for identation purposes.
65
for tag, pipe in self._pipes.items():
66
for iteration in range(iterations):
67
raw_data = pipe.producer.produce()
68
if isinstance(raw_data, types.GeneratorType):
69
for data in raw_data:
70
pipe.consumer.ingest(iteration, data)
71
else:
72
pipe.consumer.ingest(iteration, raw_data)
73
try:
74
stats, custom = pipe.consumer.process(fail_fast)
75
except ProcessingException as err:
76
self._failure_aggregator.add_row(f"Failed on '{tag}':")
77
self._failure_aggregator.add_row(err)
78
stats = err.stats
79
custom = err.custom
80
if fail_fast:
81
raise self._failure_aggregator
82
83
self._result['results'][tag] = stats
84
85
# Custom information extracted from all the iterations.
86
if len(custom) > 0:
87
self._result['custom'][tag] = custom
88
89
if self._failure_aggregator.has_any():
90
self._failure_aggregator.result = self._result
91
raise self._failure_aggregator
92
93
return self._result
94
95
@property
96
def name(self):
97
"""Return statistics name."""
98
return self._result.name
99
100
@name.setter
101
def name(self, name):
102
"""Set statistics name."""
103
self._result.name = name
104
105
@property
106
def iterations(self):
107
"""Return statistics iterations count."""
108
return self._result.iterations
109
110
@iterations.setter
111
def iterations(self, iterations):
112
"""Set statistics iterations count."""
113
self._result.iterations = iterations
114
115
@property
116
def custom(self):
117
"""Return statistics custom information."""
118
return self._result.custom
119
120
@custom.setter
121
def custom(self, custom):
122
"""Set statistics custom information."""
123
self._result.custom = custom
124
125
@property
126
def statistics(self):
127
"""Return statistics gathered so far."""
128
return self._result
129
130