Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/firecracker
Path: blob/main/tests/framework/utils_vsock.py
1956 views
1
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
# SPDX-License-Identifier: Apache-2.0
3
"""Helper functions for testing vsock device."""
4
5
import hashlib
6
import os.path
7
from select import select
8
from socket import socket, AF_UNIX, SOCK_STREAM
9
from threading import Thread, Event
10
import re
11
12
from host_tools.network import SSHConnection
13
14
# Vsock port used by the echo servers and workers in this module.
ECHO_SERVER_PORT = 5252
# Backlog handed to `listen()` on the host echo server's socket.
SERVER_ACCEPT_BACKLOG = 128
# Number of concurrent connections exercised by each check.
TEST_CONNECTION_COUNT = 50
# Size, in bytes, of the random blob generated by `make_blob` (20 MiB).
BLOB_SIZE = 20 * 1024 * 1024
# Chunk size, in bytes, used when reading and echoing data (64 KiB).
BUF_SIZE = 64 * 1024
19
20
21
class HostEchoServer(Thread):
    """A simple "echo" server for vsock.

    This server will accept incoming connections (initiated by the guest vm),
    and, for each connection, it will read any incoming data and then echo it
    right back.

    Any exception raised while serving is stored in `self.error`, so that the
    code which spawned the server can inspect it after calling `exit()`.
    """

    def __init__(self, vm, path):
        """Bind a listening Unix socket at `path` and link it into the jail.

        `vm` is the microVM object; its `create_jailed_resource(path)` is
        called so that Firecracker can reach the socket.
        `path` is the filesystem path of the Unix socket to listen on.
        """
        super().__init__()
        self.vm = vm
        self.error = None
        self.clients = []
        self.exit_evt = Event()
        self.sock = socket(AF_UNIX, SOCK_STREAM)
        try:
            self.sock.bind(path)
            self.sock.listen(SERVER_ACCEPT_BACKLOG)

            # Link the listening Unix socket into the VM's jail, so that
            # Firecracker can connect to it.
            vm.create_jailed_resource(path)
        except Exception:
            # Do not leak the listening socket if setup fails half-way.
            self.sock.close()
            raise

    def run(self):
        """Thread code payload.

        Wrap up the real "run" into a catch-all block: an exception raised
        inside this thread would not reach the thread that started it, so
        it is recorded in `self.error` instead. All sockets owned by the
        server are closed before the thread finishes, whether it stopped
        normally or because of an error.
        """
        try:
            self._run()
        # pylint: disable=broad-except
        except Exception as err:
            self.error = err
        finally:
            self._close_sockets()

    def _run(self):
        """Serve clients until `exit_evt` is set."""
        while not self.exit_evt.is_set():
            watch_list = self.clients + [self.sock]
            # The 1 second timeout bounds how long it takes to notice that
            # `exit_evt` has been set while no socket is readable.
            rd_list, _, _ = select(watch_list, [], [], 1)
            for rdo in rd_list:
                if rdo is self.sock:
                    # Read event on the listening socket: a new client
                    # wants to connect.
                    client, _ = self.sock.accept()
                    self.clients.append(client)
                    continue
                # Read event on a connected socket: new data is
                # available from some client.
                buf = rdo.recv(BUF_SIZE)
                if not buf:
                    # Zero-length read: the peer closed the connection.
                    # Stop watching the socket and release it.
                    self.clients.remove(rdo)
                    rdo.close()
                    continue
                # Send back everything we just read.
                rdo.sendall(buf)

    def _close_sockets(self):
        """Close every client connection and the listening socket."""
        for client in self.clients:
            client.close()
        self.clients.clear()
        self.sock.close()

    def exit(self):
        """Shut down the echo server and wait for it to exit.

        This method can be called from any thread. Upon returning, the
        echo server will have shut down.
        """
        self.exit_evt.set()
        self.join()
