Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
parkpow
GitHub Repository: parkpow/deep-license-plate-recognition
Path: blob/master/webhooks/middleware/consumer.py
1080 views
1
# common_webhook_consumer.py
2
import atexit
3
import hmac
4
import importlib
5
import json
6
import logging
7
import os
8
import signal
9
import subprocess
10
import sys
11
from logging.handlers import RotatingFileHandler
12
from typing import Any
13
14
from flask import Flask, Response, jsonify, request, stream_with_context
15
from waitress import serve # type: ignore
16
17
LOG_FILE = "/tmp/middleware.log"
18
19
logging.basicConfig(
20
level=logging.INFO,
21
format="%(asctime)s - %(levelname)s - %(message)s",
22
handlers=[
23
logging.StreamHandler(),
24
RotatingFileHandler(LOG_FILE, maxBytes=100 * 1024 * 1024, backupCount=1),
25
],
26
)
27
28
app = Flask(__name__)
29
30
middleware: Any | None = None
31
32
33
def cleanup_middleware():
34
"""Cleanup middleware resources on shutdown."""
35
global middleware
36
if middleware and hasattr(middleware, "shutdown"):
37
logging.info("Shutting down middleware...")
38
try:
39
middleware.shutdown()
40
except Exception as e:
41
logging.error(f"Error during middleware shutdown: {e}")
42
43
44
def signal_handler(signum, frame):
45
"""Handle termination signals gracefully."""
46
logging.info(f"Received signal {signum}, initiating graceful shutdown...")
47
cleanup_middleware()
48
sys.exit(0)
49
50
51
def load_middleware():
52
middleware_name = os.getenv("MIDDLEWARE_NAME")
53
if not middleware_name:
54
logging.error("MIDDLEWARE_NAME environment variable is not set.")
55
return None
56
57
try:
58
middleware = importlib.import_module(f"protocols.{middleware_name}")
59
logging.info(f"Using middleware: {middleware_name}")
60
61
if hasattr(middleware, "initialize"):
62
middleware.initialize()
63
64
return middleware
65
except ModuleNotFoundError:
66
logging.error(f"Middleware '{middleware_name}' not found.")
67
return None
68
69
70
@app.route("/health", methods=["GET"])
71
def health_check():
72
"""Health check endpoint for load balancers and monitoring (front_rear only)."""
73
middleware_name = os.getenv("MIDDLEWARE_NAME")
74
if middleware_name != "front_rear":
75
return jsonify({"error": "Not found"}), 404
76
77
global middleware
78
if not middleware:
79
return jsonify({"status": "unhealthy", "reason": "Middleware not loaded"}), 503
80
return jsonify({"status": "healthy", "middleware": middleware_name}), 200
81
82
83
@app.route("/logs", methods=["GET"])
84
def stream_logs():
85
"""Stream logs in real-time as a plain text stream (like `docker logs -f`)."""
86
auth_header = request.headers.get("Authorization", "")
87
auth_token = auth_header.replace("Token ", "").replace("Bearer ", "")
88
admin_token = os.getenv("ADMIN_TOKEN")
89
90
if not admin_token:
91
return jsonify({"error": "Admin access not configured"}), 503
92
93
if not hmac.compare_digest(auth_token, admin_token):
94
return jsonify({"error": "Unauthorized"}), 401
95
96
def generate():
97
"""Generate log stream in plain text format."""
98
lines_str = request.args.get("lines", "50")
99
if not lines_str.isdigit():
100
yield "Error: 'lines' parameter must be an integer.\n"
101
return
102
lines = lines_str
103
104
try:
105
proc = subprocess.Popen(
106
["tail", "-f", "-n", lines, LOG_FILE],
107
stdout=subprocess.PIPE,
108
stderr=subprocess.STDOUT,
109
universal_newlines=True,
110
)
111
112
if proc.stdout:
113
try:
114
yield from proc.stdout
115
except GeneratorExit:
116
proc.terminate()
117
proc.wait()
118
except Exception as e:
119
yield f"Error streaming logs: {e}\n"
120
121
return Response(
122
stream_with_context(generate()),
123
mimetype="text/plain",
124
headers={
125
"Cache-Control": "no-cache",
126
"X-Accel-Buffering": "no", # Disable nginx buffering
127
},
128
)
129
130
131
@app.route("/", methods=["POST"])
132
def handle_webhook():
133
global middleware
134
if not middleware:
135
return jsonify({"error": "Middleware not found"}), 500
136
137
if request.content_type.startswith("multipart/form-data"):
138
json_data = request.form.get("json")
139
if json_data:
140
json_data = json.loads(json_data)
141
else:
142
return jsonify({"error": "Missing JSON data in multipart form"}), 400
143
144
uploaded_files = request.files
145
else:
146
try:
147
if request.content_type == "application/json":
148
json_data = request.get_json()
149
else: # For application/x-www-form-urlencoded
150
raw_data = request.form.get("json")
151
if raw_data:
152
json_data = json.loads(raw_data)
153
else:
154
return jsonify({"error": "Missing JSON data"}), 400
155
uploaded_files = {}
156
except json.JSONDecodeError:
157
return jsonify({"error": "Invalid JSON format"}), 400
158
159
try:
160
webhook_header = {
161
"mac_address": request.headers.get("mac-address"),
162
"camera_name": request.headers.get("camera-name"),
163
"serial_number": request.headers.get("serial-number"),
164
"camera_id": request.headers.get("camera-id"),
165
"Authorization": request.headers.get("Authorization"),
166
}
167
168
webhook_header = {k: v for k, v in webhook_header.items() if v is not None}
169
170
if webhook_header and isinstance(json_data, dict):
171
json_data["webhook_header"] = webhook_header
172
173
files = {}
174
for file in uploaded_files:
175
files[file] = uploaded_files[file].read()
176
response, status_code = middleware.process_request(json_data, files)
177
return jsonify({"message": response}), status_code
178
except Exception as e:
179
logging.error(f"Error processing the request: {e}")
180
return jsonify({"error": "Error processing the request"}), 500
181
182
183
if __name__ == "__main__":
184
middleware = load_middleware()
185
186
if not middleware:
187
logging.error("Failed to load middleware. Exiting..")
188
exit(1)
189
190
atexit.register(cleanup_middleware)
191
signal.signal(signal.SIGTERM, signal_handler)
192
signal.signal(signal.SIGINT, signal_handler)
193
194
try:
195
serve(app, host="0.0.0.0", port=8002)
196
except KeyboardInterrupt:
197
logging.info("Interrupted by user")
198
finally:
199
cleanup_middleware()
200
201