Path: blob/master/plate_recognition.py
640 views
#!/usr/bin/env python
"""Read license plates from images with the Plate Recognizer Cloud API or
Snapshot SDK and output the result as JSON or CSV.

Supports optional image splitting for high-resolution inputs, bounding-box
annotation, and cropping of detected plates/vehicles.
"""
import argparse
import csv
import io
import json
import math
import sys
import time
from collections import OrderedDict
from itertools import combinations
from pathlib import Path

import requests
from PIL import Image, ImageDraw, ImageFont

# MutableMapping was removed from `collections` in Python 3.10.
if sys.version_info.major == 3 and sys.version_info.minor >= 10:
    from collections.abc import MutableMapping
else:
    # ruff: noqa
    from collections import MutableMapping  # type: ignore[attr-defined]


def parse_arguments(args_hook=lambda _: _):
    """Parse command line arguments.

    Args:
        args_hook: Callback receiving the parser so callers can register
            extra arguments before parsing.

    Returns:
        The parsed ``argparse.Namespace``.

    Raises:
        Exception: If neither ``--api-key`` nor ``--sdk-url`` is given.
    """
    parser = argparse.ArgumentParser(
        description="Read license plates from images and output the result as JSON or CSV.",
        epilog="""Examples:
Process images from a folder:
python plate_recognition.py -a MY_API_KEY /path/to/vehicle-*.jpg
Use the Snapshot SDK instead of the Cloud Api:
python plate_recognition.py -s http://localhost:8080 /path/to/vehicle-*.jpg
Specify Camera ID and/or two Regions:
plate_recognition.py -a MY_API_KEY --camera-id Camera1 -r us-ca -r th-37 /path/to/vehicle-*.jpg""",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument("-a", "--api-key", help="Your API key.", required=False)
    parser.add_argument(
        "-r",
        "--regions",
        help="Match the license plate pattern of a specific region",
        required=False,
        action="append",
    )
    parser.add_argument(
        "-s",
        "--sdk-url",
        help="Url to self hosted sdk For example, http://localhost:8080",
        required=False,
    )
    parser.add_argument(
        "--camera-id", help="Name of the source camera.", required=False
    )
    parser.add_argument("files", nargs="+", type=Path, help="Path to vehicle images")
    args_hook(parser)
    args = parser.parse_args()
    if not args.sdk_url and not args.api_key:
        raise Exception("api-key is required")
    return args


# Reused across Cloud API calls so HTTP keep-alive is honored.
_session = None


def recognition_api(
    fp,
    regions=None,
    api_key=None,
    sdk_url=None,
    config=None,
    camera_id=None,
    timestamp=None,
    mmc=None,
    exit_on_error=True,
):
    """Send one image to the recognition backend and return the JSON result.

    Args:
        fp: Binary file-like object containing the image.
        regions: Optional list of region codes to match against.
        api_key: Cloud API (or container) authentication token.
        sdk_url: Base URL of a self-hosted SDK; when set, the Cloud API is
            bypassed.
        config: Extra engine configuration (serialized to JSON).
        camera_id: Optional source camera name.
        timestamp: Optional capture timestamp.
        mmc: Enable make/model/color prediction.
        exit_on_error: Terminate the process on a non-2xx response.

    Returns:
        Decoded JSON response as an ``OrderedDict`` (``{}`` if no request
        was made).
    """
    global _session
    if regions is None:
        regions = []
    if config is None:
        config = {}
    data = dict(regions=regions, config=json.dumps(config))
    if camera_id:
        data["camera_id"] = camera_id
    if mmc:
        data["mmc"] = mmc
    if timestamp:
        data["timestamp"] = timestamp
    response = None
    if sdk_url:
        fp.seek(0)
        if "container-api" in sdk_url:
            response = requests.post(
                "https://container-api.parkpow.com/api/v1/predict/",
                files=dict(image=fp),
                headers={"Authorization": "Token " + api_key},
            )
        else:
            response = requests.post(
                sdk_url + "/v1/plate-reader/", files=dict(upload=fp), data=data
            )
    else:
        if not _session:
            _session = requests.Session()
            _session.headers.update({"Authorization": "Token " + api_key})
        # Retry up to 3 times on rate limiting.
        for _ in range(3):
            fp.seek(0)
            response = _session.post(
                "https://api.platerecognizer.com/v1/plate-reader/",
                files=dict(upload=fp),
                data=data,
            )
            if response.status_code == 429:  # Max calls per second reached
                time.sleep(1)
            else:
                break

    if response is None:
        return {}
    # Any non-2xx status is an error. (Fixed: the previous `> 300` check
    # incorrectly treated HTTP 300 as success.)
    if response.status_code < 200 or response.status_code >= 300:
        print(response.text)
        if exit_on_error:
            sys.exit(1)
    return response.json(object_pairs_hook=OrderedDict)


def flatten_dict(d, parent_key="", sep="_"):
    """Flatten a nested mapping into a single-level dict.

    Nested keys are joined with ``sep``; list values are JSON-encoded so the
    result is CSV-friendly.
    """
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, MutableMapping):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        else:
            if isinstance(v, list):
                items.append((new_key, json.dumps(v)))
            else:
                items.append((new_key, v))
    return dict(items)


