Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
parkpow
GitHub Repository: parkpow/deep-license-plate-recognition
Path: blob/master/plate_recognition.py
640 views
1
#!/usr/bin/env python
2
3
import argparse
4
import csv
5
import io
6
import json
7
import math
8
import sys
9
import time
10
from collections import OrderedDict
11
from itertools import combinations
12
from pathlib import Path
13
14
import requests
15
from PIL import Image, ImageDraw, ImageFont
16
17
if sys.version_info.major == 3 and sys.version_info.minor >= 10:
18
from collections.abc import MutableMapping
19
else:
20
# ruff: noqa
21
from collections import MutableMapping # type: ignore[attr-defined]
22
23
24
def parse_arguments(args_hook=lambda _: _):
    """Build and parse the command-line arguments.

    args_hook: callback that receives the ArgumentParser so callers can
    register extra options (see custom_args) before parsing.

    Returns:
        The parsed argparse.Namespace.

    Raises:
        Exception: when neither --sdk-url nor --api-key was supplied.
    """
    parser = argparse.ArgumentParser(
        description="Read license plates from images and output the result as JSON or CSV.",
        epilog="""Examples:
Process images from a folder:
python plate_recognition.py -a MY_API_KEY /path/to/vehicle-*.jpg
Use the Snapshot SDK instead of the Cloud Api:
python plate_recognition.py -s http://localhost:8080 /path/to/vehicle-*.jpg
Specify Camera ID and/or two Regions:
plate_recognition.py -a MY_API_KEY --camera-id Camera1 -r us-ca -r th-37 /path/to/vehicle-*.jpg""",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument("-a", "--api-key", help="Your API key.", required=False)
    # -r may be given multiple times; values accumulate into a list.
    parser.add_argument(
        "-r",
        "--regions",
        help="Match the license plate pattern of a specific region",
        required=False,
        action="append",
    )
    parser.add_argument(
        "-s",
        "--sdk-url",
        help="Url to self hosted sdk For example, http://localhost:8080",
        required=False,
    )
    parser.add_argument(
        "--camera-id", help="Name of the source camera.", required=False
    )
    parser.add_argument("files", nargs="+", type=Path, help="Path to vehicle images")
    args_hook(parser)
    args = parser.parse_args()
    # The API key is only optional when a self-hosted SDK endpoint is used.
    if not args.sdk_url and not args.api_key:
        raise Exception("api-key is required")
    return args
59
60
61
# Lazily created shared HTTP session for Cloud API calls; initialized on
# first use inside recognition_api so repeated requests reuse one connection.
_session = None
62
63
64
def recognition_api(
    fp,
    regions=None,
    api_key=None,
    sdk_url=None,
    config=None,
    camera_id=None,
    timestamp=None,
    mmc=None,
    exit_on_error=True,
):
    """Send one image to a recognition endpoint and return the decoded JSON.

    fp: binary file-like object holding the image (rewound before upload).
    regions: optional list of region hints forwarded to the API.
    api_key: Cloud API / container token; required unless sdk_url is a
        plain self-hosted SDK endpoint.
    sdk_url: self-hosted endpoint base URL; when it contains "container-api"
        the ParkPow container API is used instead.
    config: optional engine configuration dict, sent JSON-encoded.
    camera_id / timestamp / mmc: optional extra form fields.
    exit_on_error: terminate the process on a non-2xx response when True.

    Returns an OrderedDict of the JSON response, or {} if no request was made.
    """
    if regions is None:
        regions = []
    if config is None:
        config = {}
    global _session
    data = dict(regions=regions, config=json.dumps(config))
    if camera_id:
        data["camera_id"] = camera_id
    if mmc:
        data["mmc"] = mmc
    if timestamp:
        data["timestamp"] = timestamp
    response = None
    if sdk_url:
        fp.seek(0)
        if "container-api" in sdk_url:
            response = requests.post(
                "https://container-api.parkpow.com/api/v1/predict/",
                files=dict(image=fp),
                headers={"Authorization": "Token " + api_key},
            )
        else:
            response = requests.post(
                sdk_url + "/v1/plate-reader/", files=dict(upload=fp), data=data
            )
    else:
        # Cloud API: reuse one authenticated session across calls.
        if not _session:
            _session = requests.Session()
            _session.headers.update({"Authorization": "Token " + api_key})
        # Retry up to 3 times when rate-limited.
        for _ in range(3):
            fp.seek(0)
            response = _session.post(
                "https://api.platerecognizer.com/v1/plate-reader/",
                files=dict(upload=fp),
                data=data,
            )
            if response.status_code == 429:  # Max calls per second reached
                time.sleep(1)
            else:
                break

    if response is None:
        return {}
    # Treat anything outside the 2xx range as an error.  The previous check
    # (`< 200 or > 300`) incorrectly accepted HTTP 300 as success.
    if not 200 <= response.status_code < 300:
        print(response.text)
        if exit_on_error:
            sys.exit(1)
    return response.json(object_pairs_hook=OrderedDict)
123
124
125
def flatten_dict(d, parent_key="", sep="_"):
    """Flatten a nested mapping into a single-level dict.

    Nested keys are joined with *sep*; list values are serialized to JSON
    strings so every value fits in a flat CSV cell.
    """
    flat = {}
    for key, value in d.items():
        compound = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, MutableMapping):
            flat.update(flatten_dict(value, compound, sep=sep))
        elif isinstance(value, list):
            flat[compound] = json.dumps(value)
        else:
            flat[compound] = value
    return flat
