GitHub Repository: aos/firecracker
Path: blob/main/tests/integration_tests/performance/test_vsock_throughput.py
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Tests the VSOCK throughput of Firecracker uVMs."""


import os
import json
import logging
import time
import concurrent.futures

import pytest
from conftest import _test_images_s3_bucket
from framework.artifacts import ArtifactCollection, ArtifactSet
from framework.matrix import TestMatrix, TestContext
from framework.builder import MicrovmBuilder
from framework.stats import core, consumer, producer
from framework.stats.baseline import Provider as BaselineProvider
from framework.stats.metadata import DictProvider as DictMetadataProvider
from framework.utils import CpuMap, CmdBuilder, run_cmd, get_cpu_percent, \
    DictQuery
from framework.utils_cpuid import get_cpu_model_name
import host_tools.network as net_tools
from integration_tests.performance.configs import defs
from integration_tests.performance.utils import handle_failure, \
    dump_test_result

with open(defs.CFG_LOCATION /
          "vsock_throughput_test_config.json") as config_file:
    CONFIG = json.load(config_file)
SERVER_STARTUP_TIME = CONFIG["server_startup_time"]
VSOCK_UDS_PATH = "v.sock"
IPERF3 = "iperf3-vsock"
THROUGHPUT = "throughput"
DURATION = "duration"
BASE_PORT = 5201
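# 5201 is iperf3's default port; server/client pair N uses BASE_PORT + N.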
CPU_UTILIZATION_VMM = "cpu_utilization_vmm"
CPU_UTILIZATION_VCPUS_TOTAL = "cpu_utilization_vcpus_total"
IPERF3_CPU_UTILIZATION_PERCENT_OUT_TAG = "cpu_utilization_percent"
IPERF3_END_RESULTS_TAG = "end"
TARGET_TAG = "target"
DELTA_PERCENTAGE_TAG = "delta_percentage"
THROUGHPUT_UNIT = "Mbps"
DURATION_UNIT = "seconds"
CPU_UTILIZATION_UNIT = "percentage"


# pylint: disable=R0903
class VsockThroughputBaselineProvider(BaselineProvider):
    """Baseline provider implementation for the vsock throughput test."""

    def __init__(self, env_id, iperf_id):
        """Vsock throughput baseline provider initialization."""
        cpu_model_name = get_cpu_model_name()
        baselines = list(filter(
            lambda cpu_baseline: cpu_baseline["model"] == cpu_model_name,
            CONFIG["hosts"]["instances"]["m5d.metal"]["cpus"]))
        super().__init__(DictQuery(dict()))
        if len(baselines) > 0:
            super().__init__(DictQuery(baselines[0]))

        self._tag = "baselines/{}/" + env_id + "/{}/" + iperf_id
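        # `get()` fills the two placeholders with the measurement and
        # statistic names, yielding keys like
        # `baselines/<measurement>/<env_id>/<statistic>/<iperf_id>`.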

    def get(self, ms_name: str, st_name: str) -> dict:
        """Return the baseline corresponding to the key."""
        key = self._tag.format(ms_name, st_name)
        baseline = self._baselines.get(key)
        if baseline:
            target = baseline.get("target")
            delta_percentage = baseline.get("delta_percentage")
            return {
                "target": target,
                "delta": delta_percentage * target / 100,
            }
        return None


def produce_iperf_output(basevm,
                         guest_cmd_builder,
                         current_avail_cpu,
                         runtime,
                         omit,
                         load_factor,
                         modes):
    """Produce iperf raw output from server-client connection."""
    # Check if we have enough CPUs to pin the servers on the host.
    # The available CPUs are the total minus the vCPU, VMM and API threads.
    assert load_factor * basevm.vcpus_count < CpuMap.len() - \
        basevm.vcpus_count - 2

    host_uds_path = os.path.join(
        basevm.path,
        VSOCK_UDS_PATH
    )

    # Start the servers.
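    # `-sD` starts each server daemonized, `-1` makes it accept a single
    # client connection and then exit, and `-B` binds it to the Unix
    # socket that backs the guest's vsock device on the host side.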
    for server_idx in range(load_factor*basevm.vcpus_count):
        assigned_cpu = CpuMap(current_avail_cpu)
        iperf_server = \
            CmdBuilder(f"taskset --cpu-list {assigned_cpu}") \
            .with_arg(IPERF3) \
            .with_arg("-sD") \
            .with_arg("--vsock") \
            .with_arg("-B", host_uds_path) \
            .with_arg("-p", f"{BASE_PORT + server_idx}") \
            .with_arg("-1") \
            .build()

        run_cmd(iperf_server)
        current_avail_cpu += 1

    # Wait for iperf3 servers to start.
    time.sleep(SERVER_STARTUP_TIME)

    # Start the iperf3 clients, one per server. We cannot use iperf3
    # parallel streams due to non-deterministic results and lack of scaling.
    def spawn_iperf_client(conn, client_idx, mode):
        # Add the port where the iperf3 client is going to send/receive.
        cmd = guest_cmd_builder.with_arg(
            "-p", BASE_PORT + client_idx).with_arg(mode).build()

        # Bind the UDS in the jailer's root.
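        # Firecracker forwards a guest connection to host port P to the
        # Unix socket `<uds_path>_P`, so each client needs its own
        # per-port socket linked inside the jail.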
        basevm.create_jailed_resource(os.path.join(
            basevm.path,
            _make_host_port_path(VSOCK_UDS_PATH, BASE_PORT + client_idx)))
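
        # Pin each client to one of the guest's vCPUs, round-robin.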
        pinned_cmd = f"taskset --cpu-list {client_idx % basevm.vcpus_count}" \
                     f" {cmd}"
        rc, stdout, _ = conn.execute_command(pinned_cmd)

        assert rc == 0

        return stdout.read()

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = list()
        cpu_load_future = executor.submit(get_cpu_percent,
                                          basevm.jailer_clone_pid,
                                          runtime - SERVER_STARTUP_TIME,
                                          omit)

        modes_len = len(modes)
        ssh_connection = net_tools.SSHConnection(basevm.ssh_config)
        for client_idx in range(load_factor*basevm.vcpus_count):
            futures.append(executor.submit(spawn_iperf_client,
                                           ssh_connection,
                                           client_idx,
                                           # Distribute the modes evenly.
                                           modes[client_idx % modes_len]))

        cpu_load = cpu_load_future.result()
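        # Only the last server-client pair gets real CPU utilization
        # numbers attached; all earlier measurements report None.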
        for future in futures[:-1]:
            res = json.loads(future.result())
            res[IPERF3_END_RESULTS_TAG][
                IPERF3_CPU_UTILIZATION_PERCENT_OUT_TAG] = None
            yield res

        # Attach the real CPU utilization vmm/vcpus to
        # the last iperf3 server-client pair measurements.
        res = json.loads(futures[-1].result())

        # We expect a single emulation thread tagged with the
        # `firecracker` name.
        tag = "firecracker"
        assert tag in cpu_load and len(cpu_load[tag]) == 1
        thread_id = list(cpu_load[tag])[0]
        data = cpu_load[tag][thread_id]
        vmm_util = sum(data)/len(data)
        cpu_util_perc = res[IPERF3_END_RESULTS_TAG][
            IPERF3_CPU_UTILIZATION_PERCENT_OUT_TAG] = dict()
        cpu_util_perc[CPU_UTILIZATION_VMM] = vmm_util

        vcpus_util = 0
        for vcpu in range(basevm.vcpus_count):
            # We expect a single thread tagged `fc_vcpu {vcpu}` per vCPU.
            tag = f"fc_vcpu {vcpu}"
            assert tag in cpu_load and len(cpu_load[tag]) == 1
            thread_id = list(cpu_load[tag])[0]
            data = cpu_load[tag][thread_id]
            vcpus_util += (sum(data)/len(data))

        cpu_util_perc[CPU_UTILIZATION_VCPUS_TOTAL] = vcpus_util

        yield res


def consume_iperf_output(cons, result):
    """Consume iperf3 output result for TCP workload."""
    total_received = result[IPERF3_END_RESULTS_TAG]['sum_received']
    duration = float(total_received['seconds'])
    cons.consume_data(DURATION, duration)

    # Computed at the receiving end.
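    # Throughput in Mbps: (bytes * 8) / (1024 * 1024 * seconds).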
    total_recv_bytes = int(total_received['bytes'])
    tput = round((total_recv_bytes*8) / (1024*1024*duration), 2)
    cons.consume_data(THROUGHPUT, tput)

    cpu_util = result[IPERF3_END_RESULTS_TAG][
        IPERF3_CPU_UTILIZATION_PERCENT_OUT_TAG]
    if cpu_util:
        cpu_util_host = cpu_util[CPU_UTILIZATION_VMM]
        cpu_util_guest = cpu_util[CPU_UTILIZATION_VCPUS_TOTAL]

        cons.consume_stat("Avg", CPU_UTILIZATION_VMM, cpu_util_host)
        cons.consume_stat("Avg", CPU_UTILIZATION_VCPUS_TOTAL, cpu_util_guest)


def pipes(basevm, current_avail_cpu, env_id):
    """Producer/Consumer pipes generator."""
    for mode in CONFIG["modes"]:
        # We run bi-directional tests only on uVMs with at least 2 vCPUs,
        # because we pin one iperf3 instance per direction per vCPU, and
        # there are two directions.
        if mode == "bd" and basevm.vcpus_count < 2:
            continue

        for protocol in CONFIG["protocols"]:
            for payload_length in protocol["payload_length"]:
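                # With `--vsock`, the client's `-c 2` target is the CID of
                # the host (VMADDR_CID_HOST), not an IP address.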
                iperf_guest_cmd_builder = CmdBuilder(IPERF3) \
                    .with_arg("--vsock") \
                    .with_arg("-c", 2) \
                    .with_arg("--json") \
                    .with_arg("--omit", protocol["omit"]) \
                    .with_arg("--time", CONFIG["time"])

                if payload_length != "DEFAULT":
                    iperf_guest_cmd_builder = iperf_guest_cmd_builder \
                        .with_arg("--len", f"{payload_length}")

                iperf3_id = f"vsock-p{payload_length}-{mode}"

                cons = consumer.LambdaConsumer(
                    metadata_provider=DictMetadataProvider(
                        CONFIG["measurements"],
                        VsockThroughputBaselineProvider(env_id, iperf3_id)),
                    func=consume_iperf_output
                )

                prod_kwargs = {
                    "guest_cmd_builder": iperf_guest_cmd_builder,
                    "basevm": basevm,
                    "current_avail_cpu": current_avail_cpu,
                    "runtime": CONFIG["time"],
                    "omit": protocol["omit"],
                    "load_factor": CONFIG["load_factor"],
                    "modes": CONFIG["modes"][mode],
                }
                prod = producer.LambdaProducer(produce_iperf_output,
                                               prod_kwargs)
                yield cons, prod, f"{env_id}/{iperf3_id}"


@pytest.mark.nonci
@pytest.mark.timeout(600)
def test_vsock_throughput(bin_cloner_path, results_file_dumper):
    """Test vsock throughput driver for multiple artifacts."""
    logger = logging.getLogger("vsock_throughput")
    artifacts = ArtifactCollection(_test_images_s3_bucket())
    microvm_artifacts = ArtifactSet(artifacts.microvms(keyword="1vcpu_1024mb"))
    microvm_artifacts.insert(artifacts.microvms(keyword="2vcpu_1024mb"))
    kernel_artifacts = ArtifactSet(
        artifacts.kernels(keyword="vmlinux-4.14.bin"))
    disk_artifacts = ArtifactSet(artifacts.disks(keyword="ubuntu"))

    # Create a test context and add builder, logger, network.
    test_context = TestContext()
    test_context.custom = {
        'builder': MicrovmBuilder(bin_cloner_path),
        'logger': logger,
        'name': 'vsock_throughput',
        'results_file_dumper': results_file_dumper
    }

    test_matrix = TestMatrix(context=test_context,
                             artifact_sets=[
                                 microvm_artifacts,
                                 kernel_artifacts,
                                 disk_artifacts
                             ])
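    # Run the workload once for every combination in the artifact cross
    # product (microvm config x kernel x disk).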
    test_matrix.run_test(iperf_workload)


def iperf_workload(context):
    """Run a statistics exercise."""
    vm_builder = context.custom['builder']
    logger = context.custom["logger"]
    file_dumper = context.custom['results_file_dumper']

    # Create a rw copy artifact.
    rw_disk = context.disk.copy()
    # Get ssh key from read-only artifact.
    ssh_key = context.disk.ssh_key()
    # Create a fresh microvm from artifacts.
    vm_instance = vm_builder.build(kernel=context.kernel,
                                   disks=[rw_disk],
                                   ssh_key=ssh_key,
                                   config=context.microvm)
    basevm = vm_instance.vm
    # Create a vsock device.
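    # CIDs 0-2 are reserved (2 is the host itself), so 3 is the first CID
    # a guest can use.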
    basevm.vsock.put(
        vsock_id="vsock0",
        guest_cid=3,
        uds_path="/" + VSOCK_UDS_PATH
    )

    basevm.start()

    st_core = core.Core(name="vsock_throughput",
                        iterations=1,
                        custom={'cpu_model_name': get_cpu_model_name()})

    # Check if the needed CPU cores are available. We have the API thread,
    # the VMM thread, and then one thread for each configured vCPU.
    assert CpuMap.len() >= 2 + basevm.vcpus_count

    # Pin uVM threads to physical cores.
    current_avail_cpu = 0
    assert basevm.pin_vmm(current_avail_cpu), \
        "Failed to pin firecracker thread."
    current_avail_cpu += 1
    assert basevm.pin_api(current_avail_cpu), \
        "Failed to pin fc_api thread."
    for i in range(basevm.vcpus_count):
        current_avail_cpu += 1
        assert basevm.pin_vcpu(i, current_avail_cpu), \
            f"Failed to pin fc_vcpu {i} thread."

    logger.info("Testing with microvm: \"{}\", kernel {}, disk {}"
                .format(context.microvm.name(),
                        context.kernel.name(),
                        context.disk.name()))

    for cons, prod, tag in \
        pipes(basevm,
              current_avail_cpu + 1,
              f"{context.kernel.name()}/{context.disk.name()}/"
              f"{context.microvm.name()}"):
        st_core.add_pipe(prod, cons, tag)

    # Start running the commands on guest, gather results and verify pass
    # criteria.
    try:
        result = st_core.run_exercise()
    except core.CoreException as err:
        handle_failure(file_dumper, err)
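        # `handle_failure` is expected to re-raise after dumping partial
        # results; otherwise `result` would be unbound below.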

    dump_test_result(file_dumper, result)


def _make_host_port_path(uds_path, port):
    """Build the path for a Unix socket, mapped to host vsock port `port`."""
    return "{}_{}".format(uds_path, port)