Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
parkpow
GitHub Repository: parkpow/deep-license-plate-recognition
Path: blob/master/video-editor/video_editor.py
641 views
1
import logging
2
import os
3
import sys
4
import tempfile
5
import time
6
from pathlib import Path
7
8
import cv2
9
import ffmpegcv
10
import numpy as np
11
import requests
12
from flask import Flask, jsonify, request
13
from interpolator import Interpolator
14
from utils import draw_bounding_box_on_image
15
16
LOG_LEVEL = os.environ.get("LOGGING", "INFO").upper()
17
18
logging.basicConfig(
19
stream=sys.stdout,
20
level=LOG_LEVEL,
21
datefmt="%Y-%m-%d %H:%M:%S",
22
format="%(levelname)-5s [%(name)s.%(lineno)d] => %(message)s",
23
)
24
25
lgr = logging.getLogger("video-editor")
26
27
BASE_WORKING_DIR = "/user-data/"
28
29
30
def recognition_api(cv2_frame, data, sdk_url, api_key):
    """Run license plate recognition on a single frame.

    JPEG-encodes the frame and POSTs it either to a local Snapshot SDK
    (when ``sdk_url`` is set) or to the Plate Recognizer cloud API,
    retrying indefinitely while the service answers 429 (rate limited).

    Args:
        cv2_frame: BGR image (numpy array) as produced by OpenCV.
        data: Extra form fields forwarded with the request.
        sdk_url: Base URL of a local SDK; cloud API is used when falsy.
        api_key: Cloud API token; required when ``sdk_url`` is not set.

    Returns:
        The decoded JSON response from the recognition service.

    Raises:
        Exception: If no token is provided for the cloud API, or the
            service returns a non-retryable error.
    """
    retval, buffer = cv2.imencode(".jpg", cv2_frame)

    if sdk_url:
        url = sdk_url + "/v1/plate-reader/"
        headers = None
    else:
        if api_key is None:
            raise Exception("A TOKEN is required if using Cloud API")

        url = "https://api.platerecognizer.com/v1/plate-reader/"
        headers = {"Authorization": "Token " + api_key}

    while True:
        response = requests.post(
            url, files=dict(upload=buffer), headers=headers, data=data
        )

        # BUG FIX: the original test (`< 200 or > 300`) treated an exact
        # HTTP 300 as success; only the 2xx range is.
        if not 200 <= response.status_code < 300:
            if response.status_code == 429:
                # Rate limited — back off briefly and retry.
                time.sleep(1)
            else:
                # Use the module logger (was the root logger) so the
                # configured "video-editor" name appears in records.
                lgr.error(response.text)
                raise Exception("Error running recognition")
        else:
            res_json = response.json()
            if "error" in res_json:
                lgr.error(response.text)
                raise Exception("Error running recognition")

            return res_json
61
62
63
def visualize_frame(cv2_frame, sdk_url, snapshot_api_token):
    """Annotate a frame in place with plate and vehicle bounding boxes.

    Runs recognition on the frame, then draws one labelled box per
    detected plate and — when the vehicle score is positive — one per
    detected vehicle.

    Returns:
        The annotated frame (same object that was passed in).
    """
    recognition = recognition_api(cv2_frame, {}, sdk_url, snapshot_api_token)

    for detection in recognition["results"]:
        # Plate box, labelled with the recognized plate text.
        box = detection["box"]
        draw_bounding_box_on_image(
            cv2_frame,
            box["ymin"],
            box["xmin"],
            box["ymax"],
            box["xmax"],
            detection["plate"],
        )

        # Vehicle box, labelled with the vehicle type; only drawn when a
        # vehicle was actually detected (score above zero).
        vehicle = detection["vehicle"]
        if vehicle["score"] > 0:
            vbox = vehicle["box"]
            draw_bounding_box_on_image(
                cv2_frame,
                vbox["ymin"],
                vbox["xmin"],
                vbox["ymax"],
                vbox["xmax"],
                vehicle["type"],
            )

    return cv2_frame
94
95
96
def blur_api(cv2_frame, blur_url):
    """POST a JPEG-encoded frame to the Blur API.

    Args:
        cv2_frame: BGR image (numpy array) as produced by OpenCV.
        blur_url: Full URL of the Blur API endpoint.

    Returns:
        The raw ``requests`` response on success.

    Raises:
        Exception: If the service responds outside the 2xx range.
    """
    retval, buffer = cv2.imencode(".jpg", cv2_frame)

    response = requests.post(blur_url, files=dict(upload=("frame.jpg", buffer)))
    # BUG FIX: the original test (`< 200 or > 300`) treated an exact
    # HTTP 300 as success; only the 2xx range is.
    if not 200 <= response.status_code < 300:
        # Module logger (was the root logger) for consistent record names.
        lgr.error(response.text)
        raise Exception("Error performing blur")
    return response