137
138
139
def flatten(result):
    """Expand one API result into a list of flat rows, one per plate.

    Mutates *result*: the "results" and "usage" keys are removed.  When no
    plates were found a single row with just the top-level fields is
    returned (the previous code updated it with `flatten_dict({})`, a no-op
    that has been removed).
    """
    plates = result.pop("results")
    result.pop("usage", None)
    if not plates:
        return [result.copy()]
    rows = []
    for plate in plates:
        row = result.copy()
        row.update(flatten_dict(plate))
        rows.append(row)
    return rows
155
156
157
def save_cropped(api_res, path, args):
    """Crop license plates and/or vehicles out of the source image and save them.

    api_res: decoded API response for this image.
    path: Path of the source image file.
    args: parsed CLI args; args.crop_lp / args.crop_vehicle give the
        destination folder(s).
    """
    # Both crop options share one destination; --crop-lp wins when both are set.
    dest = args.crop_lp or args.crop_vehicle
    dest.mkdir(exist_ok=True, parents=True)
    image = Image.open(path).convert("RGB")
    for i, result in enumerate(api_res.get("results", []), 1):
        if args.crop_lp and result["plate"]:
            box = result["box"]
            cropped = image.crop((box["xmin"], box["ymin"], box["xmax"], box["ymax"]))
            # Filename encodes plate text and region code for traceability.
            cropped.save(
                dest / f'{result["plate"]}_{result["region"]["code"]}_{path.name}'
            )
        if args.crop_vehicle and result["vehicle"]["score"]:
            box = result["vehicle"]["box"]
            cropped = image.crop((box["xmin"], box["ymin"], box["xmax"], box["ymax"]))
            # model_make appears to be a best-first candidate list; the top
            # entry (or None when absent) is used — TODO confirm ordering.
            make_model = result.get("model_make", [None])[0]
            filename = f'{i}_{result["vehicle"]["type"]}_{path.name}'
            if make_model:
                filename = f'{make_model["make"]}_{make_model["model"]}_' + filename
            cropped.save(dest / filename)
176
177
178
def is_detection_mode_vehicle(engine_config):
    """Return True when the JSON *engine_config* selects vehicle detection mode.

    Falsy, unparseable, or non-string inputs yield False.
    """
    if engine_config:
        try:
            return json.loads(engine_config).get("detection_mode") == "vehicle"
        except (TypeError, json.JSONDecodeError):
            return False
    return False