def flatten(result):
    """Flatten one API result into a list of per-plate flat dicts.

    NOTE: mutates ``result`` (removes ``results`` and ``usage`` keys);
    callers pass a copy when they need the original intact.
    """
    plates = result["results"]
    del result["results"]
    if "usage" in result:
        del result["usage"]
    flattened_data = []  # Accumulate flattened data for each plate
    if not plates:
        data = result.copy()
        data.update(flatten_dict({}))  # Assuming flatten_dict can handle an empty dict
        flattened_data.append(data)
    else:
        for plate in plates:
            data = result.copy()
            data.update(flatten_dict(plate))
            flattened_data.append(data)
    return flattened_data


def save_cropped(api_res, path, args):
    """Save cropped license plates and/or vehicles from one image.

    Args:
        api_res: Recognition API response for the image.
        path: Path of the source image.
        args: Parsed CLI arguments (uses ``crop_lp`` / ``crop_vehicle``).
    """
    dest = args.crop_lp or args.crop_vehicle
    dest.mkdir(exist_ok=True, parents=True)
    image = Image.open(path).convert("RGB")
    for i, result in enumerate(api_res.get("results", []), 1):
        if args.crop_lp and result["plate"]:
            box = result["box"]
            cropped = image.crop((box["xmin"], box["ymin"], box["xmax"], box["ymax"]))
            cropped.save(
                dest / f'{result["plate"]}_{result["region"]["code"]}_{path.name}'
            )
        if args.crop_vehicle and result["vehicle"]["score"]:
            box = result["vehicle"]["box"]
            cropped = image.crop((box["xmin"], box["ymin"], box["xmax"], box["ymax"]))
            make_model = result.get("model_make", [None])[0]
            filename = f'{i}_{result["vehicle"]["type"]}_{path.name}'
            if make_model:
                filename = f'{make_model["make"]}_{make_model["model"]}_' + filename
            cropped.save(dest / filename)


def is_detection_mode_vehicle(engine_config):
    """Return True if the engine config JSON selects vehicle detection mode."""
    if not engine_config:
        return False

    try:
        engine_config_dict = json.loads(engine_config)
    except (TypeError, json.JSONDecodeError):
        return False

    return engine_config_dict.get("detection_mode") == "vehicle"


