Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
parkpow
GitHub Repository: parkpow/deep-license-plate-recognition
Path: blob/master/video-editor/interpolator.py
641 views
1
import math
2
import os
3
from multiprocessing import Event
4
from queue import Queue
5
from threading import Lock, Thread
6
from typing import Any
7
8
import cv2
9
import numpy as np
10
11
12
class Interpolator(Thread):
    """
    Consumes keyframes with polygons and skipframes.
    Interpolates polygon positions for skipframes.
    Sends all frames to a writer in the order they were received.

    Runs as a thread: keyframes and skipframes are fed in from outside,
    and each new keyframe wakes the thread to process the frames buffered
    since the previous keyframe.
    """

    # Search window size passed as winSize to cv2.calcOpticalFlowPyrLK.
    OPTICAL_FLOW_WINDOW = (31, 31)
    # Border width in pixels added around every grayscale frame; polygon
    # coordinates are shifted by this amount before optical flow and shifted
    # back afterwards.
    GRAY_PADDING = OPTICAL_FLOW_WINDOW[0]
    # Positional arguments unpacked into cv2.copyMakeBorder after the source
    # image: top, bottom, left, right, borderType, dst, value.
    PADDING_PARAMS = (
        GRAY_PADDING,
        GRAY_PADDING,
        GRAY_PADDING,
        GRAY_PADDING,
        cv2.BORDER_REPLICATE,
        None,
        0,
    )
