Path: blob/master/webhooks/middleware/consumer.py
1080 views
# common_webhook_consumer.py
"""Webhook consumer that hands incoming HTTP requests to a pluggable middleware.

The middleware is a module under the ``protocols`` package selected by the
``MIDDLEWARE_NAME`` environment variable. It must expose
``process_request(json_data, files)`` returning ``(response, status_code)``
and may optionally expose ``initialize()`` and ``shutdown()`` hooks.
"""
import atexit
import hmac
import importlib
import json
import logging
import os
import signal
import subprocess
import sys
from logging.handlers import RotatingFileHandler
from typing import Any

from flask import Flask, Response, jsonify, request, stream_with_context
from waitress import serve  # type: ignore

LOG_FILE = "/tmp/middleware.log"

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        RotatingFileHandler(LOG_FILE, maxBytes=100 * 1024 * 1024, backupCount=1),
    ],
)

app = Flask(__name__)

# The loaded middleware module; set in the ``__main__`` block and cleared
# again by ``cleanup_middleware``.
middleware: Any | None = None


def cleanup_middleware():
    """Cleanup middleware resources on shutdown.

    Safe to call more than once: the global reference is cleared before the
    ``shutdown`` hook runs, so the signal handler, the ``finally`` clause in
    the entry point and the ``atexit`` hook together trigger it only once.
    """
    global middleware
    current, middleware = middleware, None
    if current and hasattr(current, "shutdown"):
        logging.info("Shutting down middleware...")
        try:
            current.shutdown()
        except Exception as e:
            # Best effort: a failing shutdown hook must not block process exit.
            logging.error(f"Error during middleware shutdown: {e}")


def signal_handler(signum, frame):
    """Handle termination signals gracefully."""
    logging.info(f"Received signal {signum}, initiating graceful shutdown...")
    cleanup_middleware()
    sys.exit(0)


def load_middleware():
    """Import and initialize the middleware named by ``MIDDLEWARE_NAME``.

    Returns:
        The imported ``protocols.<MIDDLEWARE_NAME>`` module, or ``None`` when
        the variable is unset or the module cannot be imported.
    """
    middleware_name = os.getenv("MIDDLEWARE_NAME")
    if not middleware_name:
        logging.error("MIDDLEWARE_NAME environment variable is not set.")
        return None

    module_path = f"protocols.{middleware_name}"
    try:
        module = importlib.import_module(module_path)
    except ModuleNotFoundError as e:
        missing = e.name or ""
        if missing and (module_path == missing or module_path.startswith(f"{missing}.")):
            logging.error(f"Middleware '{middleware_name}' not found.")
        else:
            # The middleware exists but one of its own imports is missing;
            # report that instead of a misleading "not found".
            logging.exception(
                f"Middleware '{middleware_name}' could not be imported: {e}"
            )
        return None

    logging.info(f"Using middleware: {middleware_name}")

    if hasattr(module, "initialize"):
        module.initialize()

    return module


@app.route("/health", methods=["GET"])
def health_check():
    """Health check endpoint for load balancers and monitoring (front_rear only)."""
    middleware_name = os.getenv("MIDDLEWARE_NAME")
    if middleware_name != "front_rear":
        return jsonify({"error": "Not found"}), 404

    if not middleware:
        return jsonify({"status": "unhealthy", "reason": "Middleware not loaded"}), 503
    return jsonify({"status": "healthy", "middleware": middleware_name}), 200


def _extract_auth_token(auth_header: str) -> str:
    """Return the credential part of an ``Authorization`` header value.

    Strips a leading ``Token `` or ``Bearer `` scheme; a header without a
    recognized scheme is returned unchanged.
    """
    for scheme in ("Token ", "Bearer "):
        if auth_header.startswith(scheme):
            return auth_header.removeprefix(scheme)
    return auth_header


