Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/firecracker
Path: blob/main/tests/framework/stats/producer.py
1956 views
1
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
# SPDX-License-Identifier: Apache-2.0
3
4
"""Producer of statistics."""
5
6
from abc import ABC, abstractmethod
7
from typing import Callable, Any
8
from framework import utils
9
10
11
# pylint: disable=R0903
12
class Producer(ABC):
13
"""Base class for raw results producer."""
14
15
@abstractmethod
16
def produce(self) -> Any:
17
"""Produce raw results."""
18
19
20
class SSHCommand(Producer):
21
"""Producer from executing ssh commands."""
22
23
def __init__(self, cmd, ssh_connection):
24
"""Initialize the raw data producer."""
25
self._cmd = cmd
26
self._ssh_connection = ssh_connection
27
28
def produce(self) -> Any:
29
"""Return the output of the executed ssh command."""
30
rc, stdout, stderr = \
31
self._ssh_connection.execute_command(self._cmd)
32
assert rc == 0
33
assert stderr.read() == ""
34
35
return stdout.read()
36
37
@property
38
def ssh_connection(self):
39
"""Return the ssh connection used by the producer.
40
41
The ssh connection used by the producer to execute commands on
42
the guest.
43
"""
44
return self._ssh_connection
45
46
@ssh_connection.setter
47
def ssh_connection(self, ssh_connection):
48
"""Set the ssh connection used by the producer."""
49
self._ssh_connection = ssh_connection
50
51
@property
52
def cmd(self):
53
"""Return the command executed on guest."""
54
return self._cmd
55
56
@cmd.setter
57
def cmd(self, cmd):
58
"""Set the command executed on guest."""
59
self._cmd = cmd
60
61
62
class HostCommand(Producer):
63
"""Producer from executing commands on host."""
64
65
def __init__(self, cmd):
66
"""Initialize the raw data producer."""
67
self._cmd = cmd
68
69
def produce(self) -> Any:
70
"""Return output of the executed command."""
71
result = utils.run_cmd(self._cmd)
72
return result.stdout
73
74
@property
75
def cmd(self):
76
"""Return the command executed on host."""
77
return self._cmd
78
79
@cmd.setter
80
def cmd(self, cmd):
81
"""Set the command executed on host."""
82
self._cmd = cmd
83
84
85
class LambdaProducer(Producer):
86
"""Producer from calling python functions."""
87
88
def __init__(self, func: Callable, func_kwargs=None):
89
"""Initialize the raw data producer."""
90
super().__init__()
91
assert callable(func)
92
self._func = func
93
self._func_kwargs = func_kwargs
94
95
# pylint: disable=R1710
96
def produce(self) -> Any:
97
"""Call `self._func`."""
98
if self._func_kwargs:
99
raw_data = self._func(**self._func_kwargs)
100
return raw_data
101
102
raw_data = self._func()
103
return raw_data
104
105
@property
106
def func(self):
107
"""Return producer function."""
108
return self._func
109
110
@func.setter
111
def func(self, func: Callable):
112
self._func = func
113
114
@property
115
def func_kwargs(self):
116
"""Return producer function arguments."""
117
return self._func_kwargs
118
119
@func_kwargs.setter
120
def func_kwargs(self, func_kwargs):
121
self._func_kwargs = func_kwargs
122
123