GitHub Repository: parkpow/deep-license-plate-recognition
Path: blob/master/video-editor/video_editor.py

import logging
import math
import os
import sys
import tempfile
import time
from pathlib import Path

import cv2
import ffmpegcv
import numpy as np
import requests
from flask import Flask, jsonify, request
from interpolator import Interpolator
from utils import draw_bounding_box_on_image

try:
    LOG_LEVEL = int(os.environ.get("LOGGING", logging.INFO))
except ValueError as e:
    raise RuntimeError(
        "The LOGGING config should be a number. "
        "See https://guides.platerecognizer.com/docs/blur/configuration#logging"
    ) from e

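# Standard-library logging levels for reference: 10 = DEBUG, 20 = INFO,
# 30 = WARNING, 40 = ERROR; e.g. run with LOGGING=10 to see the
# lgr.debug(...) output used throughout this module.
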
logging.basicConfig(
    stream=sys.stdout,
    level=LOG_LEVEL,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(levelname)-5s [%(name)s.%(lineno)d] => %(message)s",
)

lgr = logging.getLogger("video-editor")

BASE_WORKING_DIR = "/user-data/"


def recognition_api(cv2_frame, data, sdk_url, api_key):
    retval, buffer = cv2.imencode(".jpg", cv2_frame)

    if sdk_url:
        url = sdk_url + "/v1/plate-reader/"
        headers = None
    else:
        if api_key is None:
            raise Exception("A TOKEN is required if using Cloud API")

        url = "https://api.platerecognizer.com/v1/plate-reader/"
        headers = {"Authorization": "Token " + api_key}

    while True:
        response = requests.post(
            url, files=dict(upload=buffer), headers=headers, data=data
        )

        if response.status_code < 200 or response.status_code >= 300:
            if response.status_code == 429:
                # Rate limited: wait a second, then retry the same frame
                time.sleep(1)
            else:
                lgr.error(response.text)
                raise Exception("Error running recognition")
        else:
            res_json = response.json()
            if "error" in res_json:
                lgr.error(response.text)
                raise Exception("Error running recognition")

            return res_json

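# For reference: the Cloud API branch of recognition_api() is equivalent to
#   curl -F upload=@frame.jpg \
#        -H "Authorization: Token $TOKEN" \
#        https://api.platerecognizer.com/v1/plate-reader/
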
def visualize_frame(cv2_frame, sdk_url, snapshot_api_token):
    run_recognition_response = recognition_api(
        cv2_frame, {}, sdk_url, snapshot_api_token
    )

    for result in run_recognition_response["results"]:
        plate_bounding_box = result["box"]
        plate = result["plate"]
        draw_bounding_box_on_image(
            cv2_frame,
            plate_bounding_box["ymin"],
            plate_bounding_box["xmin"],
            plate_bounding_box["ymax"],
            plate_bounding_box["xmax"],
            plate,
        )

        # Vehicle box
        if result["vehicle"]["score"] > 0:
            vehicle_bounding_box = result["vehicle"]["box"]
            vehicle = result["vehicle"]["type"]
            draw_bounding_box_on_image(
                cv2_frame,
                vehicle_bounding_box["ymin"],
                vehicle_bounding_box["xmin"],
                vehicle_bounding_box["ymax"],
                vehicle_bounding_box["xmax"],
                vehicle,
            )

    return cv2_frame


def blur_api(cv2_frame, blur_url):
    retval, buffer = cv2.imencode(".jpg", cv2_frame)

    response = requests.post(blur_url, files=dict(upload=("frame.jpg", buffer)))
    if response.status_code < 200 or response.status_code >= 300:
        lgr.error(response.text)
        raise Exception("Error performing blur")
    else:
        return response


def ellipse_polygon(box, scale=1.0, num_points=64):
    """
    Generate ellipse polygon points from a face bounding box.

    box: a dict with keys 'xmin', 'ymin', 'xmax', 'ymax'
    scale: enlarge/shrink the ellipse relative to the box
    num_points: number of polygon vertices
    """
    xmin, ymin = box["xmin"], box["ymin"]
    xmax, ymax = box["xmax"], box["ymax"]

    # Center of ellipse (middle of bounding box)
    cx = (xmin + xmax) / 2
    cy = (ymin + ymax) / 2

    # Radii (half width/height of box)
    rx = (xmax - xmin) / 2 * scale
    ry = (ymax - ymin) / 2 * scale

    pts = []
    for i in range(num_points):
        theta = 2 * math.pi * (i / num_points)
        x = cx + rx * math.cos(theta)
        y = cy + ry * math.sin(theta)
        pts.append([x, y])
    return pts

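# Worked example for ellipse_polygon: box = {"xmin": 0, "ymin": 0, "xmax": 10,
# "ymax": 20} gives center (5.0, 10.0) and radii rx = 5.0, ry = 10.0, so the
# first vertex (theta = 0) is [10.0, 10.0].
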
def get_blur_polygons(cv2_frame: np.ndarray, blur_url: str):
    """
    Call the Blur API to get the polygons that should be blurred.
    """
    blur_response = blur_api(cv2_frame, blur_url)
    polygons = [
        np.array(plate["polygon"], dtype=np.float32)
        for plate in blur_response.json()["plates"]
    ]

    for face in blur_response.json()["faces"]:
        polygon = ellipse_polygon(face["box"])
        polygons.append(np.array(polygon, dtype=np.float32))

    return polygons

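# Response shape assumed by get_blur_polygons (inferred from the parsing above,
# not from an API spec):
#   {"plates": [{"polygon": [[x, y], ...]}, ...],
#    "faces": [{"box": {"xmin": ..., "ymin": ..., "xmax": ..., "ymax": ...}}, ...]}
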
def save_frame(count, cv2_image, save_dir, image_format="jpg"):
    save_path = f"{save_dir}frame_{count}.{image_format}"
    lgr.debug(f"saving frame to: {save_path}")
    if image_format == "png":
        # PNG compression level: default is 3, 9 is highest
        cv2.imwrite(save_path, cv2_image, [int(cv2.IMWRITE_PNG_COMPRESSION), 3])

    elif image_format == "jpg":
        # JPEG quality: default is 95, 100 is best
        cv2.imwrite(save_path, cv2_image, [cv2.IMWRITE_JPEG_QUALITY, 95])

    else:
        raise Exception(f"Unrecognized output format: {image_format}")


def init_writer(filename, fps):
    return ffmpegcv.noblock(ffmpegcv.VideoWriter, filename, "h264", fps)

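# init_writer() wraps ffmpegcv.VideoWriter with ffmpegcv.noblock so that
# .write() calls do not block the main processing loop; each writer is
# finalized with .release() in process_video() below.
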
def process_video(video, action):  # noqa: C901 TODO: Break down to reduce cyclomatic complexity
    filename = video.filename
    lgr.debug(f"Processing video: {filename}")

    # Check which processing actions are enabled
    lgr.debug(f"enabled_actions: {action}")

    frames_enabled = "frames" in action
    visualization_enabled = "visualization" in action
    blur_enabled = "blur" in action

    lgr.debug(f"CONFIG frames_enabled: {frames_enabled}")
    lgr.debug(f"CONFIG visualization_enabled: {visualization_enabled}")
    lgr.debug(f"CONFIG blur_enabled: {blur_enabled}")

    out1 = out2 = frames_output_dir = sdk_url = snapshot_api_token = blur_url = None

    temp_dir = tempfile.mkdtemp()

    # Save the uploaded video file to the temporary directory
    video_path = os.path.join(temp_dir, video.filename)
    video.save(video_path)

    cap = ffmpegcv.VideoCapture(video_path)

    if not cap.isOpened():
        # Raising (instead of exiting) lets the route return a 500 response
        raise Exception("Error opening video stream or file")

    filename_stem = Path(video_path).stem
    video_format_ext = "mp4"

    # Use the FPS env var if provided, otherwise estimate the frame rate
    try:
        fps = int(os.environ.get("FPS"))
    except TypeError:
        # ffmpegcv's cap.fps is not reliable, so calculate the FPS manually
        # by counting frames over the first 500 ms of the video
        fps_cap = cv2.VideoCapture(video_path)
        frame_count = 0
        last_frame_time = 0
        while fps_cap.isOpened():
            ret, _ = fps_cap.read()
            last_frame_time = fps_cap.get(cv2.CAP_PROP_POS_MSEC)
            # Stop at half a second or when no more frames can be read
            if not ret or last_frame_time >= 500:
                break
            frame_count += 1
        fps_cap.release()
        assert last_frame_time > 0, "Video too short or frames are not readable"
        fps = frame_count * 1000 / last_frame_time
    lgr.debug(f"FPS: {fps}")

    if visualization_enabled:
        output1_filename = (
            f"{BASE_WORKING_DIR}{filename_stem}_visualization.{video_format_ext}"
        )
        out1 = init_writer(output1_filename, fps)

    if blur_enabled:
        output2_filename = f"{BASE_WORKING_DIR}{filename_stem}_blur.{video_format_ext}"
        out2 = init_writer(output2_filename, fps)

    # Create the output dir for frames if missing
    if frames_enabled:
        frames_output_dir = f"{BASE_WORKING_DIR}{filename_stem}_frames/"
        Path(frames_output_dir).mkdir(parents=True, exist_ok=True)
        lgr.debug(f"CONFIG frames_output_dir: {frames_output_dir}")

    # Parse visualization parameters
    if visualization_enabled:
        sdk_url = os.environ.get("SDK_URL")
        snapshot_api_token = os.environ.get("TOKEN")

        lgr.debug(f"CONFIG sdk_url: {sdk_url}")
        lgr.debug(f"CONFIG snapshot_api_token: {snapshot_api_token}")

    # Parse blur parameters
    if blur_enabled:
        blur_url = os.environ.get("BLUR_URL")

        try:
            sample_rate = int(os.environ.get("SAMPLE"))
        except Exception:
            sample_rate = 5
        # Residue that marks keyframes; 1 % sample_rate also covers
        # sample_rate = 1, where every frame is a keyframe
        keyframe_residue = 1 % sample_rate
        interpolator = Interpolator(sample_rate, out2)
        interpolator.start()

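    # Worked example: with sample_rate = 5, keyframe_residue = 1, so frames
    # 1, 6, 11, ... are sent to the Blur API and the frames in between get
    # interpolated polygons.
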
    start = time.time()
    frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if ret:
            lgr.debug(f"Processing frame: {frame_count}")
            frame_count += 1

            if frames_enabled:
                save_frame(frame_count, frame, frames_output_dir)

            if visualization_enabled:
                # Draw plate/vehicle boxes and labels on each frame
                visualized_frame = visualize_frame(frame, sdk_url, snapshot_api_token)
                out1.write(visualized_frame)

            if blur_enabled:
                if frame_count % sample_rate == keyframe_residue:
                    # Keyframe: get fresh polygons from the Blur API
                    polygons = get_blur_polygons(frame, blur_url)
                    interpolator.feed_keyframe(frame, frame_count, polygons)
                else:
                    # Skip frame: its polygons are interpolated between keyframes
                    interpolator.feed_skipframe(frame)
        else:
            break

    # Flush the remaining skipframes
    if blur_enabled and interpolator.is_flush_needed(frame_count):
        frame, _ = interpolator.frame_buffer.queue[-1]
        polygons = get_blur_polygons(frame, blur_url)
        interpolator.flush(frame_count, polygons)
        interpolator.close()

    cap.release()
    if out1:
        out1.release()
    if out2:
        out2.release()

    lgr.debug(f"Frame count: {frame_count}")
    lgr.debug(f"Time taken: {time.time() - start}")
    lgr.debug(f"Done processing video {filename}")
    os.remove(video_path)
    os.rmdir(temp_dir)


app = Flask(__name__)


@app.route("/process-video", methods=["POST"])
def process_video_route():
    if "upload" not in request.files or "action" not in request.form:
        return jsonify({"error": "An 'upload' file and an 'action' field are required"}), 400

    file = request.files["upload"]
    action = request.form["action"]

    if file.filename == "":
        return jsonify({"error": "No selected file"}), 400

    try:
        process_video(file, action)
    except Exception as e:
        lgr.error("Error:", exc_info=e)
        return jsonify({"error": str(e)}), 500

    return jsonify("Done."), 200


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8081, debug=True)
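
# Example request (field names and port as defined above; "action" may combine
# values, e.g. "blur,visualization"):
#   curl -F upload=@video.mp4 -F "action=blur,visualization" \
#        http://localhost:8081/process-video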