188
189
190
def transform_result(input_data):
    """Convert a vehicle-detection-mode API response into the classic
    plate-reader response layout so downstream flattening stays uniform.

    Results without vehicle data are skipped entirely; plate fields fall
    back to None / None-filled placeholders when no plate was read.
    """
    output = OrderedDict(
        [
            ("filename", input_data.get("filename")),
            ("timestamp", input_data.get("timestamp")),
            ("camera_id", input_data.get("camera_id")),
            ("results", []),
        ]
    )

    # Placeholders used when a vehicle was detected but no plate was read.
    no_plate_box = OrderedDict(
        [("xmin", None), ("ymin", None), ("xmax", None), ("ymax", None)]
    )
    no_plate_region = OrderedDict([("code", None), ("score", None)])

    for result in input_data.get("results", []):
        # Process plate data if available
        plate_data = result.get("plate")
        if plate_data:
            props = plate_data.get("props", {})
            plate_candidates = props.get("plate", [])
            # Candidate lists are assumed ordered best-first — TODO confirm.
            top_plate = plate_candidates[0] if plate_candidates else None

            region_candidates = props.get("region", [])
            top_region = region_candidates[0] if region_candidates else None
            region_entry = (
                OrderedDict(
                    [
                        ("code", top_region.get("value")),
                        ("score", top_region.get("score")),
                    ]
                )
                if top_region
                else None
            )
        else:
            plate_candidates = []
            top_plate = None
            region_entry = None

        # Skip if vehicle data is missing
        vehicle_data = result.get("vehicle")
        if not vehicle_data:
            continue

        # Process vehicle properties
        v_props = vehicle_data.get("props", {})
        model_make = v_props.get("make_model", [])

        colors = [
            OrderedDict([("color", c.get("value")), ("score", c.get("score"))])
            for c in v_props.get("color", [])
        ]
        orientations = [
            OrderedDict([("orientation", o.get("value")), ("score", o.get("score"))])
            for o in v_props.get("orientation", [])
        ]

        candidates = [
            OrderedDict([("score", cand.get("score")), ("plate", cand.get("value"))])
            for cand in plate_candidates
        ]

        vehicle_entry = OrderedDict(
            [
                ("score", vehicle_data.get("score")),
                ("type", vehicle_data.get("type")),
                ("box", vehicle_data.get("box")),
            ]
        )

        # NOTE(review): the "region" fallback keys off top_plate rather than
        # region_entry, so a plate with no region candidates yields None
        # instead of the no_plate_region placeholder — confirm intended.
        transformed_result = OrderedDict(
            [
                ("box", plate_data.get("box") if plate_data else no_plate_box),
                ("plate", top_plate.get("value") if top_plate else None),
                ("region", region_entry if top_plate else no_plate_region),
                ("score", top_plate.get("score") if top_plate else None),
                ("candidates", candidates if plate_data else None),
                ("dscore", plate_data.get("score") if plate_data else None),
                ("vehicle", vehicle_entry),
                ("model_make", model_make),
                ("color", colors),
                ("orientation", orientations),
            ]
        )

        output["results"].append(transformed_result)

    output["usage"] = input_data.get("usage", {})
    output["processing_time"] = input_data.get("processing_time")
    return output
281
282
283
def save_results(results, args):
    """Write *results* to args.output_file in JSON or CSV (per args.format).

    Silently returns when the output directory is missing or there are no
    results.  Vehicle-detection-mode responses are reshaped via
    transform_result before flattening.
    """
    path = args.output_file
    if not Path(path).parent.exists():
        print("%s does not exist" % path)
        return
    if not results:
        return
    if args.format == "json":
        with open(path, "w") as fp:
            json.dump(results, fp)
    elif args.format == "csv":
        # Derive the CSV header from the widest flattened row among the
        # first 10 results only.  NOTE(review): a later row with keys not in
        # this header would make DictWriter raise ValueError — confirm rows
        # beyond the first 10 can't introduce new keys.
        fieldnames = []
        for result in results[:10]:
            data = (
                transform_result(result)
                if is_detection_mode_vehicle(args.engine_config)
                else result
            )
            # flatten() mutates its argument, so scan a copy here.
            candidates = flatten(data.copy())
            for candidate in candidates:
                if len(fieldnames) < len(candidate):
                    fieldnames = candidate.keys()
        with open(path, "w", newline="") as fp:
            writer = csv.DictWriter(fp, fieldnames=fieldnames)
            writer.writeheader()
            for result in results:
                result_data = (
                    transform_result(result)
                    if is_detection_mode_vehicle(args.engine_config)
                    else result
                )
                flattened_results = flatten(result_data)
                for flattened_result in flattened_results:
                    writer.writerow(flattened_result)