105
106
107
def get_blur_polygons(cv2_frame: np.ndarray, blur_url: str):
    """
    Call Blur API to request polygons to be blurred.

    Returns one float32 numpy array of vertices per detected plate.
    """
    plates = blur_api(cv2_frame, blur_url).json()["plates"]

    polygons = []
    for plate in plates:
        polygons.append(np.array(plate["polygon"], dtype=np.float32))

    return polygons
118
119
120
def save_frame(count, cv2_image, save_dir, image_format="jpg"):
    """Write one frame to ``save_dir`` as ``frame_<count>.<image_format>``.

    Args:
        count: Frame number used in the output filename.
        cv2_image: BGR image to write.
        save_dir: Output directory (with or without a trailing slash).
        image_format: Either "jpg" (quality 95) or "png" (compression 3).

    Raises:
        ValueError: If ``image_format`` is not "jpg" or "png".
    """
    # Validate up front so a bad format fails fast, before any I/O.
    if image_format not in ("jpg", "png"):
        raise ValueError(f"Unrecognized Output format: {image_format}")

    # os.path.join keeps the path correct whether or not save_dir ends
    # with a separator (the original concatenation required one).
    save_path = os.path.join(save_dir, f"frame_{count}.{image_format}")
    lgr.debug(f"saving frame to: {save_path}")

    if image_format == "png":
        # default 3, 9 is highest compression
        cv2.imwrite(save_path, cv2_image, [int(cv2.IMWRITE_PNG_COMPRESSION), 3])
    else:
        # default 95, 100 is best quality
        cv2.imwrite(save_path, cv2_image, [cv2.IMWRITE_JPEG_QUALITY, 95])
133
134
135
def init_writer(filename, fps):
    """Create a non-blocking H.264 video writer for ``filename`` at ``fps``."""
    writer = ffmpegcv.noblock(ffmpegcv.VideoWriter, filename, "h264", fps)
    return writer