30
31
def __init__(self, sample_rate: int, writer: Any):
32
super().__init__()
33
self.writer = writer
34
# self.frame_buffer = FrameBuffer(2 * sample_rate)
35
self.frame_buffer: Queue[tuple[np.ndarray, np.ndarray]] = Queue(2 * sample_rate)
36
self.frame_buffer.put((np.array([]), np.array([]))) # prime with dummy frame
37
# State variables
38
self.cur_frame: np.ndarray = np.ndarray([], dtype=np.uint8)
39
self.cur_gray: np.ndarray = np.ndarray([], dtype=np.uint8)
40
self.cur_keyframe_num = 0
41
self.prev_keyframe_num = 0
42
self.cur_polygons: list[np.ndarray] = []
43
self.old_polygons: list[np.ndarray] = []
44
45
# Synchonization variables
46
self.keyframe_event = Event()
47
self.processing_lock = Lock()
48
self.quit = False
49
50
# Read blur parameters form environment
51
try:
52
video_blur_str = os.environ.get("VIDEO_BLUR")
53
if video_blur_str:
54
self.blur_amount = int(video_blur_str)
55
self.blur_amount = max(1, min(10, self.blur_amount))
56
except Exception:
57
self.blur_amount = 3
58
59
def run(self) -> None:
60
"""
61
Checks whether the new keyframe has arrived in a loop.
62
If so, sets a lock and starts processing it.
63
"""
64
while not self.quit:
65
self.keyframe_event.wait()
66
self.keyframe_event.clear()
67
if self.cur_keyframe_num > self.prev_keyframe_num:
68
with self.processing_lock:
69
self._process_keyframe()
70
71
def _bounding_box(self, polygon: np.ndarray, shape: tuple[int, ...]) -> list[int]:
72
"""
73
Returns the bounding box of a polygon.
74
Right and bottom edges are exclusive.
75
Trims the box to fit the frame.
76
"""
77
box = [
78
math.floor(np.min(polygon[:, 0])),
79
math.floor(np.min(polygon[:, 1])),
80
math.ceil(np.max(polygon[:, 0])) + 1,
81
math.ceil(np.max(polygon[:, 1])) + 1,
82
]
83
box[0] = max(0, box[0])
84
box[1] = max(0, box[1])
85
box[2] = min(shape[1], box[2])
86
box[3] = min(shape[0], box[3])
87
88
return box
89
90
def _blur_polygons(self, frame: np.ndarray, polygons: list[np.ndarray]):
91
"""
92
Draws blurred polygons to the frame.
93
"""
94
result = frame.copy()
95
channel_count = frame.shape[2]
96
ignore_mask_color = (255,) * channel_count
97
98
for poly in polygons:
99
box = self._bounding_box(poly, frame.shape)
100
polygon_height = box[3] - box[1]
101
polygon_width = box[2] - box[0]
102
if polygon_height < 1 or polygon_width < 1:
103
continue
104
105
# Prep mask
106
window = result[box[1] : box[3], box[0] : box[2]]
107
mask = np.zeros(window.shape, dtype=np.uint8)
108
cv2.fillConvexPoly(
109
mask, np.int32(poly - box[:2]), ignore_mask_color, cv2.LINE_AA
110
)
111
112
# Prep totally blurred image
113
kernel_width = (polygon_width // self.blur_amount) | 1
114
kernel_height = (polygon_height // self.blur_amount) | 1
115
blurred_window = cv2.GaussianBlur(window, (kernel_width, kernel_height), 0)
116
117
# Combine original and blur
118
result[box[1] : box[3], box[0] : box[2]] = cv2.bitwise_and(
119
window, cv2.bitwise_not(mask)
120
) + cv2.bitwise_and(blurred_window, mask)
121
122
self.writer.write(result)
123
124
def _is_consistent(self, shift: np.ndarray) -> bool:
125
"""
126
Checks if the optical flow shifts are consistent between each other.
127
"""
128
lengths = np.linalg.norm(shift, axis=1)
129
if np.max(lengths) < 2:
130
return True
131
132
# Check average displacement length
133
avg_shift = np.mean(shift, axis=0)
134
avg_shift_len = np.linalg.norm(avg_shift)
135
if avg_shift_len < 0.5:
136
return False
137
avg_shift /= avg_shift_len # normalize
138
139
# Check if direction is consistent with the average
140
for i, vec in enumerate(shift):
141
# This will not work if the polygon is moving towards the camera
142
# TODO: come up with a better check
143
if np.dot(vec, avg_shift) < 0.5 * lengths[i]:
144
return False
145
146
return True
147
148
def _interpolate_polygons(
149
self,
150
) -> list[tuple[np.ndarray, np.ndarray, list[np.ndarray]]]:
151
"""
152
Approximates blurred polygons between keyframes.
153
Returns a List of frames to write and their approximated polygons.
154
"""
155
# Propagate old polygons forward with optical flow
156
num_frames = self.cur_keyframe_num - self.prev_keyframe_num - 1
157
frames_to_blur = []
158
_, prev_gray = self.frame_buffer.get_nowait()
159
prev_polygons = self.old_polygons
160
for _ in range(num_frames):
161
next_frame, next_gray = self.frame_buffer.get_nowait()
162
polygons = []
163
for poly in prev_polygons:
164
new_poly, _, _ = cv2.calcOpticalFlowPyrLK(
165
prev_gray,
166
next_gray,
167
poly + self.GRAY_PADDING,
168
None,
169
winSize=self.OPTICAL_FLOW_WINDOW,
170
maxLevel=10,
171
criteria=(
172
cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT,
173
10,
174
0.03,
175
),
176
)
177
new_poly = new_poly - self.GRAY_PADDING
178
shift = new_poly - poly
179
if self._is_consistent(shift):
180
polygons.append(new_poly)
181
frames_to_blur.append((next_frame, next_gray, polygons))
182
prev_gray = next_gray
183
prev_polygons = polygons
184
185
# Propagate new polygons backward with optical flow
186
next_polygons = self.cur_polygons
187
next_gray = self.cur_gray
188
for i in range(num_frames):
189
_, prev_gray, inter_polygons = frames_to_blur[-i - 1]
190
polygons = []
191
for poly in next_polygons:
192
new_poly, _, _ = cv2.calcOpticalFlowPyrLK(
193
next_gray,
194
prev_gray,
195
poly + self.GRAY_PADDING,
196
None,
197
winSize=self.OPTICAL_FLOW_WINDOW,
198
maxLevel=10,
199
criteria=(
200
cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT,
201
10,
202
0.03,
203
),
204
)
205
new_poly -= self.GRAY_PADDING
206
shift = new_poly - poly
207
if self._is_consistent(shift):
208
polygons.append(new_poly)
209
inter_polygons.extend(polygons)
210
next_gray = prev_gray
211
next_polygons = polygons
212
213
return frames_to_blur
214
215
def _process_keyframe(self) -> None:
216
"""
217
Uses new keyframe to interpolates the previous sequence of skipframes.
218
Sends all frames and their polygons to the writer.
219
"""
220
# Draw skipframes
221
frames_to_blur = self._interpolate_polygons()
222
for skipframe, _, polygons in frames_to_blur:
223
self._blur_polygons(skipframe, polygons)
224
# Draw current frame
225
self._blur_polygons(self.cur_frame, self.cur_polygons)
226
227
# Update state variables
228
self.old_polygons = self.cur_polygons
229
self.prev_keyframe_num = self.cur_keyframe_num
230
231
def _gray_pad(self, frame: np.ndarray) -> np.ndarray:
    """
    Produces the padded grayscale version of a BGR frame.
    The border is described by PADDING_PARAMS.
    """
    grayscale = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    padded = cv2.copyMakeBorder(grayscale, *self.PADDING_PARAMS)
    return padded
237
238
def feed_keyframe(
239
self, frame: np.ndarray, frame_count: int, polygons: list[np.ndarray]
240
) -> None:
241
"""
242
Waits if the previous keyframe is still being processed.
243
Otherwise records the new keyframe and its info.
244
Then triggers the processing event and returns.
245
"""
246
with self.processing_lock:
247
gray = self._gray_pad(frame)
248
self.frame_buffer.put((frame, gray))
249
250
# Update state variables
251
self.cur_frame = frame
252
self.cur_gray = gray
253
self.cur_keyframe_num = frame_count
254
self.cur_polygons = polygons
255
256
self.keyframe_event.set() # signal to start processing
257
258
def feed_skipframe(self, frame: np.ndarray) -> None:
259
"""
260
Stacks skipframes into a buffer.
261
"""
262
self.frame_buffer.put((frame, self._gray_pad(frame)))
263
264
def is_flush_needed(self, frame_count: int) -> bool:
265
"""
266
Determines if the buffer has remaining skipframes to be flushed.
267
"""
268
return self.cur_keyframe_num < frame_count
269
270
def flush(self, frame_count: int, polygons: list[np.ndarray]) -> None:
271
"""
272
Extracts the previously pushed skipframe.
273
Then processes it as a new keyframe.
274
"""
275
with self.processing_lock:
276
# Check if last frame is already processed
277
if self.cur_keyframe_num >= frame_count:
278
return
279
280
# Update state variables
281
self.cur_frame, self.cur_gray = self.frame_buffer.queue[-1]
282
self.cur_keyframe_num = frame_count
283
self.cur_polygons = polygons
284
285
self.keyframe_event.set() # signal to start processing
286
287
def close(self) -> None:
288
"""
289
Finishes processing frames and stops the thread.
290
"""
291
with self.processing_lock:
292
self.quit = True
293
self.keyframe_event.set()
294
self.join()
295
296