317
318
319
def custom_args(parser):
    """Register this script's extra CLI options on *parser*.

    Passed as the args_hook to parse_arguments; also extends the parser's
    epilog with further usage examples.
    """
    parser.epilog += """
Specify additional engine configuration:
plate_recognition.py -a MY_API_KEY --engine-config \'{"region":"strict"}\' /path/to/vehicle-*.jpg
Specify an output file and format for the results:
plate_recognition.py -a MY_API_KEY -o data.csv --format csv /path/to/vehicle-*.jpg
Enable Make Model and Color prediction:
plate_recognition.py -a MY_API_KEY --mmc /path/to/vehicle-*.jpg"""

    parser.add_argument("--engine-config", help="Engine configuration.")
    parser.add_argument(
        "--crop-lp", type=Path, help="Save cropped license plates to folder."
    )
    parser.add_argument(
        "--crop-vehicle", type=Path, help="Save cropped vehicles to folder."
    )
    parser.add_argument("-o", "--output-file", type=Path, help="Save result to file.")
    parser.add_argument(
        "--format",
        help="Format of the result.",
        default="json",
        choices="json csv".split(),
    )
    parser.add_argument(
        "--mmc",
        action="store_true",
        help="Predict vehicle make and model. Only available to paying users.",
    )
    parser.add_argument(
        "--show-boxes",
        action="store_true",
        help="Draw bounding boxes around license plates and display the resulting image.",
    )
    parser.add_argument(
        "--annotate-images",
        action="store_true",
        help="Draw bounding boxes around license plates and save the resulting image.",
    )
    parser.add_argument(
        "--split-image",
        action="store_true",
        help="Do extra lookups on parts of the image. Useful on high resolution images.",
    )

    parser.add_argument("--split-x", type=int, default=0, help="Splits on the x-axis")

    parser.add_argument("--split-y", type=int, default=0, help="Splits on the y-axis")

    parser.add_argument(
        "--split-overlap",
        type=int,
        default=10,
        help="Percentage of window overlap when splitting",
    )
373
374
375
def draw_bb(im, data, new_size=(1920, 1050), text_func=None):
    """Draw bounding boxes (and optional labels) for each result onto *im*.

    im: PIL image; drawn on in place.
    data: iterable of results, each with a "box" dict (xmin/ymin/xmax/ymax).
    new_size: final size to resize to; pass None to keep the original size.
    text_func: optional callable mapping a result to its label text.

    Returns the (possibly resized) image.
    """
    draw = ImageDraw.Draw(im)
    # Use the bundled monospace font when present, else Pillow's default.
    font_path = Path("assets/DejaVuSansMono.ttf")
    if font_path.exists():
        font = ImageFont.truetype(str(font_path), 10)
    else:
        font = ImageFont.load_default()
    rect_color = (0, 255, 0)
    for result in data:
        b = result["box"]
        coord = [(b["xmin"], b["ymin"]), (b["xmax"], b["ymax"])]
        # Three nested rectangles simulate a 3-pixel-thick outline.
        draw.rectangle(coord, outline=rect_color)
        draw.rectangle(
            ((coord[0][0] - 1, coord[0][1] - 1), (coord[1][0] - 1, coord[1][1] - 1)),
            outline=rect_color,
        )
        draw.rectangle(
            ((coord[0][0] - 2, coord[0][1] - 2), (coord[1][0] - 2, coord[1][1] - 2)),
            outline=rect_color,
        )
        if text_func:
            text = text_func(result)
            # NOTE(review): font.font.getsize is a private Pillow API that was
            # removed in newer Pillow releases — confirm the pinned Pillow
            # version still provides it (getbbox/getlength is the public way).
            (text_width, text_height) = font.font.getsize(text)[0]
            margin = math.ceil(0.05 * text_height)
            # White background strip above the box, then the label on top.
            draw.rectangle(
                [
                    (b["xmin"] - margin, b["ymin"] - text_height - 2 * margin),
                    (b["xmin"] + text_width + 2 * margin, b["ymin"]),
                ],
                fill="white",
            )
            draw.text(
                (b["xmin"] + margin, b["ymin"] - text_height - margin),
                text,
                fill="black",
                font=font,
            )

    if new_size:
        im = im.resize(new_size)
    return im
416
417
418
def text_function(result):
    """Label a detection with its recognized plate string (for draw_bb)."""
    return result["plate"]