def transform_result(input_data):
    """Convert a vehicle-detection-mode response to the classic result shape.

    Vehicle-mode responses nest plate/vehicle properties under ``props``;
    this re-shapes them to the flat structure the CSV writer expects.
    Results without vehicle data are skipped.
    """
    output = OrderedDict(
        [
            ("filename", input_data.get("filename")),
            ("timestamp", input_data.get("timestamp")),
            ("camera_id", input_data.get("camera_id")),
            ("results", []),
        ]
    )

    # Placeholders used when no plate was detected.
    no_plate_box = OrderedDict(
        [("xmin", None), ("ymin", None), ("xmax", None), ("ymax", None)]
    )
    no_plate_region = OrderedDict([("code", None), ("score", None)])

    for result in input_data.get("results", []):
        # Process plate data if available
        plate_data = result.get("plate")
        if plate_data:
            props = plate_data.get("props", {})
            plate_candidates = props.get("plate", [])
            top_plate = plate_candidates[0] if plate_candidates else None

            region_candidates = props.get("region", [])
            top_region = region_candidates[0] if region_candidates else None
            region_entry = (
                OrderedDict(
                    [
                        ("code", top_region.get("value")),
                        ("score", top_region.get("score")),
                    ]
                )
                if top_region
                else None
            )
        else:
            plate_candidates = []
            top_plate = None
            region_entry = None

        # Skip if vehicle data is missing
        vehicle_data = result.get("vehicle")
        if not vehicle_data:
            continue

        # Process vehicle properties
        v_props = vehicle_data.get("props", {})
        model_make = v_props.get("make_model", [])

        colors = [
            OrderedDict([("color", c.get("value")), ("score", c.get("score"))])
            for c in v_props.get("color", [])
        ]
        orientations = [
            OrderedDict([("orientation", o.get("value")), ("score", o.get("score"))])
            for o in v_props.get("orientation", [])
        ]

        candidates = [
            OrderedDict([("score", cand.get("score")), ("plate", cand.get("value"))])
            for cand in plate_candidates
        ]

        vehicle_entry = OrderedDict(
            [
                ("score", vehicle_data.get("score")),
                ("type", vehicle_data.get("type")),
                ("box", vehicle_data.get("box")),
            ]
        )

        transformed_result = OrderedDict(
            [
                ("box", plate_data.get("box") if plate_data else no_plate_box),
                ("plate", top_plate.get("value") if top_plate else None),
                ("region", region_entry if top_plate else no_plate_region),
                ("score", top_plate.get("score") if top_plate else None),
                ("candidates", candidates if plate_data else None),
                ("dscore", plate_data.get("score") if plate_data else None),
                ("vehicle", vehicle_entry),
                ("model_make", model_make),
                ("color", colors),
                ("orientation", orientations),
            ]
        )

        output["results"].append(transformed_result)

    output["usage"] = input_data.get("usage", {})
    output["processing_time"] = input_data.get("processing_time")
    return output


def save_results(results, args):
    """Write accumulated results to ``args.output_file`` as JSON or CSV."""
    path = args.output_file
    if not Path(path).parent.exists():
        print("%s does not exist" % path)
        return
    if not results:
        return
    if args.format == "json":
        with open(path, "w") as fp:
            json.dump(results, fp)
    elif args.format == "csv":
        # Pick the widest header seen among (up to) the first 10 results.
        fieldnames = []
        for result in results[:10]:
            data = (
                transform_result(result)
                if is_detection_mode_vehicle(args.engine_config)
                else result
            )
            candidates = flatten(data.copy())
            for candidate in candidates:
                if len(fieldnames) < len(candidate):
                    fieldnames = candidate.keys()
        with open(path, "w", newline="") as fp:
            writer = csv.DictWriter(fp, fieldnames=fieldnames)
            writer.writeheader()
            for result in results:
                result_data = (
                    transform_result(result)
                    if is_detection_mode_vehicle(args.engine_config)
                    else result
                )
                flattened_results = flatten(result_data)
                for flattened_result in flattened_results:
                    writer.writerow(flattened_result)