137
138
139
def process_video(video, action):
    """Process an uploaded video according to the requested actions.

    ``action`` is membership-tested for any of:
      * "frames"        — dump every frame as an image under BASE_WORKING_DIR
      * "visualization" — write a copy with plate/vehicle boxes drawn
      * "blur"          — write a copy with plates blurred, interpolating
                          between sampled keyframes

    Args:
        video: Uploaded file object (Werkzeug FileStorage-like: has
            ``.filename`` and ``.save(path)``).
        action: String naming the enabled actions.

    Raises:
        Exception: If the uploaded video cannot be opened or is too
            short to measure an FPS.
    """
    filename = video.filename
    # FIX: interpolate the filename — the variable was previously unused
    # and the f-string logged a literal placeholder.
    lgr.debug(f"Processing video: {filename}")

    # check processing actions for camera
    lgr.debug(f"enabled_actions: {action}")

    frames_enabled = "frames" in action
    visualization_enabled = "visualization" in action
    blur_enabled = "blur" in action

    lgr.debug(f"CONFIG frames_enabled: {frames_enabled}")
    lgr.debug(f"CONFIG visualization_enabled: {visualization_enabled}")
    lgr.debug(f"CONFIG blur_enabled: {blur_enabled}")

    out1, out2, frames_output_dir, sdk_url, snapshot_api_token, blur_url = (
        None,
        None,
        None,
        None,
        None,
        None,
    )

    temp_dir = tempfile.mkdtemp()

    # Save the uploaded video file to the temporary directory
    video_path = os.path.join(temp_dir, video.filename)
    video.save(video_path)

    cap = ffmpegcv.VideoCapture(video_path)

    if not cap.isOpened():
        lgr.debug("Error opening video stream or file")
        # BUG FIX: exit(1) raises SystemExit, which the route handler's
        # `except Exception` cannot catch — it would kill the worker
        # instead of returning a 500 to the client.
        raise Exception("Error opening video stream or file")

    filename_stem = Path(video_path).stem
    video_format_ext = "mp4"

    # Override FPS if provided
    try:
        fps = int(os.environ.get("FPS"))
    # FIX: also catch ValueError so a malformed FPS env var falls back
    # to the measured rate instead of crashing the request.
    except (TypeError, ValueError):
        # ffmpegcv cap.fps is not reliable
        # Calculate FPS manually by counting frames for 500ms

        fps_cap = cv2.VideoCapture(video_path)
        frame_count = 0
        last_frame_time = 0
        while fps_cap.isOpened():
            ret, _ = fps_cap.read()
            last_frame_time = fps_cap.get(cv2.CAP_PROP_POS_MSEC)
            # Stop at half a second or no more frames
            if not ret or last_frame_time >= 500:
                break
            frame_count += 1
        fps_cap.release()
        # FIX: explicit raise instead of `assert`, which is stripped
        # under `python -O`.
        if last_frame_time <= 0:
            raise Exception("Video too short or frames are not readable")
        fps = frame_count * 1000 / last_frame_time
    lgr.debug(f"FPS: {fps}")

    if visualization_enabled:
        output1_filename = (
            f"{BASE_WORKING_DIR}{filename_stem}_visualization.{video_format_ext}"
        )
        out1 = init_writer(output1_filename, fps)

    if blur_enabled:
        output2_filename = f"{BASE_WORKING_DIR}{filename_stem}_blur.{video_format_ext}"
        out2 = init_writer(output2_filename, fps)

    # Create the output dir for frames if missing
    if frames_enabled:
        frames_output_dir = f"{BASE_WORKING_DIR}{filename_stem}_frames/"
        Path(frames_output_dir).mkdir(parents=True, exist_ok=True)
        lgr.debug(f"CONFIG frames_output_dir: {frames_output_dir}")

    # Parse visualization parameters
    if visualization_enabled:
        sdk_url = os.environ.get("SDK_URL")
        snapshot_api_token = os.environ.get("TOKEN")

        lgr.debug(f"CONFIG sdk_url: {sdk_url}")
        lgr.debug(f"CONFIG snapshot_api_token: {snapshot_api_token}")

    # Parse blur parameters
    if blur_enabled:
        blur_url = os.environ.get("BLUR_URL")

        # SAMPLE env var controls how often a keyframe hits the Blur API;
        # default to every 5th frame on any parse failure.
        try:
            sample_rate = int(os.environ.get("SAMPLE"))
        except Exception:
            sample_rate = 5
        keyframe_residue = 1 % sample_rate  # for sample_rate = 1
        interpolator = Interpolator(sample_rate, out2)
        interpolator.start()

    start = time.time()
    frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        lgr.debug(f"Processing frame: {frame_count}")
        frame_count += 1

        if frames_enabled:
            save_frame(frame_count, frame, frames_output_dir)

        if visualization_enabled:
            # adding filled rectangle on each frame
            visualized_frame = visualize_frame(frame, sdk_url, snapshot_api_token)
            out1.write(visualized_frame)

        if blur_enabled:
            if frame_count % sample_rate == keyframe_residue:
                # Keyframe: fetch fresh polygons from the Blur API.
                polygons = get_blur_polygons(frame, blur_url)
                interpolator.feed_keyframe(frame, frame_count, polygons)
            else:
                # Skipframes: blur is interpolated from keyframes.
                interpolator.feed_skipframe(frame)

    # Flush the remaining skipframes. Both the flush and close() are
    # guarded by blur_enabled so non-blur runs never touch the
    # (undefined) interpolator.
    if blur_enabled:
        if interpolator.is_flush_needed(frame_count):
            frame, _ = interpolator.frame_buffer.queue[-1]
            polygons = get_blur_polygons(frame, blur_url)
            interpolator.flush(frame_count, polygons)
        interpolator.close()

    cap.release()
    if out1:
        out1.release()
    if out2:
        out2.release()

    lgr.debug(f"Frame count: {frame_count}")
    lgr.debug(f"Time taken: {time.time() - start}")
    lgr.debug(f"Done processing video {filename}")
    # Remove the temporary upload copy and its directory.
    # NOTE(review): this cleanup is skipped when an exception propagates
    # above — consider a try/finally if leaked temp dirs matter.
    os.remove(video_path)
    os.rmdir(temp_dir)
281
282
283
# WSGI application object; the /process-video route is registered below.
app = Flask(__name__)
284
285
286
@app.route("/process-video", methods=["POST"])
def process_video_route():
    """HTTP entry point: validate the multipart request, then process it.

    Expects an "upload" file part and an "action" form field.
    Responds 400 on a malformed request, 500 when processing fails,
    and 200 with "Done." on success.
    """
    has_upload = "upload" in request.files
    has_action = "action" in request.form
    if not (has_upload and has_action):
        return jsonify({"error": "Invalid request"}), 400

    uploaded = request.files["upload"]
    requested_action = request.form["action"]

    # An empty filename means the client submitted the form without a file.
    if uploaded.filename == "":
        return jsonify({"error": "No selected file"}), 400

    try:
        process_video(uploaded, requested_action)
    except Exception as exc:
        lgr.error("Error:", exc_info=exc)
        return jsonify({"error": str(exc)}), 500

    return jsonify("Done."), 200
304
305
306
# Start Flask's built-in development server on all interfaces.
# NOTE(review): debug=True enables the interactive Werkzeug debugger and
# auto-reload, and 0.0.0.0 exposes it on every interface — confirm this
# module is only run this way in development, not in production.
app.run(host="0.0.0.0", port=8081, debug=True)
307
308