420
421
422
def bb_iou(a, b):
    """Intersection-over-union of two boxes given as xmin/ymin/xmax/ymax dicts.

    The union is clamped to at least 1 to avoid division by zero on
    degenerate boxes.
    """
    # Overlap extents (negative when the boxes are disjoint on an axis).
    inter_w = min(a["xmax"], b["xmax"]) - max(a["xmin"], b["xmin"])
    inter_h = min(a["ymax"], b["ymax"]) - max(a["ymin"], b["ymin"])
    intersection = max(0, inter_w) * max(0, inter_h)

    union = (
        (a["xmax"] - a["xmin"]) * (a["ymax"] - a["ymin"])
        + (b["xmax"] - b["xmin"]) * (b["ymax"] - b["ymin"])
        - intersection
    )
    return intersection / float(max(union, 1))
437
438
439
def clean_objs(objects, threshold=0.1):
    """Drop overlapping detections, keeping the higher-scored of each pair.

    Pairs whose IoU is at or below *threshold* are left alone.  Losing
    entries are tagged in place with a "remove" key and excluded from the
    returned list.
    """
    for first, second in combinations(objects, 2):
        if "remove" in first or "remove" in second:
            continue
        if bb_iou(first["box"], second["box"]) <= threshold:
            continue
        loser = second if first["score"] > second["score"] else first
        loser["remove"] = True
    return [obj for obj in objects if "remove" not in obj]
453
454
455
def merge_results(images):
    """Combine per-window predictions into one result in full-image coordinates.

    images: list of dicts with the window origin ("x", "y") and its API
        response under "prediction".  Plate and vehicle boxes are shifted
        back by the window origin, then overlapping duplicates are pruned
        with clean_objs().
    """
    result = dict(results=[])
    for data in images:
        x_off, y_off = data["x"], data["y"]
        for item in data["prediction"]["results"]:
            result["results"].append(item)
            for b in [item["box"], item["vehicle"].get("box")]:
                # A vehicle entry may carry no box; the previous code
                # defaulted to {} and then raised KeyError on the += below.
                if not b:
                    continue
                b["ymin"] += y_off
                b["xmin"] += x_off
                b["ymax"] += y_off
                b["xmax"] += x_off
    result["results"] = clean_objs(result["results"])
    return result
467
468
469
def inside(a, b):
    """Return True when box *a* lies strictly within box *b*."""
    return (
        b["xmin"] < a["xmin"]
        and b["ymin"] < a["ymin"]
        and a["xmax"] < b["xmax"]
        and a["ymax"] < b["ymax"]
    )
476
477
478
def post_processing(results):
    """Drop low-confidence plates (score < 0.2) that sit fully inside
    another detection's box; mutate and return *results*."""
    detections = results["results"]
    kept = []
    for item in detections:
        nested = any(
            inside(other["box"], item["box"])
            for other in detections
            if other != item
        )
        if item["score"] < 0.2 and nested:
            continue
        kept.append(item)
    results["results"] = kept
    return results