88
89
90
class HostEchoWorker(Thread):
    """A vsock echo worker, connecting to a guest echo server.

    This will initiate a connection to a guest echo server, then start sending
    it the contents of the file at `blob_path`. The echo server should send
    the exact same data back, so a hash is performed on everything received
    from the server. This hash will later be checked against the hashed
    contents of `blob_path`.

    After the thread has finished, `self.hash` holds the MD5 hex digest of
    the echoed data, or `None` if the transfer failed; in that case
    `self.error` holds the exception that stopped it.
    """

    def __init__(self, uds_path, blob_path):
        """Connect to the guest echo server through the vsock Unix socket.

        `uds_path` is the path of the Unix socket to connect through.
        `blob_path` is the path of the file whose contents will be sent.
        """
        super().__init__()
        self.uds_path = uds_path
        self.blob_path = blob_path
        self.hash = None
        self.error = None
        self.sock = _vsock_connect_to_guest(self.uds_path, ECHO_SERVER_PORT)

    def run(self):
        """Thread code payload.

        Wrap up the real "run" into a catch-all block: an exception raised
        inside this thread would not reach the thread that started it, so
        it is recorded in `self.error` instead.
        """
        try:
            self._run()
        # pylint: disable=broad-except
        except Exception as err:
            self.error = err

    def close_uds(self):
        """Close vsock UDS connection."""
        self.sock.close()

    def _run(self):
        """Send the blob chunk by chunk and hash whatever is echoed back."""
        hash_obj = hashlib.md5()

        with open(self.blob_path, 'rb') as blob_file:
            while True:
                buf = blob_file.read(BUF_SIZE)
                if not buf:
                    break

                self.sock.sendall(buf)
                # Wait for the full echo of this chunk before sending the
                # next one.
                self._hash_echo(hash_obj, len(buf))

        self.hash = hash_obj.hexdigest()

    def _hash_echo(self, hash_obj, count):
        """Receive exactly `count` bytes and feed them into `hash_obj`.

        Raises `ConnectionError` if the peer closes the connection before
        `count` bytes have arrived; without this check a zero-length read
        would make the receive loop spin forever.
        """
        left = count
        while left > 0:
            chunk = self.sock.recv(left)
            if not chunk:
                raise ConnectionError(
                    "echo server closed the connection with {} bytes "
                    "still expected".format(left)
                )
            hash_obj.update(chunk)
            left -= len(chunk)
147
148
149
def make_blob(dst_dir, size=None):
    """Generate a random data file.

    The file is written to `dst_dir` under the name "vsock-test.blob",
    replacing any existing file of that name.

    `size` is the number of random bytes to write; when omitted (`None`),
    `BLOB_SIZE` bytes are written.

    Returns a `(blob_path, md5_hex_digest)` tuple, where the digest is
    computed over the bytes written to the file.
    """
    left = BLOB_SIZE if size is None else size
    blob_path = os.path.join(dst_dir, "vsock-test.blob")
    blob_hash = hashlib.md5()

    # The context manager closes the file even if a write fails.
    with open(blob_path, 'wb') as blob_file:
        while left > 0:
            count = min(left, 4096)
            buf = os.urandom(count)
            blob_hash.update(buf)
            blob_file.write(buf)
            left -= count

    return blob_path, blob_hash.hexdigest()
164
165
166
def check_host_connections(vm, uds_path, blob_path, blob_hash):
    """Test host-initiated connections.

    This will start a daemonized echo server on the guest VM, and then spawn
    `TEST_CONNECTION_COUNT` `HostEchoWorker` threads.
    After the workers are done transferring the data read from `blob_path`,
    the hashes they computed for the data echoed back by the server are
    checked against `blob_hash`.

    Every worker that was started is joined and has its socket closed
    before the results are checked, even if spawning a later worker fails.
    """
    conn = SSHConnection(vm.ssh_config)
    cmd = "vsock_helper echosrv -d {}".format(ECHO_SERVER_PORT)
    ecode, _, _ = conn.execute_command(cmd)
    assert ecode == 0

    workers = []
    try:
        for _ in range(TEST_CONNECTION_COUNT):
            worker = HostEchoWorker(uds_path, blob_path)
            workers.append(worker)
            worker.start()
    finally:
        for wrk in workers:
            # `ident` is None for a thread that was never started; joining
            # such a thread would raise RuntimeError.
            if wrk.ident is not None:
                wrk.join()
            wrk.close_uds()

    for wrk in workers:
        # Report a worker failure directly, rather than as a confusing
        # `None != blob_hash` mismatch.
        assert wrk.error is None, repr(wrk.error)
        assert wrk.hash == blob_hash
