GitHub Repository: aos/firecracker
Path: blob/main/tests/integration_tests/performance/test_block_performance.py
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Performance benchmark for block device emulation."""
import concurrent.futures
import json
import logging
import os
from enum import Enum
import shutil
import pytest

from conftest import _test_images_s3_bucket
from framework.artifacts import ArtifactCollection, ArtifactSet
from framework.builder import MicrovmBuilder
from framework.matrix import TestContext, TestMatrix
from framework.stats import core
from framework.stats.baseline import Provider as BaselineProvider
from framework.stats.metadata import DictProvider as DictMetadataProvider
from framework.utils import get_cpu_percent, CmdBuilder, DictQuery, run_cmd
from framework.utils_cpuid import get_cpu_model_name
import host_tools.drive as drive_tools
import host_tools.network as net_tools  # pylint: disable=import-error
import framework.stats as st
from integration_tests.performance.configs import defs
from integration_tests.performance.utils import handle_failure, \
    dump_test_result

DEBUG = False
TEST_ID = "block_device_performance"
FIO = "fio"

# Measurements tags.
CPU_UTILIZATION_VMM = "cpu_utilization_vmm"
CPU_UTILIZATION_VMM_SAMPLES_TAG = "cpu_utilization_vmm_samples"
CPU_UTILIZATION_VCPUS_TOTAL = "cpu_utilization_vcpus_total"
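# The JSON config is expected to provide the knobs used below, e.g.
# "time", "omit", "load_factor", "block_device_size", "fio_modes",
# "fio_blk_sizes", "measurements" and per-host baselines under "hosts".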
CONFIG = json.load(open(defs.CFG_LOCATION /
                        "block_performance_test_config.json"))


# pylint: disable=R0903
class BlockBaselinesProvider(BaselineProvider):
    """Implementation of a baseline provider for the block performance test."""

    def __init__(self, env_id, fio_id):
        """Block baseline provider initialization."""
        cpu_model_name = get_cpu_model_name()
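        # CONFIG["hosts"]["instances"]["m5d.metal"]["cpus"] is expected to be
        # a list of per-CPU-model baseline dicts, e.g. (illustrative):
        #   {"model": "<host CPU model name>", ...baseline data...}
        # Only the entry matching the host CPU model is kept below.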
        baselines = list(filter(
            lambda cpu_baseline: cpu_baseline["model"] == cpu_model_name,
            CONFIG["hosts"]["instances"]["m5d.metal"]["cpus"]))

        super().__init__(DictQuery(dict()))
        if len(baselines) > 0:
            super().__init__(DictQuery(baselines[0]))

        self._tag = "baselines/{}/" + env_id + "/{}/" + fio_id
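        # Formatted with (ms_name, st_name) in get(), the lookup key becomes,
        # illustratively: "baselines/<measurement>/<env_id>/<statistic>/<fio_id>".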

    def get(self, ms_name: str, st_name: str) -> dict:
        """Return the baseline value corresponding to the key."""
        key = self._tag.format(ms_name, st_name)
        baseline = self._baselines.get(key)
        if baseline:
            target = baseline.get("target")
            delta_percentage = baseline.get("delta_percentage")
            return {
                "target": target,
                "delta": delta_percentage * target / 100,
            }
        return None


def run_fio(env_id, basevm, ssh_conn, mode, bs):
    """Run a fio test in the specified mode with block size bs."""
    logs_path = f"{basevm.jailer.chroot_base_with_id()}/{env_id}/{mode}{bs}"

    # Compute the fio command.
    cmd = CmdBuilder(FIO) \
        .with_arg(f"--name={mode}-{bs}") \
        .with_arg(f"--rw={mode}") \
        .with_arg(f"--bs={bs}") \
        .with_arg("--filename=/dev/vdb") \
        .with_arg("--time_based=1") \
        .with_arg(f"--size={CONFIG['block_device_size']}M") \
        .with_arg("--direct=1") \
        .with_arg("--ioengine=libaio") \
        .with_arg("--iodepth=32") \
        .with_arg(f"--ramp_time={CONFIG['omit']}") \
        .with_arg(f"--numjobs={CONFIG['load_factor'] * basevm.vcpus_count}") \
        .with_arg("--randrepeat=0") \
        .with_arg(f"--runtime={CONFIG['time']}") \
        .with_arg(f"--write_iops_log={mode}{bs}") \
        .with_arg(f"--write_bw_log={mode}{bs}") \
        .with_arg("--log_avg_msec=1000") \
        .with_arg("--output-format=json+") \
        .build()
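    # For example (illustrative), mode="randread" and bs="4k" roughly yields:
    #   fio --name=randread-4k --rw=randread --bs=4k --filename=/dev/vdb \
    #       --time_based=1 --size=<block_device_size>M --direct=1 \
    #       --ioengine=libaio --iodepth=32 --ramp_time=<omit> --numjobs=<N> \
    #       --randrepeat=0 --runtime=<time> --write_iops_log=randread4k \
    #       --write_bw_log=randread4k --log_avg_msec=1000 --output-format=json+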

    rc, _, stderr = ssh_conn.execute_command(
        "echo 'none' > /sys/block/vdb/queue/scheduler")
    assert rc == 0, stderr.read()
    assert stderr.read() == ""

    # First, flush all guest cached data to host, then drop guest FS caches.
    rc, _, stderr = ssh_conn.execute_command("sync")
    assert rc == 0, stderr.read()
    assert stderr.read() == ""
    rc, _, stderr = ssh_conn.execute_command(
        "echo 3 > /proc/sys/vm/drop_caches")
    assert rc == 0, stderr.read()
    assert stderr.read() == ""

    # Then, flush all host cached data to hardware, also drop host FS caches.
    run_cmd("sync")
    run_cmd("echo 3 > /proc/sys/vm/drop_caches")

    # Start the CPU load monitor.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        cpu_load_future = executor.submit(get_cpu_percent,
                                          basevm.jailer_clone_pid,
                                          CONFIG["time"],
                                          omit=CONFIG["omit"])

        # Print the fio command in the log and run it
        rc, _, stderr = ssh_conn.execute_command(cmd)
        assert rc == 0, stderr.read()
        assert stderr.read() == ""

        if os.path.isdir(logs_path):
            shutil.rmtree(logs_path)

        os.makedirs(logs_path)

        ssh_conn.scp_get_file("*.log", logs_path)
        rc, _, stderr = ssh_conn.execute_command("rm *.log")
        assert rc == 0, stderr.read()

        result = dict()
        cpu_load = cpu_load_future.result()
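        # cpu_load is expected to map thread tags to {tid: [per-second %cpu]},
        # e.g. (illustrative): {"firecracker": {...}, "fc_vcpu 0": {...}, ...},
        # with one sample per second of runtime (CONFIG["time"] samples).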
        tag = "firecracker"
        assert tag in cpu_load and len(cpu_load[tag]) == 1

        data = list(cpu_load[tag].values())[0]
        data_len = len(data)
        assert data_len == CONFIG["time"]

        result[CPU_UTILIZATION_VMM] = sum(data) / data_len
        if DEBUG:
            result[CPU_UTILIZATION_VMM_SAMPLES_TAG] = data

        vcpus_util = 0
        for vcpu in range(basevm.vcpus_count):
            # We expect a single fc_vcpu thread tagged with
            # f`fc_vcpu {vcpu}`.
            tag = f"fc_vcpu {vcpu}"
            assert tag in cpu_load and len(cpu_load[tag]) == 1
            data = list(cpu_load[tag].values())[0]
            data_len = len(data)

            assert data_len == CONFIG["time"]
            if DEBUG:
                samples_tag = f"cpu_utilization_fc_vcpu_{vcpu}_samples"
                result[samples_tag] = data
            vcpus_util += sum(data) / data_len

        result[CPU_UTILIZATION_VCPUS_TOTAL] = vcpus_util
        return result


class DataDirection(Enum):
    """Operation type."""

    READ = 0
    WRITE = 1
    TRIM = 2

    def __str__(self):
        """Representation as string."""
        # pylint: disable=W0143
        if self.value == 0:
            return "read"
        # pylint: disable=W0143
        if self.value == 1:
            return "write"
        # pylint: disable=W0143
        if self.value == 2:
            return "trim"
        return ""


def read_values(cons, numjobs, env_id, mode, bs, measurement, logs_path):
    """Read the values for each measurement.

    The values are logged once every second. The time resolution is in msec.
    The log file format documentation can be found here:
    https://fio.readthedocs.io/en/latest/fio_doc.html#log-file-formats
    """
    values = dict()

    for job_id in range(numjobs):
        file_path = f"{logs_path}/{env_id}/{mode}{bs}/{mode}" \
                    f"{bs}_{measurement}.{job_id + 1}.log"
        file = open(file_path)
        lines = file.readlines()

        direction_count = 1
        if mode.endswith("readwrite") or mode.endswith("rw"):
            direction_count = 2

        for idx in range(0, len(lines), direction_count):
            value_idx = idx // direction_count
            for direction in range(direction_count):
                data = lines[idx + direction].split(sep=",")
                data_dir = DataDirection(int(data[2].strip()))

                measurement_id = f"{measurement}_{str(data_dir)}"
                if measurement_id not in values:
                    values[measurement_id] = dict()

                if value_idx not in values[measurement_id]:
                    values[measurement_id][value_idx] = list()
                values[measurement_id][value_idx].append(int(data[1].strip()))

    for measurement_id in values:
        for idx in values[measurement_id]:
            # Discard data points which were not measured by all jobs.
            if len(values[measurement_id][idx]) != numjobs:
                continue

            value = sum(values[measurement_id][idx])
            if DEBUG:
                cons.consume_custom(measurement_id, value)
            cons.consume_data(measurement_id, value)


def consume_fio_output(cons, result, numjobs, mode, bs, env_id, logs_path):
    """Consumer function."""
    cpu_utilization_vmm = result[CPU_UTILIZATION_VMM]
    cpu_utilization_vcpus = result[CPU_UTILIZATION_VCPUS_TOTAL]

    cons.consume_stat("Avg", CPU_UTILIZATION_VMM, cpu_utilization_vmm)
    cons.consume_stat("Avg",
                      CPU_UTILIZATION_VCPUS_TOTAL,
                      cpu_utilization_vcpus)

    read_values(cons, numjobs, env_id, mode, bs, "iops", logs_path)
    read_values(cons, numjobs, env_id, mode, bs, "bw", logs_path)


@pytest.mark.nonci
@pytest.mark.timeout(CONFIG["time"] * 1000)  # 1.40 hours
def test_block_performance(bin_cloner_path, results_file_dumper):
    """Test block device performance for multiple artifacts."""
    logger = logging.getLogger(TEST_ID)
    artifacts = ArtifactCollection(_test_images_s3_bucket())
    microvm_artifacts = ArtifactSet(artifacts.microvms(keyword="2vcpu_1024mb"))
    microvm_artifacts.insert(artifacts.microvms(keyword="1vcpu_1024mb"))
    kernel_artifacts = ArtifactSet(artifacts.kernels())
    disk_artifacts = ArtifactSet(artifacts.disks(keyword="ubuntu"))

    # Create a test context and add builder, logger and results dumper.
    test_context = TestContext()
    test_context.custom = {
        'builder': MicrovmBuilder(bin_cloner_path),
        'logger': logger,
        'name': TEST_ID,
        'results_file_dumper': results_file_dumper
    }

    test_matrix = TestMatrix(context=test_context,
                             artifact_sets=[
                                 microvm_artifacts,
                                 kernel_artifacts,
                                 disk_artifacts
                             ])
    test_matrix.run_test(fio_workload)


def fio_workload(context):
    """Execute block device emulation benchmarking scenarios."""
    vm_builder = context.custom['builder']
    logger = context.custom["logger"]
    file_dumper = context.custom["results_file_dumper"]

    # Create a rw copy artifact.
    rw_disk = context.disk.copy()
    # Get ssh key from read-only artifact.
    ssh_key = context.disk.ssh_key()
    # Create a fresh microvm from artifacts.
    vm_instance = vm_builder.build(kernel=context.kernel,
                                   disks=[rw_disk],
                                   ssh_key=ssh_key,
                                   config=context.microvm)
    basevm = vm_instance.vm

    # Add a secondary block device for benchmark tests.
    fs = drive_tools.FilesystemFile(
        os.path.join(basevm.fsfiles, 'scratch'),
        CONFIG["block_device_size"]
    )
    basevm.add_drive('scratch', fs.path)
    basevm.start()

    # Pin the Firecracker threads (VMM, API and vCPUs) to host CPUs.
    current_cpu_id = 0
    basevm.pin_vmm(current_cpu_id)
    current_cpu_id += 1
    basevm.pin_api(current_cpu_id)
    for vcpu_id in range(basevm.vcpus_count):
        current_cpu_id += 1
        basevm.pin_vcpu(vcpu_id, current_cpu_id)
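    # Resulting layout: VMM thread on host CPU 0, API thread on CPU 1,
    # vCPU i on host CPU 2 + i.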

    st_core = core.Core(name=TEST_ID,
                        iterations=1,
                        custom={"microvm": context.microvm.name(),
                                "kernel": context.kernel.name(),
                                "disk": context.disk.name(),
                                "cpu_model_name": get_cpu_model_name()})

    logger.info("Testing with microvm: \"{}\", kernel {}, disk {}"
                .format(context.microvm.name(),
                        context.kernel.name(),
                        context.disk.name()))

    ssh_connection = net_tools.SSHConnection(basevm.ssh_config)
    env_id = f"{context.kernel.name()}/{context.disk.name()}/" \
             f"{context.microvm.name()}"

    for mode in CONFIG["fio_modes"]:
        for bs in CONFIG["fio_blk_sizes"]:
            fio_id = f"{mode}-bs{bs}"
            st_prod = st.producer.LambdaProducer(
                func=run_fio,
                func_kwargs={"env_id": env_id, "basevm": basevm,
                             "ssh_conn": ssh_connection, "mode": mode,
                             "bs": bs})
            st_cons = st.consumer.LambdaConsumer(
                metadata_provider=DictMetadataProvider(
                    CONFIG["measurements"],
                    BlockBaselinesProvider(env_id,
                                           fio_id)),
                func=consume_fio_output,
                func_kwargs={"numjobs": basevm.vcpus_count, "mode": mode,
                             "bs": bs, "env_id": env_id,
                             "logs_path": basevm.jailer.chroot_base_with_id()})
            st_core.add_pipe(st_prod, st_cons, tag=f"{env_id}/{fio_id}")

    # Gather results and verify pass criteria.
    try:
        result = st_core.run_exercise()
    except core.CoreException as err:
        handle_failure(file_dumper, err)

    dump_test_result(file_dumper, result)