GitHub Repository: aos/firecracker
Path: blob/main/tests/framework/utils.py
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Generic utility functions that are used in the framework."""
import asyncio
import functools
import glob
import logging
import os
import re
import subprocess
import threading
import time
import typing

from collections import namedtuple, defaultdict
import psutil
from retry import retry

CommandReturn = namedtuple("CommandReturn", "returncode stdout stderr")
CMDLOG = logging.getLogger("commands")
GET_CPU_LOAD = "top -bn1 -H -p {} | tail -n+8"


class ProcessManager:
    """Host process manager.

    TODO: Extend the management to guest processes.
    TODO: Extend with automated process/cpu_id pinning accountability.
    """

    @staticmethod
    def get_threads(pid: int) -> dict:
        """Return a dict mapping thread names to thread ids of `pid`."""
        threads_map = defaultdict(list)
        proc = psutil.Process(pid)
        for thread in proc.threads():
            threads_map[psutil.Process(thread.id).name()].append(thread.id)
        return threads_map

    @staticmethod
    def get_cpu_affinity(pid: int) -> list:
        """Get CPU affinity for a thread."""
        return psutil.Process(pid).cpu_affinity()

    @staticmethod
    def set_cpu_affinity(pid: int, cpulist: list) -> list:
        """Set CPU affinity for a thread."""
        real_cpulist = list(map(CpuMap, cpulist))
        return psutil.Process(pid).cpu_affinity(real_cpulist)

    @staticmethod
    def get_cpu_percent(pid: int) -> dict:
        """Return the instant CPU utilization percent, per thread."""
        _, stdout, _ = run_cmd(GET_CPU_LOAD.format(pid))
        cpu_percentages = dict()

        # Each line describes one thread of the process.
        lines = stdout.strip().split(sep="\n")
        for line in lines:
            info = line.strip().split()
            # We need at least the CPU utilization and thread name columns
            # (the name might span two columns, e.g. `fc_vcpu 0`).
            info_len = len(info)
            assert info_len > 11

            cpu_percent = float(info[8])
            task_id = info[0]

            # Handles the `fc_vcpu 0` case as well.
            thread_name = info[11] + (" " + info[12] if info_len > 12 else "")
            if thread_name not in cpu_percentages:
                cpu_percentages[thread_name] = dict()
            cpu_percentages[thread_name][task_id] = cpu_percent

        return cpu_percentages


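# Illustrative usage sketch, not part of the original framework: shows how
# the thread map and the affinity helpers above fit together. The PID is a
# hypothetical host process id supplied by the caller, and CPU ids 0 and 1
# are container-visible indices that `set_cpu_affinity` translates through
# `CpuMap`; the environment is assumed to expose at least two CPUs.
def _example_pin_process_threads(pid: int) -> dict:
    """Pin every thread of `pid` to container-visible CPUs 0 and 1."""
    threads_map = ProcessManager.get_threads(pid)
    for thread_ids in threads_map.values():
        for thread_id in thread_ids:
            ProcessManager.set_cpu_affinity(thread_id, [0, 1])
    return threads_map

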
# pylint: disable=R0903
class CpuMap:
    """Map from real host cpu cores to the cores visible inside a container.

    When a docker container is restricted to a subset of cpu cores,
    `/proc/cpuinfo` still presents all the cpu cores of the machine
    instead of only the assigned ones. This class maps the real assigned
    host cpu cores to virtual cpu cores, starting from 0.
    """

    arr = list()

    def __new__(cls, x):
        """Return the host cpu id behind the container-visible cpu `x`."""
        assert CpuMap.len() > x
        if not CpuMap.arr:
            CpuMap.arr = CpuMap._cpus()
        return CpuMap.arr[x]

    @staticmethod
    def len():
        """Get the host cpus count."""
        if not CpuMap.arr:
            CpuMap.arr = CpuMap._cpus()
        return len(CpuMap.arr)

    @classmethod
    def _cpuset_mountpoint(cls):
        """Obtain the cpuset mountpoint."""
        cmd = "cat /proc/mounts | grep cgroup | grep cpuset | cut -d' ' -f2"
        _, stdout, _ = run_cmd(cmd)
        return stdout.strip()

    @classmethod
    def _cpus(cls):
        """Obtain the real processor map.

        See this issue for details:
        https://github.com/moby/moby/issues/20770.
        """
        cmd = "cat {}/cpuset.cpus".format(CpuMap._cpuset_mountpoint())
        _, cpulist, _ = run_cmd(cmd)
        return ListFormatParser(cpulist).parse()


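# Illustrative sketch, not part of the original framework: builds the full
# host CPU list visible to this environment. For example, in a container
# restricted to host cores 2,3,6,7 the result would be [2, 3, 6, 7], and
# CpuMap(0) would resolve to host core 2; the actual values depend on the
# cgroup cpuset configuration of the machine running the tests.
def _example_host_cpu_ids() -> list:
    """Return the host CPU id behind each container-visible CPU index."""
    return [CpuMap(index) for index in range(CpuMap.len())]

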
class ListFormatParser:
    """Parser class for cpuset LIST FORMAT strings."""

    def __init__(self, content):
        """Initialize the parser with the content."""
        self._content = content.strip()

    @classmethod
    def _is_range(cls, rng):
        """Return True if the given group is a range.

        E.g. ranges: 0-10.
        """
        match = re.search("([0-9]+)-([0-9]+)", rng)
        # If there is no match, the group is a singular value.
        return match is not None

    @classmethod
    def _range_to_list(cls, rng):
        """Return a range of integers based on the content.

        The content respects the LIST FORMAT defined in the
        cpuset documentation.
        See: https://man7.org/linux/man-pages/man7/cpuset.7.html.
        """
        ends = rng.split("-")
        if len(ends) != 2:
            return []

        return list(range(int(ends[0]), int(ends[1]) + 1))

    def parse(self):
        """Parse list formats for cpuset and mems.

        See LIST FORMAT here:
        https://man7.org/linux/man-pages/man7/cpuset.7.html.
        """
        if len(self._content) == 0:
            return []

        groups = self._content.split(",")
        arr = set()

        def func(acc, cpu):
            if ListFormatParser._is_range(cpu):
                acc.update(ListFormatParser._range_to_list(cpu))
            else:
                acc.add(int(cpu))
            return acc

        return list(functools.reduce(func, groups, arr))


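# Illustrative sketch, not part of the original framework: expands a
# hypothetical LIST FORMAT string that mixes a range with a standalone
# value. The result contains the cpu ids 0, 1, 2 and 7 (built from a set,
# so the ordering of the returned list is not guaranteed).
def _example_parse_cpuset_list() -> list:
    """Expand the sample LIST FORMAT string "0-2,7"."""
    return ListFormatParser("0-2,7").parse()

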
class CmdBuilder:
    """Command builder class."""

    def __init__(self, bin_path):
        """Initialize the command builder."""
        self._bin_path = bin_path
        self._args = {}

    def with_arg(self, flag, value=""):
        """Add a new argument."""
        self._args[flag] = value
        return self

    def build(self):
        """Build the command."""
        cmd = self._bin_path + " "
        for flag in self._args:
            cmd += f"{flag} {self._args[flag]} "
        return cmd


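# Illustrative sketch, not part of the original framework: chains
# `with_arg` calls to build a command string. The binary path and flag
# values are hypothetical placeholders, not values used elsewhere here.
def _example_build_command() -> str:
    """Build a sample command line with CmdBuilder."""
    return (CmdBuilder("./firecracker")
            .with_arg("--api-sock", "/tmp/fc-example.socket")
            .with_arg("--id", "example-vm")
            .build())

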
class StoppableThread(threading.Thread):
    """
    Thread class with a stop() method.

    The thread itself has to check regularly for the stopped() condition.
    """

    def __init__(self, *args, **kwargs):
        """Set up a stoppable thread."""
        super().__init__(*args, **kwargs)
        self._should_stop = False

    def stop(self):
        """Set that the thread should stop."""
        self._should_stop = True

    def stopped(self):
        """Check if the thread was stopped."""
        return self._should_stop


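# Illustrative sketch, not part of the original framework: demonstrates the
# cooperative stop protocol. `stop()` only sets a flag, so the target
# function has to poll `stopped()` itself in order to exit.
def _example_stoppable_worker() -> bool:
    """Start a polling worker, ask it to stop and wait for it to exit."""
    def _loop():
        while not worker.stopped():
            time.sleep(0.1)

    worker = StoppableThread(target=_loop, daemon=True)
    worker.start()
    worker.stop()
    worker.join()
    return worker.stopped()

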
# pylint: disable=R0903
class DictQuery:
    """Utility class to query python dicts key paths.

    The keys from the path must be `str`s.
    Example:
    > d = {
        "a": {
            "b": {
                "c": 0
            }
        },
        "d": 1
    }
    > dq = DictQuery(d)
    > print(dq.get("a/b/c"))
    0
    > print(dq.get("d"))
    1
    """

    def __init__(self, d: dict):
        """Initialize the dict query."""
        self._inner = d

    def get(self, keys_path: str, default=None):
        """Retrieve value corresponding to the key path."""
        keys = keys_path.strip().split("/")
        if len(keys) < 1:
            return default

        result = self._inner
        for key in keys:
            if not result:
                return default

            result = result.get(key)

        return result

    def __str__(self):
        """Representation as a string."""
        return str(self._inner)


class ExceptionAggregator(Exception):
    """Abstraction over an exception with message formatter."""

    def __init__(self, add_newline=False):
        """Initialize the exception aggregator."""
        super().__init__()
        self.failures = list()

        # If `add_newline` is True, the failures will start one row below in
        # the logs. This is useful for having the failures start on an empty
        # line, keeping the formatting nice and clean.
        if add_newline:
            self.failures.append("")

    def add_row(self, failure: str):
        """Add a failure entry."""
        self.failures.append(f"{failure}")

    def has_any(self) -> bool:
        """Return whether there are failures or not."""
        if len(self.failures) == 1:
            return self.failures[0] != ""

        return len(self.failures) > 1

    def __str__(self):
        """Return the custom string representation."""
        return "\n\n".join(self.failures)


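# Illustrative sketch, not part of the original framework: collects every
# failed check into one ExceptionAggregator and raises a single exception
# only when at least one failure was recorded. `checks` is a hypothetical
# mapping of check names to boolean results.
def _example_aggregate_failures(checks: dict):
    """Raise one aggregated error describing all failed checks."""
    failures = ExceptionAggregator(add_newline=True)
    for name, passed in checks.items():
        if not passed:
            failures.add_row(f"check failed: {name}")
    if failures.has_any():
        raise failures

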
def search_output_from_cmd(cmd: str,
                           find_regex: typing.Pattern) -> typing.Match:
    """
    Run a shell command and search its stdout for a given regex.

    If the regex is not found, a RuntimeError exception is raised.

    :param cmd: command to run
    :param find_regex: regular expression object to search for
    :return: result of re.search()
    """
    # Run the given command in a shell
    _, stdout, _ = run_cmd(cmd)

    # Search for the object
    content = re.search(find_regex, stdout)

    # If a match was found, return it
    if content:
        return content

    raise RuntimeError("Could not find '%s' in output for '%s'" %
                       (find_regex.pattern, cmd))


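# Illustrative sketch, not part of the original framework: uses
# search_output_from_cmd to pull the `major.minor` part of the host kernel
# release out of `uname -r`. The command and regex are examples only.
def _example_host_kernel_version() -> str:
    """Return the `major.minor` portion of the host kernel release."""
    match = search_output_from_cmd(
        cmd="uname -r",
        find_regex=re.compile(r"^([0-9]+\.[0-9]+)"))
    return match.group(1)

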
def get_files_from(find_path: str, pattern: str, exclude_names: list = None,
                   recursive: bool = True):
    """
    Return a list of files from a given path, recursively.

    :param find_path: path where to look for files
    :param pattern: what pattern to apply to file names
    :param exclude_names: folder names to exclude
    :param recursive: do a recursive search for the given pattern
    :return: list of found files
    """
    found = []
    exclude_names = exclude_names or []
    # For each directory in the given path
    for path_dir in os.scandir(find_path):
        # Check if it should be skipped
        if path_dir.name in exclude_names or os.path.isfile(path_dir):
            continue
        # Run glob inside the folder with the given pattern
        found.extend(
            glob.glob(f"{find_path}/{path_dir.name}/**/{pattern}",
                      recursive=recursive))
    # The loop above only globs inside subdirectories, so also pick up the
    # files matching the pattern directly under the given path.
    found.extend(
        glob.glob(f"{find_path}/./{pattern}"))
    return found


def get_free_mem_ssh(ssh_connection):
    """
    Get how much free memory in kB a guest sees, over ssh.

    :param ssh_connection: connection to the guest
    :return: the MemAvailable value from `/proc/meminfo`, in kB
    """
    _, stdout, stderr = ssh_connection.execute_command(
        'cat /proc/meminfo | grep MemAvailable'
    )
    assert stderr.read() == ''

    # Split "MemAvailable: 123456 kB" and validate it
    meminfo_data = stdout.read().split()
    if len(meminfo_data) == 3:
        # Return the middle element in the array
        return int(meminfo_data[1])

    raise Exception('Available memory not found in `/proc/meminfo`')


def run_cmd_sync(cmd, ignore_return_code=False, no_shell=False):
    """
    Execute a given command.

    :param cmd: command to execute
    :param ignore_return_code: whether a non-zero return code should be ignored
    :param no_shell: don't run the command in a sub-shell
    :return: return code, stdout, stderr
    """
    if isinstance(cmd, list) or no_shell:
        # Create the process without a shell
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
    else:
        proc = subprocess.Popen(
            cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)

    # Capture stdout/stderr
    stdout, stderr = proc.communicate()

    output_message = f"\n[{proc.pid}] Command:\n{cmd}"
    # Append stdout/stderr to the output message
    if stdout.decode() != "":
        output_message += f"\n[{proc.pid}] stdout:\n{stdout.decode()}"
    if stderr.decode() != "":
        output_message += f"\n[{proc.pid}] stderr:\n{stderr.decode()}"

    # If a non-zero return code was returned, raise an exception
    if not ignore_return_code and proc.returncode != 0:
        output_message += \
            f"\nReturned error code: {proc.returncode}"

        if stderr.decode() != "":
            output_message += \
                f"\nstderr:\n{stderr.decode()}"
        raise ChildProcessError(output_message)

    # Log the message with one call so that multiple statuses
    # don't get mixed up
    CMDLOG.debug(output_message)

    return CommandReturn(
        proc.returncode,
        stdout.decode(),
        stderr.decode())


async def run_cmd_async(cmd, ignore_return_code=False, no_shell=False):
    """
    Create a coroutine that executes a given command.

    :param cmd: command to execute
    :param ignore_return_code: whether a non-zero return code should be ignored
    :param no_shell: don't run the command in a sub-shell
    :return: return code, stdout, stderr
    """
    if isinstance(cmd, list) or no_shell:
        # Create the async process
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)
    else:
        proc = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)

    # Capture stdout/stderr
    stdout, stderr = await proc.communicate()

    output_message = f"\n[{proc.pid}] Command:\n{cmd}"
    # Append stdout/stderr to the output message
    if stdout.decode() != "":
        output_message += f"\n[{proc.pid}] stdout:\n{stdout.decode()}"
    if stderr.decode() != "":
        output_message += f"\n[{proc.pid}] stderr:\n{stderr.decode()}"

    # If a non-zero return code was returned, raise an exception
    if not ignore_return_code and proc.returncode != 0:
        output_message += \
            f"\nReturned error code: {proc.returncode}"

        if stderr.decode() != "":
            output_message += \
                f"\nstderr:\n{stderr.decode()}"
        raise ChildProcessError(output_message)

    # Log the message with one call so that multiple statuses
    # don't get mixed up
    CMDLOG.debug(output_message)

    return CommandReturn(
        proc.returncode,
        stdout.decode(),
        stderr.decode())


def run_cmd_list_async(cmd_list):
    """
    Run a list of commands asynchronously and wait for them to finish.

    :param cmd_list: list of commands to execute
    :return: None
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # Create an event loop when one is not available
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    cmds = []
    # Create a list of coroutines to run
    for cmd in cmd_list:
        cmds.append(run_cmd_async(cmd))

    # Wait until all are complete
    loop.run_until_complete(
        asyncio.gather(
            *cmds
        )
    )


def run_cmd(cmd, ignore_return_code=False, no_shell=False):
    """
    Run a command using the sync function that logs the output.

    :param cmd: command to run
    :param ignore_return_code: whether a non-zero return code should be ignored
    :param no_shell: don't run the command in a sub-shell
    :returns: tuple of (return code, stdout, stderr)
    """
    return run_cmd_sync(cmd=cmd,
                        ignore_return_code=ignore_return_code,
                        no_shell=no_shell)


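# Illustrative sketch, not part of the original framework: run_cmd returns
# a CommandReturn namedtuple, so the result can be unpacked positionally or
# accessed by field name. The echoed string is an arbitrary example.
def _example_run_cmd() -> str:
    """Run a trivial shell command and return its stripped stdout."""
    returncode, stdout, _ = run_cmd("echo example")
    assert returncode == 0
    return stdout.strip()

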
def eager_map(func, iterable):
    """Map version for Python 3.x which is eager and returns nothing."""
    for _ in map(func, iterable):
        continue


def assert_seccomp_level(pid, seccomp_level):
    """Test that seccomp_level applies to all threads of a process."""
    # Get the ids of all threads in the process
    cmd = 'ps -T --no-headers -p {} | awk \'{{print $2}}\''.format(
        pid
    )
    process = run_cmd(cmd)
    threads_out_lines = process.stdout.splitlines()
    for tid in threads_out_lines:
        # Verify each thread's Seccomp status
        cmd = 'cat /proc/{}/status | grep Seccomp'.format(tid)
        process = run_cmd(cmd)
        seccomp_line = ''.join(process.stdout.split())
        assert seccomp_line == "Seccomp:" + seccomp_level


def get_cpu_percent(pid: int, iterations: int, omit: int) -> dict:
    """Get total PID CPU percentage, as in system time plus user time.

    If the PID has corresponding threads, creates a dictionary with the
    lists of instant loads for each thread.
    """
    assert iterations > 0
    time.sleep(omit)
    cpu_percentages = dict()
    for _ in range(iterations):
        current_cpu_percentages = ProcessManager.get_cpu_percent(pid)
        assert len(current_cpu_percentages) > 0

        for thread_name in current_cpu_percentages:
            if not cpu_percentages.get(thread_name):
                cpu_percentages[thread_name] = dict()
            for task_id in current_cpu_percentages[thread_name]:
                if not cpu_percentages[thread_name].get(task_id):
                    cpu_percentages[thread_name][task_id] = list()
                cpu_percentages[thread_name][task_id].append(
                    current_cpu_percentages[thread_name][task_id])
        time.sleep(1)  # 1 second granularity.
    return cpu_percentages


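# Illustrative sketch, not part of the original framework: samples the
# per-thread CPU load of a hypothetical Firecracker PID for five one-second
# iterations after skipping the first two seconds of runtime.
def _example_sample_cpu_load(firecracker_pid: int) -> dict:
    """Collect CPU load samples for every thread of the given PID."""
    return get_cpu_percent(firecracker_pid, iterations=5, omit=2)

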
@retry(delay=0.5, tries=5)
def wait_process_termination(p_pid):
    """Wait for a process to terminate.

    Returns successfully if the process was indeed killed or raises an
    exception if the process is still alive after retrying several times.
    """
    try:
        _, stdout, _ = run_cmd("ps --pid {} -o comm=".format(p_pid))
    except ChildProcessError:
        return
    raise Exception("{} process is still alive.".format(stdout.strip()))


def get_firecracker_version_from_toml():
    """
    Return the version of the firecracker crate, from Cargo.toml.

    Usually different from the output of `./firecracker --version`, if
    the code has not been released.
    """
    cmd = "cd ../src/firecracker && cargo pkgid | cut -d# -f2 | cut -d: -f2"

    rc, stdout, _ = run_cmd(cmd)
    assert rc == 0

    return stdout


def compare_versions(first, second):
    """
    Compare two versions with format `X.Y.Z`.

    :param first: first version string
    :param second: second version string
    :returns: 0 if equal, <0 if first < second, >0 if second < first
    """
    first = list(map(int, first.split('.')))
    second = list(map(int, second.split('.')))

    if first[0] == second[0]:
        if first[1] == second[1]:
            if first[2] == second[2]:
                return 0

            return first[2] - second[2]

        return first[1] - second[1]

    return first[0] - second[0]
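

# Illustrative sketch, not part of the original framework: the sign of the
# compare_versions result orders the two versions like a classic comparator.
# The version strings below are arbitrary examples.
def _example_version_ordering() -> bool:
    """Check the comparator on a few sample version strings."""
    return (compare_versions("1.2.3", "1.2.3") == 0
            and compare_versions("0.9.0", "1.0.0") < 0
            and compare_versions("1.10.0", "1.2.0") > 0)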