Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
aos
GitHub Repository: aos/firecracker
Path: blob/main/tests/host_tools/logging.py
1956 views
1
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2
# SPDX-License-Identifier: Apache-2.0
3
"""Utilities for testing the logging system (metrics, common logs)."""
4
5
import fcntl
6
import os
7
import sys
8
9
from queue import Queue
10
from threading import Thread
11
12
13
class Fifo:
    """Facility for creating and working with named pipes (FIFOs).

    The pipe is created on construction and removed from the filesystem
    when the object is destroyed.
    """

    # Filesystem path of the pipe; stays None until the pipe is created.
    path = None
    # Text-mode file object opened for reading from the pipe.
    fifo = None
    # Upper bound on the number of lines one reader thread will process.
    _MAX_THREADED_READS = 20

    def __init__(self, path, blocking=False):
        """Create a new named pipe and open it for reading.

        :param path: Filesystem location at which the pipe is created.
        :param blocking: When false (the default) the pipe is opened with
            `O_NONBLOCK`; otherwise it is opened with the builtin `open`.
        :raises FileExistsError: If something already exists at `path`.
        """
        # Let `mkfifo` itself detect a pre-existing path: a separate
        # existence check would leave a window between check and creation.
        try:
            os.mkfifo(path)
        except FileExistsError as err:
            raise FileExistsError(
                "Named pipe {} already exists.".format(path)
            ) from err

        # Record ownership right away so that `__del__` removes the pipe
        # even if opening it fails below.
        self.path = path

        if blocking:
            self.fifo = open(path, "r")
        else:
            fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
            self.fifo = os.fdopen(fd, "r")

    def sequential_reader(self, max_lines):
        """Return up to `max_lines` lines from a non blocking fifo.

        Every line currently readable is consumed from the pipe; lines
        beyond `max_lines` are discarded.

        :param max_lines: Maximum number of lines to return.
        :return: A list containing the read lines.
        """
        return self.fifo.readlines()[:max_lines]

    @property
    def flags(self):
        """Return flags of the opened fifo.

        :return: An integer with flags of the opened file.
        """
        fd = self.fifo.fileno()
        return fcntl.fcntl(fd, fcntl.F_GETFL)

    @flags.setter
    def flags(self, flags):
        """Set new flags for the opened fifo.

        :param flags: Integer flag value passed to `fcntl` with `F_SETFL`.
        """
        fd = self.fifo.fileno()
        fcntl.fcntl(fd, fcntl.F_SETFL, flags)

    def threaded_reader(self, check_func, *args):
        """Start a thread to read fifo.

        The thread runs `check_func(line, *args)` on each line read from
        the FIFO and enqueues any exceptions in the returned queue.

        :param check_func: Callable invoked with each line and `*args`.
        :param args: Extra positional arguments forwarded to `check_func`.
        :return: A queue receiving `sys.exc_info()` tuples for failures,
            followed by the string "Done" once the thread has finished.
        """
        exceptions_queue = Queue()
        metric_reader_thread = Thread(
            target=self._do_thread_reader,
            args=(exceptions_queue, check_func, *args),
        )
        metric_reader_thread.start()
        return exceptions_queue

    def _do_thread_reader(self, exceptions_queue, check_func, *args):
        """Read from a FIFO opened as read-only.

        This applies a function for checking output on each
        line of the logs received, stopping at the first empty read or
        after `_MAX_THREADED_READS` lines.
        Failures and exceptions are propagated to the main thread
        through the `exceptions_queue`, which always receives a final
        "Done" marker so a waiting consumer cannot hang.
        """
        try:
            for _ in range(self._MAX_THREADED_READS):
                line = self.fifo.readline()
                if not line:
                    break
                try:
                    check_func(line, *args)
                # pylint: disable=broad-except
                # We need to propagate all type of exceptions to the
                # main thread.
                except Exception:
                    exceptions_queue.put(sys.exc_info())
        # pylint: disable=broad-except
        # A failing read must reach the main thread as well instead of
        # silently killing this thread.
        except Exception:
            exceptions_queue.put(sys.exc_info())
        finally:
            exceptions_queue.put("Done")

    def __del__(self):
        """Destructor cleaning up the FIFO from where it was created."""
        # Cleanup is best effort: errors are ignored on purpose because
        # there is nothing useful to do with them in a destructor.
        if self.fifo is not None:
            try:
                self.fifo.close()
            except OSError:
                pass
        if self.path:
            try:
                os.remove(self.path)
            except OSError:
                pass
103
104