488
489
490
def output_image(args, path, results):
491
if args.show_boxes or args.annotate_images and "results" in results:
492
image = Image.open(path)
493
annotated_image = draw_bb(image, results["results"], None, text_function)
494
if args.show_boxes:
495
annotated_image.show()
496
if args.annotate_images:
497
annotated_image.save(path.with_name(f"{path.stem}_annotated{path.suffix}"))
498
if args.crop_lp or args.crop_vehicle:
499
save_cropped(results, path, args)
500
501
502
def process_split_image(path, args, engine_config):
    """Split a high-resolution image into overlapping windows, run
    recognition on the whole image plus each window, and merge everything
    back into full-image coordinates.

    Raises:
        ValueError: when --split-x or --split-y is zero.
    """
    if args.split_x == 0 or args.split_y == 0:
        raise ValueError("Please specify --split-x or --split-y")

    # Predictions
    fp = Image.open(path)
    if fp.mode != "RGB":
        fp = fp.convert("RGB")
    images = [((0, 0), fp)]  # Entire image

    overlap_pct = args.split_overlap

    # split_x/split_y count the cuts, so each axis has (cuts + 1) windows.
    window_width = fp.width / (args.split_x + 1)
    window_height = fp.height / (args.split_y + 1)

    overlap_width = int(window_width * overlap_pct / 100)
    overlap_height = int(window_height * overlap_pct / 100)

    # NOTE(review): `draw` is never used below — dead assignment, confirm.
    draw = ImageDraw.Draw(fp)

    for i in range(args.split_x + 1):
        for j in range(args.split_y + 1):
            ymin = j * window_height
            ymax = ymin + window_height

            xmin = i * window_width
            xmax = xmin + window_width

            # Add x-axis Overlap
            if i == 0:  # Overlap `end` of first Window only
                xmax = xmax + overlap_width
            elif i == args.split_x:  # Overlap `start` of last Window only
                xmin = xmin - overlap_width
            else:  # Overlap both `start` and `end` of middle Windows
                xmin = xmin - overlap_width
                xmax = xmax + overlap_width

            # Add y-axis Overlap
            if j == 0:  # Overlap `bottom` of first Window only
                ymax = ymax + overlap_height
                pass
            elif j == args.split_y:  # Overlap `top` of last Window only
                ymin = ymin - overlap_height
                pass
            else:  # Overlap both `top` and `bottom` of middle Windows
                ymin = ymin - overlap_height
                ymax = ymax + overlap_height

            # Record the window origin so boxes can be shifted back later.
            images.append(((xmin, ymin), fp.crop((xmin, ymin, xmax, ymax))))

    # Inference
    api_results = {}
    results = []
    usage = []
    camera_ids = []
    timestamps = []
    processing_times = []
    for (x, y), im in images:
        # Re-encode each window as JPEG before upload.
        im_bytes = io.BytesIO()
        im.save(im_bytes, "JPEG", quality=95)
        im_bytes.seek(0)
        api_res = recognition_api(
            im_bytes,
            args.regions,
            args.api_key,
            args.sdk_url,
            config=engine_config,
            camera_id=args.camera_id,
            mmc=args.mmc,
        )
        results.append(dict(prediction=api_res, x=x, y=y))
        # NOTE(review): metadata is only collected when "usage" is present;
        # if no response carries it, the [-1] indexing below raises
        # IndexError — confirm SDK responses include "usage".
        if "usage" in api_res:
            usage.append(api_res["usage"])
            camera_ids.append(api_res["camera_id"])
            timestamps.append(api_res["timestamp"])
            processing_times.append(api_res["processing_time"])

    # Keep the metadata of the last response for the merged result.
    api_results["filename"] = Path(path).name
    api_results["timestamp"] = timestamps[len(timestamps) - 1]
    api_results["camera_id"] = camera_ids[len(camera_ids) - 1]
    results = post_processing(merge_results(results))
    results = OrderedDict(list(api_results.items()) + list(results.items()))
    if len(usage):
        results["usage"] = usage[len(usage) - 1]
    results["processing_time"] = round(sum(processing_times), 3)

    # Set bounding box padding
    for item in results["results"]:
        # Decrease padding size for large bounding boxes
        b = item["box"]
        width, height = b["xmax"] - b["xmin"], b["ymax"] - b["ymin"]
        padding_x = int(max(0, width * (0.3 * math.exp(-10 * width / fp.width))))
        padding_y = int(max(0, height * (0.3 * math.exp(-10 * height / fp.height))))
        b["xmin"] = b["xmin"] - padding_x
        b["ymin"] = b["ymin"] - padding_y
        b["xmax"] = b["xmax"] + padding_x
        b["ymax"] = b["ymax"] + padding_y

    output_image(args, path, results)
    return results
602
603
604
def process_full_image(path, args, engine_config):
    """Run recognition on one whole image, emit any requested image
    output, and return the decoded API response."""
    with open(path, "rb") as image_file:
        api_res = recognition_api(
            image_file,
            args.regions,
            args.api_key,
            args.sdk_url,
            config=engine_config,
            camera_id=args.camera_id,
            mmc=args.mmc,
        )

    output_image(args, path, api_res)
    return api_res
618
619
620
def main():
    """CLI entry point: parse arguments, process each image file, then
    save or print the collected results."""
    args = parse_arguments(custom_args)

    engine_config = {}
    if args.engine_config:
        try:
            engine_config = json.loads(args.engine_config)
        except json.JSONDecodeError as e:
            print(e)
            return

    results = []
    for path in args.files:
        # Skip missing paths; only regular files are processed.
        if not path.exists():
            continue
        if Path(path).is_file():
            handler = process_split_image if args.split_image else process_full_image
            results.append(handler(path, args, engine_config))

    if args.output_file:
        save_results(results, args)
    else:
        print(json.dumps(results, indent=2))
644
645
646
# Run as a script; importing this module performs no work.
if __name__ == "__main__":
    main()
648
649