def custom_args(parser):
    """Register the script-specific command line arguments."""
    parser.epilog += """
Specify additional engine configuration:
plate_recognition.py -a MY_API_KEY --engine-config \'{"region":"strict"}\' /path/to/vehicle-*.jpg
Specify an output file and format for the results:
plate_recognition.py -a MY_API_KEY -o data.csv --format csv /path/to/vehicle-*.jpg
Enable Make Model and Color prediction:
plate_recognition.py -a MY_API_KEY --mmc /path/to/vehicle-*.jpg"""

    parser.add_argument("--engine-config", help="Engine configuration.")
    parser.add_argument(
        "--crop-lp", type=Path, help="Save cropped license plates to folder."
    )
    parser.add_argument(
        "--crop-vehicle", type=Path, help="Save cropped vehicles to folder."
    )
    parser.add_argument("-o", "--output-file", type=Path, help="Save result to file.")
    parser.add_argument(
        "--format",
        help="Format of the result.",
        default="json",
        choices="json csv".split(),
    )
    parser.add_argument(
        "--mmc",
        action="store_true",
        help="Predict vehicle make and model. Only available to paying users.",
    )
    parser.add_argument(
        "--show-boxes",
        action="store_true",
        help="Draw bounding boxes around license plates and display the resulting image.",
    )
    parser.add_argument(
        "--annotate-images",
        action="store_true",
        help="Draw bounding boxes around license plates and save the resulting image.",
    )
    parser.add_argument(
        "--split-image",
        action="store_true",
        help="Do extra lookups on parts of the image. Useful on high resolution images.",
    )

    parser.add_argument("--split-x", type=int, default=0, help="Splits on the x-axis")

    parser.add_argument("--split-y", type=int, default=0, help="Splits on the y-axis")

    parser.add_argument(
        "--split-overlap",
        type=int,
        default=10,
        help="Percentage of window overlap when splitting",
    )


def draw_bb(im, data, new_size=(1920, 1050), text_func=None):
    """Draw a 3-pixel bounding box (and optional label) per result on ``im``.

    Args:
        im: PIL image to draw on (modified in place).
        data: Iterable of results, each with a ``box`` dict.
        new_size: Optional (width, height) to resize the annotated image to.
        text_func: Optional callback mapping a result to its label text.

    Returns:
        The annotated (possibly resized) image.
    """
    draw = ImageDraw.Draw(im)
    font_path = Path("assets/DejaVuSansMono.ttf")
    if font_path.exists():
        font = ImageFont.truetype(str(font_path), 10)
    else:
        font = ImageFont.load_default()
    rect_color = (0, 255, 0)
    for result in data:
        b = result["box"]
        coord = [(b["xmin"], b["ymin"]), (b["xmax"], b["ymax"])]
        # Three nested 1px rectangles give a 3px-thick border.
        draw.rectangle(coord, outline=rect_color)
        draw.rectangle(
            ((coord[0][0] - 1, coord[0][1] - 1), (coord[1][0] - 1, coord[1][1] - 1)),
            outline=rect_color,
        )
        draw.rectangle(
            ((coord[0][0] - 2, coord[0][1] - 2), (coord[1][0] - 2, coord[1][1] - 2)),
            outline=rect_color,
        )
        if text_func:
            text = text_func(result)
            (text_width, text_height) = font.font.getsize(text)[0]
            margin = math.ceil(0.05 * text_height)
            # White background so the label stays readable over the image.
            draw.rectangle(
                [
                    (b["xmin"] - margin, b["ymin"] - text_height - 2 * margin),
                    (b["xmin"] + text_width + 2 * margin, b["ymin"]),
                ],
                fill="white",
            )
            draw.text(
                (b["xmin"] + margin, b["ymin"] - text_height - margin),
                text,
                fill="black",
                font=font,
            )

    if new_size:
        im = im.resize(new_size)
    return im


def text_function(result):
    """Label used by draw_bb: the recognized plate text."""
    return result["plate"]


