# Path: blob/master/stream/stream_monitor.py
# 641 views
"""Monitor a stream container's docker logs and report camera status over HTTP.

A monitor thread polls ``docker logs`` for the configured container and
records, per camera, the wall-clock time at which a new log entry was last
observed.  A second thread serves that state as JSON on every GET request.
"""
import argparse
import json
import logging
import os
import re
import subprocess
import sys
import threading
import time
from datetime import datetime, timedelta
from functools import partial
from http.server import BaseHTTPRequestHandler, HTTPServer

LOG_LEVEL = os.environ.get("LOGGING", "INFO").upper()

logging.basicConfig(
    stream=sys.stdout,
    level=LOG_LEVEL,
    format="%(asctime)s %(levelname)s %(name)s %(threadName)s : %(message)s",
)

# State shared between the monitor thread (writer) and the HTTP handler
# (reader).  The key must be "container_running": that is the key both
# monitor_worker and GetHandler use.
_state = {"container_running": False, "last_log_times": {}}
# Guards _state, because the handler iterates "last_log_times" while the
# monitor thread may be inserting new cameras.
_state_lock = threading.Lock()

log_line_regex = r"(\w+):([^:]*):(.*):\b"
compiled = re.compile(log_line_regex)


class GetHandler(BaseHTTPRequestHandler):
    """HTTP handler that answers every GET with the current monitor state."""

    def __init__(self, offline_diff_duration, *args, **kwargs):
        """
        :param offline_diff_duration: Seconds without a new log entry after
            which a camera is reported as offline
        """
        # Must be set before super().__init__, which handles the request
        # (and therefore calls do_GET) from inside the constructor.
        self.offline_diff_duration = offline_diff_duration
        super().__init__(*args, **kwargs)

    def _build_response(self):
        """
        :return: Dict with "active" (container flag) and "cameras"
            (camera name -> {"status": "running" | "offline"})
        """
        online_time = datetime.now() - timedelta(seconds=self.offline_diff_duration)

        # Snapshot under the lock so the monitor thread cannot resize the
        # dict while it is being iterated.
        with _state_lock:
            active = _state["container_running"]
            last_log_times = dict(_state["last_log_times"])

        cameras = {}
        for camera, seen_at in last_log_times.items():
            logging.debug(f"Comparing times: [{seen_at}] and [{online_time}]")
            status = "running" if online_time < seen_at else "offline"
            cameras[camera] = {"status": status}

        return {"active": active, "cameras": cameras}

    def do_GET(self):
        """Send the current container/camera status as a JSON document."""
        logging.debug("state: ")
        logging.debug(_state)
        response = self._build_response()

        self.send_response(200)
        self.send_header("Content-type", "application/json")
        self.end_headers()

        logging.debug("response")
        logging.debug(response)
        self.wfile.write(json.dumps(response).encode())
        self.wfile.write(b"\n")


def parse_log_line(line):
    """
    :param line: Stream log line
    :return: Log split into [level, camera, time], or None when the line does
        not match the expected format
    """
    m = compiled.match(line)
    if m:
        groups = m.groups()
        return [groups[0], groups[1], groups[2]]
    return None


def collect_new_entries(docker_log, seen_entries):
    """
    Find the log entries that were not present in the previous poll.

    Entries are compared as (camera, time) pairs.  Comparing against a single
    "previous time" would misreport unchanged logs as new whenever the tail
    holds more than one matching line.

    :param docker_log: Text returned by ``docker logs``
    :param seen_entries: Set of (camera, time) pairs from the previous poll
    :return: Tuple (entries, new_cameras): every (camera, time) pair found in
        ``docker_log``, and the camera of each pair not in ``seen_entries``
        (in log order, one item per new pair)
    """
    entries = set()
    new_cameras = []
    for line in docker_log.splitlines():
        log_line = parse_log_line(line)
        logging.debug(f"log_line: {log_line}")
        if not log_line:
            continue
        _level, camera, log_time = log_line
        entry = (camera, log_time)
        if entry in seen_entries or entry in entries:
            logging.debug("No new logs detected")
        else:
            new_cameras.append(camera)
        entries.add(entry)
    return entries, new_cameras