191
192
193
def check_guest_connections(vm, server_port_path, blob_path, blob_hash):
    """Test guest-initiated connections.

    This will start an echo server on the host (in its own thread), then
    start `TEST_CONNECTION_COUNT` workers inside the guest VM, all
    communicating with the echo server.

    The echo server is always shut down before returning, including when a
    guest command fails or raises.
    """
    echo_server = HostEchoServer(vm, server_port_path)
    echo_server.start()
    try:
        conn = SSHConnection(vm.ssh_config)

        # Increase maximum process count for the ssh service.
        # Avoids: "bash: fork: retry: Resource temporarily unavailable"
        # Needed to execute the bash script that tests for concurrent
        # vsock guest initiated connections.
        ecode, _, _ = conn.execute_command(
            "echo 1024 > "
            "/sys/fs/cgroup/pids/system.slice/ssh.service/pids.max"
        )
        assert ecode == 0, \
            "Unable to set max process count for guest ssh service."

        # Build the guest worker sub-command.
        # `vsock_helper` will read the blob file from STDIN and send the echo
        # server response to STDOUT. This response is then hashed, and the
        # hash is compared against `blob_hash` (computed on the host). This
        # comparison sets the exit status of the worker command.
        worker_cmd = (
            "hash=$("
            "cat {}"
            " | vsock_helper echo 2 {}"
            " | md5sum | cut -f1 -d\\ "
            ")"
            " && [[ \"$hash\" = \"{}\" ]]"
        ).format(blob_path, ECHO_SERVER_PORT, blob_hash)

        # Run `TEST_CONNECTION_COUNT` concurrent workers, using the above
        # worker sub-command.
        # If any worker fails, this command will fail. If all worker
        # sub-commands succeed, this will also succeed.
        cmd = (
            "workers=\"\";"
            "for i in $(seq 1 {}); do"
            " ({})& "
            " workers=\"$workers $!\";"
            "done;"
            "for w in $workers; do wait $w || exit -1; done"
        ).format(TEST_CONNECTION_COUNT, worker_cmd)

        ecode, _, _ = conn.execute_command(cmd)
    finally:
        # Stop the server thread no matter how the guest side went, so a
        # failure above does not leave it running.
        echo_server.exit()

    assert echo_server.error is None

    assert ecode == 0, ecode
241
242
243
def _vsock_connect_to_guest(uds_path, port):
    """Return a Unix socket, connected to the guest vsock port `port`.

    Connects to the Unix socket at `uds_path`, sends "CONNECT <port>\\n" and
    expects an "OK <number>\\n" acknowledgement in reply.

    Raises `AssertionError` if the acknowledgement does not have that form,
    and lets socket errors propagate. The socket is closed before any
    exception leaves this function.
    """
    sock = socket(AF_UNIX, SOCK_STREAM)
    try:
        sock.connect(uds_path)
        sock.sendall("CONNECT {}\n".format(port).encode("utf-8"))

        # The acknowledgement may arrive in more than one piece: keep
        # reading until its terminating newline, the peer closing the
        # connection, or 32 bytes in total.
        ack_buf = b""
        while len(ack_buf) < 32 and not ack_buf.endswith(b"\n"):
            chunk = sock.recv(32 - len(ack_buf))
            if not chunk:
                break
            ack_buf += chunk

        ack = ack_buf.decode('utf-8')
        assert re.match("^OK [0-9]+\n$", ack) is not None, \
            "unexpected vsock acknowledgement: {!r}".format(ack)
    except Exception:
        # Do not leak the socket when the connection or handshake fails.
        sock.close()
        raise

    return sock
255
256