def bb_iou(a, b):
    """Return the intersection-over-union of two boxes ``a`` and ``b``."""
    # determine the (x, y)-coordinates of the intersection rectangle
    x_a = max(a["xmin"], b["xmin"])
    y_a = max(a["ymin"], b["ymin"])
    x_b = min(a["xmax"], b["xmax"])
    y_b = min(a["ymax"], b["ymax"])

    # compute the area of both the prediction and ground-truth
    # rectangles
    area_a = (a["xmax"] - a["xmin"]) * (a["ymax"] - a["ymin"])
    area_b = (b["xmax"] - b["xmin"]) * (b["ymax"] - b["ymin"])

    # compute the area of intersection rectangle
    area_inter = max(0, x_b - x_a) * max(0, y_b - y_a)
    # max(..., 1) guards against division by zero for degenerate boxes.
    return area_inter / float(max(area_a + area_b - area_inter, 1))


def clean_objs(objects, threshold=0.1):
    """Deduplicate overlapping detections, keeping the highest score.

    Only keep the ones with best score or no overlap (IoU <= threshold).
    """
    for o1, o2 in combinations(objects, 2):
        if (
            "remove" in o1
            or "remove" in o2
            or bb_iou(o1["box"], o2["box"]) <= threshold
        ):
            continue
        if o1["score"] > o2["score"]:
            o2["remove"] = True
        else:
            o1["remove"] = True
    return [x for x in objects if "remove" not in x]


def merge_results(images):
    """Merge per-window predictions into whole-image coordinates.

    Each entry of ``images`` carries the window offset (``x``, ``y``); boxes
    are shifted by that offset, then duplicates across windows are removed.
    """
    result = dict(results=[])
    for data in images:
        for item in data["prediction"]["results"]:
            result["results"].append(item)
            for b in [item["box"], item["vehicle"].get("box", {})]:
                b["ymin"] += data["y"]
                b["xmin"] += data["x"]
                b["ymax"] += data["y"]
                b["xmax"] += data["x"]
    result["results"] = clean_objs(result["results"])
    return result


def inside(a, b):
    """Return True if box ``a`` lies strictly inside box ``b``."""
    return (
        a["xmin"] > b["xmin"]
        and a["ymin"] > b["ymin"]
        and a["xmax"] < b["xmax"]
        and a["ymax"] < b["ymax"]
    )


def post_processing(results):
    """Drop low-confidence detections that are contained in another box."""
    new_list = []
    for item in results["results"]:
        if item["score"] < 0.2 and any(
            [inside(x["box"], item["box"]) for x in results["results"] if x != item]
        ):
            continue
        new_list.append(item)
    results["results"] = new_list
    return results


def output_image(args, path, results):
    """Show/save an annotated image and save crops, per CLI flags."""
    # Parenthesized on purpose: the unparenthesized original parsed as
    # `show_boxes or (annotate_images and "results" in results)`, so
    # --show-boxes raised KeyError on responses without "results".
    if (args.show_boxes or args.annotate_images) and "results" in results:
        image = Image.open(path)
        annotated_image = draw_bb(image, results["results"], None, text_function)
        if args.show_boxes:
            annotated_image.show()
        if args.annotate_images:
            annotated_image.save(path.with_name(f"{path.stem}_annotated{path.suffix}"))
    if args.crop_lp or args.crop_vehicle:
        save_cropped(results, path, args)