def monitor_worker(container_name, check_interval, log_lines):
    """
    Poll the container's logs forever and update the shared state.

    :param container_name: Docker container name or ID to read logs from
    :param check_interval: Seconds to sleep between polls
    :param log_lines: Number of cameras; twice this many log lines are tailed
    """
    captures = 0
    seen_entries = set()
    command = ["docker", "logs", "--tail", str(log_lines * 2), container_name]

    while True:
        try:
            result = subprocess.run(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
            )
        except OSError as err:
            # e.g. docker binary missing: keep the thread alive and report
            # the container as not running instead of dying silently.
            logging.error(f"Unable to run docker: {err}")
            with _state_lock:
                _state["container_running"] = False
            time.sleep(check_interval)
            continue

        # Never let a stray undecodable byte in the logs kill the thread.
        docker_log = result.stdout.decode("utf-8", errors="replace")
        logging.debug(f"docker_log: {docker_log}")

        # Any failed invocation (missing container, daemon unreachable, ...)
        # means the output is an error message, not camera logs.
        if result.returncode != 0 or "No such container" in docker_log:
            with _state_lock:
                _state["container_running"] = False
            time.sleep(check_interval)
            continue

        with _state_lock:
            _state["container_running"] = True

        if docker_log:
            entries, new_cameras = collect_new_entries(docker_log, seen_entries)
            # Keep the previous snapshot when nothing parsed, so the same old
            # lines are not treated as new on the next poll.
            if entries:
                seen_entries = entries
            if new_cameras:
                now = datetime.now()
                with _state_lock:
                    for camera in new_cameras:
                        _state["last_log_times"][camera] = now
                captures += len(new_cameras)
        else:
            logging.debug("Log lines empty")

        time.sleep(check_interval)
        logging.debug(f"Captures: {captures}")


def server_worker(host, port, offline_diff_duration):
    """
    Serve the status endpoint until the process exits.

    :param host: Address to listen on
    :param port: TCP port to listen on
    :param offline_diff_duration: Seconds without new logs after which a
        camera is reported as offline
    """
    # HTTPServer instantiates the handler itself, so bind the extra
    # constructor argument up front.
    handler = partial(GetHandler, offline_diff_duration)
    server = HTTPServer((host, port), handler)
    logging.info("Starting server, use <Ctrl-C> to stop")
    server.serve_forever()


def main():
    """Parse command-line options and start the monitor and server threads."""
    parser = argparse.ArgumentParser()
    # Container Name
    parser.add_argument(
        "-c",
        "--container",
        type=str,
        default="stream",
        help="Stream Container Name or ID",
    )
    # Server listening HOST and PORT
    parser.add_argument(
        "-l", "--listen", type=str, default="localhost", help="Server listen address"
    )
    parser.add_argument(
        "-p", "--port", type=int, default=8001, help="Server listen port"
    )
    # Check Interval
    parser.add_argument(
        "-i",
        "--interval",
        type=int,
        default=2,
        help="Interval between reading logs in seconds",
    )
    # Active Duration
    parser.add_argument(
        "-d",
        "--duration",
        type=int,
        default=20,
        help="Duration to use in considering a camera as offline in seconds",
    )
    # Log Lines to Process
    parser.add_argument(
        "-n", "--count", type=int, default=1, help="Number of cameras in config.ini"
    )

    args = parser.parse_args()
    monitor = threading.Thread(
        target=monitor_worker, args=(args.container, args.interval, args.count)
    )
    server = threading.Thread(
        target=server_worker, args=(args.listen, args.port, args.duration)
    )
    monitor.start()
    server.start()


if __name__ == "__main__":
    main()