@app.route("/logs", methods=["GET"])
def stream_logs():
    """Stream logs in real-time as a plain text stream (like `docker logs -f`).

    Requires the ``Authorization`` header to carry the value of the
    ``ADMIN_TOKEN`` environment variable. The optional ``lines`` query
    parameter (default 50) sets how many trailing lines are sent first.
    """
    auth_token = _extract_auth_token(request.headers.get("Authorization", ""))
    admin_token = os.getenv("ADMIN_TOKEN")

    if not admin_token:
        return jsonify({"error": "Admin access not configured"}), 503

    # Compare as bytes: compare_digest raises TypeError on non-ASCII str.
    if not hmac.compare_digest(auth_token.encode("utf-8"), admin_token.encode("utf-8")):
        return jsonify({"error": "Unauthorized"}), 401

    def generate():
        """Generate log stream in plain text format."""
        lines = request.args.get("lines", "50")
        # isdigit() alone accepts characters such as "²" that tail rejects.
        if not (lines.isascii() and lines.isdigit()):
            yield "Error: 'lines' parameter must be an integer.\n"
            return

        try:
            proc = subprocess.Popen(
                ["tail", "-f", "-n", lines, LOG_FILE],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
            )
        except OSError as e:
            yield f"Error streaming logs: {e}\n"
            return

        try:
            if proc.stdout:
                yield from proc.stdout
        except Exception as e:
            yield f"Error streaming logs: {e}\n"
        finally:
            # Runs on client disconnect (GeneratorExit) as well as on errors,
            # so the tail process never outlives the response.
            if proc.poll() is None:
                proc.terminate()
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
            if proc.stdout:
                proc.stdout.close()

    return Response(
        stream_with_context(generate()),
        mimetype="text/plain",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )


@app.route("/", methods=["POST"])
def handle_webhook():
    """Parse an incoming webhook and pass it to the loaded middleware.

    The payload is taken from the JSON body, or from a ``json`` form field
    for multipart and url-encoded requests. Selected request headers are
    attached to dict payloads under ``webhook_header``, and uploaded files
    are passed to the middleware as a name -> bytes mapping.
    """
    if not middleware:
        return jsonify({"error": "Middleware not found"}), 500

    # mimetype is lower-cased, parameter-free, and "" when the header is
    # absent, so a missing Content-Type no longer raises.
    mimetype = request.mimetype
    uploaded_files = {}
    try:
        if mimetype == "multipart/form-data":
            raw_data = request.form.get("json")
            if not raw_data:
                return jsonify({"error": "Missing JSON data in multipart form"}), 400
            json_data = json.loads(raw_data)
            uploaded_files = request.files
        elif request.is_json:
            json_data = request.get_json()
        else:  # For application/x-www-form-urlencoded
            raw_data = request.form.get("json")
            if not raw_data:
                return jsonify({"error": "Missing JSON data"}), 400
            json_data = json.loads(raw_data)
    except json.JSONDecodeError:
        return jsonify({"error": "Invalid JSON format"}), 400

    try:
        forwarded_headers = {
            "mac_address": request.headers.get("mac-address"),
            "camera_name": request.headers.get("camera-name"),
            "serial_number": request.headers.get("serial-number"),
            "camera_id": request.headers.get("camera-id"),
            "Authorization": request.headers.get("Authorization"),
        }
        webhook_header = {k: v for k, v in forwarded_headers.items() if v is not None}

        if webhook_header and isinstance(json_data, dict):
            json_data["webhook_header"] = webhook_header

        files = {name: storage.read() for name, storage in uploaded_files.items()}
        response, status_code = middleware.process_request(json_data, files)
        return jsonify({"message": response}), status_code
    except Exception as e:
        # Top-level boundary: keep the traceback in the log, hide it from the client.
        logging.exception(f"Error processing the request: {e}")
        return jsonify({"error": "Error processing the request"}), 500


if __name__ == "__main__":
    middleware = load_middleware()

    if not middleware:
        logging.error("Failed to load middleware. Exiting..")
        sys.exit(1)

    atexit.register(cleanup_middleware)
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    try:
        serve(app, host="0.0.0.0", port=8002)
    except KeyboardInterrupt:
        logging.info("Interrupted by user")
    finally:
        cleanup_middleware()