Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
parkpow
GitHub Repository: parkpow/deep-license-plate-recognition
Path: blob/master/stream/stream_monitor.py
641 views
1
import argparse
2
import json
3
import logging
4
import os
5
import re
6
import subprocess
7
import sys
8
import threading
9
import time
10
from datetime import datetime, timedelta
11
from functools import partial
12
from http.server import BaseHTTPRequestHandler, HTTPServer
13
14
LOG_LEVEL = os.environ.get("LOGGING", "INFO").upper()
15
16
logging.basicConfig(
17
stream=sys.stdout,
18
level=LOG_LEVEL,
19
format="%(asctime)s %(levelname)s %(name)s %(threadName)s : %(message)s",
20
)
21
22
_state = {"container_active": False, "last_log_times": {}}
23
24
log_line_regex = r"(\w+):([^:]*):(.*):\b"
25
compiled = re.compile(log_line_regex)
26
27
28
class GetHandler(BaseHTTPRequestHandler):
29
def __init__(self, offline_diff_duration, *args, **kwargs):
30
self.offline_diff_duration = offline_diff_duration
31
super().__init__(*args, **kwargs)
32
33
def do_GET(self):
34
self.send_response(200)
35
self.send_header("Content-type", "application/json")
36
self.end_headers()
37
logging.debug("state: ")
38
logging.debug(_state)
39
40
online_time = datetime.now() - timedelta(seconds=self.offline_diff_duration)
41
response = {"active": _state["container_running"]}
42
43
cameras = {}
44
45
for k, v in _state["last_log_times"].items():
46
logging.debug(f"Comparing times: [{v}] and [{online_time}]")
47
if online_time < v:
48
cameras[k] = {"status": "running"}
49
else:
50
cameras[k] = {"status": "offline"}
51
52
response["cameras"] = cameras
53
54
logging.debug("response")
55
logging.debug(response)
56
self.wfile.write(json.dumps(response).encode())
57
self.wfile.write(b"\n")
58
59
60
def parse_log_line(line):
61
"""
62
:param line: Stream log line
63
:return: Log split into [level, camera, time]
64
"""
65
m = compiled.match(line)
66
if m:
67
groups = m.groups()
68
return [groups[0], groups[1], groups[2]]
69
70
71
def monitor_worker(container_name, check_interval, log_lines):
72
captures = 0
73
previous_log_time = None
74
75
while True:
76
result = subprocess.run(
77
["docker", "logs", "--tail", str(log_lines * 2), container_name],
78
stdout=subprocess.PIPE,
79
stderr=subprocess.STDOUT,
80
)
81
82
docker_log = result.stdout.decode("utf-8")
83
logging.debug(f"docker_log: {docker_log}")
84
85
if "No such container" in docker_log:
86
_state["container_running"] = False
87
time.sleep(check_interval)
88
continue
89
else:
90
_state["container_running"] = True
91
92
if len(docker_log) > 0:
93
for line in docker_log.splitlines():
94
log_line = parse_log_line(line)
95
logging.debug(f"log_line: {log_line}")
96
if log_line:
97
log_time = log_line[2]
98
if previous_log_time and log_time == previous_log_time:
99
logging.debug("No new logs detected")
100
else:
101
camera = log_line[1]
102
_state["last_log_times"][camera] = datetime.now()
103
captures += 1
104
previous_log_time = log_time
105
else:
106
logging.debug("Log lines empty")
107
108
time.sleep(check_interval)
109
logging.debug(f"Captures: {captures}")
110
111
112
def server_worker(host, port, offline_diff_duration):
113
handler = partial(GetHandler, offline_diff_duration)
114
server = HTTPServer((host, port), handler)
115
logging.info("Starting server, use <Ctrl-C> to stop")
116
server.serve_forever()
117
118
119
if __name__ == "__main__":
120
parser = argparse.ArgumentParser()
121
# Container Name
122
parser.add_argument(
123
"-c",
124
"--container",
125
type=str,
126
default="stream",
127
help="Stream Container Name or ID",
128
)
129
# Server listening HOST and PORT
130
parser.add_argument(
131
"-l", "--listen", type=str, default="localhost", help="Server listen address"
132
)
133
parser.add_argument(
134
"-p", "--port", type=int, default=8001, help="Server listen port"
135
)
136
# Check Interval
137
parser.add_argument(
138
"-i",
139
"--interval",
140
type=int,
141
default=2,
142
help="Interval between reading logs in seconds",
143
)
144
# Active Duration
145
parser.add_argument(
146
"-d",
147
"--duration",
148
type=int,
149
default=20,
150
help="Duration to use in considering a camera as offline in seconds",
151
)
152
# Log Lines to Process
153
parser.add_argument(
154
"-n", "--count", type=int, default=1, help="Number of cameras in config.ini"
155
)
156
157
args = parser.parse_args()
158
monitor = threading.Thread(
159
target=monitor_worker, args=(args.container, args.interval, args.count)
160
)
161
server = threading.Thread(
162
target=server_worker, args=(args.listen, args.port, args.duration)
163
)
164
monitor.start()
165
server.start()
166
167