def process_split_image(path, args, engine_config):
    """Split a large image into overlapping windows, recognize each, merge.

    The whole image plus a grid of ``(split_x+1) x (split_y+1)`` windows
    (with ``split_overlap`` percent overlap) are each sent through the API;
    the merged, de-duplicated result is returned.
    """
    if args.split_x == 0 or args.split_y == 0:
        raise ValueError("Please specify both --split-x and --split-y")

    # Predictions
    fp = Image.open(path)
    if fp.mode != "RGB":
        fp = fp.convert("RGB")
    images = [((0, 0), fp)]  # Entire image

    overlap_pct = args.split_overlap

    window_width = fp.width / (args.split_x + 1)
    window_height = fp.height / (args.split_y + 1)

    overlap_width = int(window_width * overlap_pct / 100)
    overlap_height = int(window_height * overlap_pct / 100)

    draw = ImageDraw.Draw(fp)

    for i in range(args.split_x + 1):
        for j in range(args.split_y + 1):
            ymin = j * window_height
            ymax = ymin + window_height

            xmin = i * window_width
            xmax = xmin + window_width

            # Add x-axis Overlap
            if i == 0:  # Overlap `end` of first Window only
                xmax = xmax + overlap_width
            elif i == args.split_x:  # Overlap `start` of last Window only
                xmin = xmin - overlap_width
            else:  # Overlap both `start` and `end` of middle Windows
                xmin = xmin - overlap_width
                xmax = xmax + overlap_width

            # Add y-axis Overlap
            if j == 0:  # Overlap `bottom` of first Window only
                ymax = ymax + overlap_height
            elif j == args.split_y:  # Overlap `top` of last Window only
                ymin = ymin - overlap_height
            else:  # Overlap both `top` and `bottom` of middle Windows
                ymin = ymin - overlap_height
                ymax = ymax + overlap_height

            images.append(((xmin, ymin), fp.crop((xmin, ymin, xmax, ymax))))

    # Inference
    api_results = {}
    results = []
    usage = []
    camera_ids = []
    timestamps = []
    processing_times = []
    for (x, y), im in images:
        im_bytes = io.BytesIO()
        im.save(im_bytes, "JPEG", quality=95)
        im_bytes.seek(0)
        api_res = recognition_api(
            im_bytes,
            args.regions,
            args.api_key,
            args.sdk_url,
            config=engine_config,
            camera_id=args.camera_id,
            mmc=args.mmc,
        )
        results.append(dict(prediction=api_res, x=x, y=y))
        if "usage" in api_res:
            usage.append(api_res["usage"])
        camera_ids.append(api_res["camera_id"])
        timestamps.append(api_res["timestamp"])
        processing_times.append(api_res["processing_time"])

    api_results["filename"] = Path(path).name
    api_results["timestamp"] = timestamps[-1]
    api_results["camera_id"] = camera_ids[-1]
    results = post_processing(merge_results(results))
    results = OrderedDict(list(api_results.items()) + list(results.items()))
    if usage:
        results["usage"] = usage[-1]
    results["processing_time"] = round(sum(processing_times), 3)

    # Set bounding box padding
    for item in results["results"]:
        # Decrease padding size for large bounding boxes
        b = item["box"]
        width, height = b["xmax"] - b["xmin"], b["ymax"] - b["ymin"]
        padding_x = int(max(0, width * (0.3 * math.exp(-10 * width / fp.width))))
        padding_y = int(max(0, height * (0.3 * math.exp(-10 * height / fp.height))))
        b["xmin"] = b["xmin"] - padding_x
        b["ymin"] = b["ymin"] - padding_y
        b["xmax"] = b["xmax"] + padding_x
        b["ymax"] = b["ymax"] + padding_y

    output_image(args, path, results)
    return results


def process_full_image(path, args, engine_config):
    """Recognize plates in a single image without splitting."""
    with open(path, "rb") as fp:
        api_res = recognition_api(
            fp,
            args.regions,
            args.api_key,
            args.sdk_url,
            config=engine_config,
            camera_id=args.camera_id,
            mmc=args.mmc,
        )

    output_image(args, path, api_res)
    return api_res


def main():
    """CLI entry point: process each input file and emit results."""
    args = parse_arguments(custom_args)
    paths = args.files

    results = []
    engine_config = {}
    if args.engine_config:
        try:
            engine_config = json.loads(args.engine_config)
        except json.JSONDecodeError as e:
            print(e)
            return
    for path in paths:
        if not path.exists():
            continue
        if Path(path).is_file():
            if args.split_image:
                results.append(process_split_image(path, args, engine_config))
            else:
                results.append(process_full_image(path, args, engine_config))
    if args.output_file:
        save_results(results, args)
    else:
        print(json.dumps(results, indent=2))


if __name__ == "__main__":
    main()