"""
Common modules
"""
import json
import math
import platform
import warnings
from collections import OrderedDict, namedtuple
from copy import copy
from pathlib import Path
import cv2
import numpy as np
import pandas as pd
import requests
import torch
import torch.nn as nn
from PIL import Image
from torch.cuda import amp
from utils.datasets import exif_transpose, letterbox
from utils.general import (LOGGER, check_requirements, check_suffix, check_version, colorstr, increment_path,
make_divisible, non_max_suppression, scale_coords, xywh2xyxy, xyxy2xywh)
from utils.plots import Annotator, colors, save_one_box
from utils.torch_utils import copy_attr, time_sync
def autopad(k, p=None):
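    """Return 'same' padding for kernel size k, e.g. autopad(3) -> 1, autopad((3, 5)) -> [1, 2]."""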
if p is None:
p = k // 2 if isinstance(k, int) else [x // 2 for x in k]
return p
class Conv(nn.Module):
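    """Standard convolution block: Conv2d -> BatchNorm2d -> activation (SiLU by default).
    Args: c1 (ch_in), c2 (ch_out), k (kernel), s (stride), p (padding), g (groups), act (activation).
    """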
def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True):
super().__init__()
self.conv = nn.Conv2d(c1, c2, k, s, autopad(k, p), groups=g, bias=False)
self.bn = nn.BatchNorm2d(c2)
self.act = nn.SiLU() if act is True else (act if isinstance(act, nn.Module) else nn.Identity())
def forward(self, x):
return self.act(self.bn(self.conv(x)))
def forward_fuse(self, x):
return self.act(self.conv(x))
class DWConv(Conv):
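    """Depth-wise convolution: a Conv with groups set to gcd(c1, c2)."""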
def __init__(self, c1, c2, k=1, s=1, act=True):
super().__init__(c1, c2, k, s, g=math.gcd(c1, c2), act=act)
class TransformerLayer(nn.Module):
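    """Transformer layer (https://arxiv.org/abs/2010.11929), with LayerNorm layers removed for better performance."""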
def __init__(self, c, num_heads):
super().__init__()
self.q = nn.Linear(c, c, bias=False)
self.k = nn.Linear(c, c, bias=False)
self.v = nn.Linear(c, c, bias=False)
self.ma = nn.MultiheadAttention(embed_dim=c, num_heads=num_heads)
self.fc1 = nn.Linear(c, c, bias=False)
self.fc2 = nn.Linear(c, c, bias=False)
def forward(self, x):
x = self.ma(self.q(x), self.k(x), self.v(x))[0] + x
x = self.fc2(self.fc1(x)) + x
return x
class TransformerBlock(nn.Module):
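    """Vision Transformer block (https://arxiv.org/abs/2010.11929): optional Conv to match channels, a learned
    positional projection, then num_layers stacked TransformerLayers.
    """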
def __init__(self, c1, c2, num_heads, num_layers):
super().__init__()
self.conv = None
if c1 != c2:
self.conv = Conv(c1, c2)
self.linear = nn.Linear(c2, c2)
self.tr = nn.Sequential(*(TransformerLayer(c2, num_heads) for _ in range(num_layers)))
self.c2 = c2
def forward(self, x):
if self.conv is not None:
x = self.conv(x)
b, _, w, h = x.shape
        p = x.flatten(2).permute(2, 0, 1)  # BCHW -> (HW)BC, the sequence-first layout nn.MultiheadAttention expects
return self.tr(p + self.linear(p)).permute(1, 2, 0).reshape(b, self.c2, w, h)
class Bottleneck(nn.Module):
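    """Standard bottleneck: 1x1 conv then 3x3 conv, with a residual add when shortcut=True and c1 == c2."""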
def __init__(self, c1, c2, shortcut=True, g=1, e=0.5):
super().__init__()
c_ = int(c2 * e)
self.cv1 = Conv(c1, c_, 1, 1)
self.cv2 = Conv(c_, c2, 3, 1, g=g)
self.add = shortcut and c1 == c2
def forward(self, x):
return x + self.cv2(self.cv1(x)) if self.add else self.cv2(self.cv1(x))
class BottleneckCSP(nn.Module):
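    """CSP Bottleneck (https://github.com/WongKinYiu/CrossStagePartialNetworks): splits channels into a bottleneck
    branch and an identity branch, then concatenates and fuses them.
    """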
def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5):
super().__init__()
c_ = int(c2 * e)
self.cv1 = Conv(c1, c_, 1, 1)
self.cv2 = nn.Conv2d(c1, c_, 1, 1, bias=False)
self.cv3 = nn.Conv2d(c_, c_, 1, 1, bias=False)
self.cv4 = Conv(2 * c_, c2, 1, 1)
self.bn = nn.BatchNorm2d(2 * c_)
self.act = nn.SiLU()
self.m = nn.Sequential(*(Bottleneck(c_, c_, shortcut, g, e=1.0) for _ in range(n)))
def forward(self, x):
y1 = self.cv3(self.m(self.cv1(x)))
y2 = self.cv2(x)
return self.cv4(self.act(self.bn(torch.cat((y1, y2), dim=1))))
class C3(nn.Module):
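    """CSP Bottleneck with 3 convolutions: a simplified BottleneckCSP without the extra BatchNorm/activation."""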
def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5):
super().__init__()
c_ = int(c2 * e)
self.cv1 = Conv(c1, c_, 1, 1)
self.cv2 = Conv(c1, c_, 1, 1)
self.cv3 = Conv(2 * c_, c2, 1)
self.m = nn.Sequential(*(Bottleneck(c_, c_, shortcut, g, e=1.0) for _ in range(n)))
def forward(self, x):
return self.cv3(torch.cat((self.m(self.cv1(x)), self.cv2(x)), dim=1))
class C3TR(C3):
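    """C3 module with its bottleneck stack replaced by a TransformerBlock."""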
def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5):
super().__init__(c1, c2, n, shortcut, g, e)
c_ = int(c2 * e)
self.m = TransformerBlock(c_, c_, 4, n)
class C3SPP(C3):
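    """C3 module with its bottleneck stack replaced by an SPP layer."""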
def __init__(self, c1, c2, k=(5, 9, 13), n=1, shortcut=True, g=1, e=0.5):
super().__init__(c1, c2, n, shortcut, g, e)
c_ = int(c2 * e)
self.m = SPP(c_, c_, k)
class C3Ghost(C3):
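    """C3 module with GhostBottleneck blocks."""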
def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5):
super().__init__(c1, c2, n, shortcut, g, e)
c_ = int(c2 * e)
self.m = nn.Sequential(*(GhostBottleneck(c_, c_) for _ in range(n)))
class SPP(nn.Module):
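    """Spatial Pyramid Pooling layer (https://arxiv.org/abs/1406.4729), as used in YOLOv3-SPP."""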
def __init__(self, c1, c2, k=(5, 9, 13)):
super().__init__()
c_ = c1 // 2
self.cv1 = Conv(c1, c_, 1, 1)
self.cv2 = Conv(c_ * (len(k) + 1), c2, 1, 1)
self.m = nn.ModuleList([nn.MaxPool2d(kernel_size=x, stride=1, padding=x // 2) for x in k])
def forward(self, x):
x = self.cv1(x)
with warnings.catch_warnings():
            warnings.simplefilter('ignore')  # suppress torch 1.9.0 max_pool2d() warning
return self.cv2(torch.cat([x] + [m(x) for m in self.m], 1))
class SPPF(nn.Module):
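    """Spatial Pyramid Pooling - Fast (SPPF). Three chained k=5 max-pools reproduce SPP's k=(5, 9, 13) pyramid
    (two stride-1 k=5 pools equal one k=9 pool, three equal one k=13) while sharing intermediate results.
    """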
def __init__(self, c1, c2, k=5):
super().__init__()
c_ = c1 // 2
self.cv1 = Conv(c1, c_, 1, 1)
self.cv2 = Conv(c_ * 4, c2, 1, 1)
self.m = nn.MaxPool2d(kernel_size=k, stride=1, padding=k // 2)
def forward(self, x):
x = self.cv1(x)
with warnings.catch_warnings():
            warnings.simplefilter('ignore')  # suppress torch 1.9.0 max_pool2d() warning
y1 = self.m(x)
y2 = self.m(y1)
return self.cv2(torch.cat([x, y1, y2, self.m(y2)], 1))
class Focus(nn.Module):
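    """Focus width-height information into channel space: x(b,c,w,h) -> x(b,4c,w/2,h/2) via strided slicing,
    then a convolution.
    """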
def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True):
super().__init__()
self.conv = Conv(c1 * 4, c2, k, s, p, g, act)
def forward(self, x):
return self.conv(torch.cat([x[..., ::2, ::2], x[..., 1::2, ::2], x[..., ::2, 1::2], x[..., 1::2, 1::2]], 1))
class GhostConv(nn.Module):
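    """Ghost Convolution (https://github.com/huawei-noah/ghostnet): half the output channels from a normal conv,
    the other half from a cheap 5x5 depth-wise conv on that output.
    """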
def __init__(self, c1, c2, k=1, s=1, g=1, act=True):
super().__init__()
c_ = c2 // 2
self.cv1 = Conv(c1, c_, k, s, None, g, act)
self.cv2 = Conv(c_, c_, 5, 1, None, c_, act)
def forward(self, x):
y = self.cv1(x)
return torch.cat([y, self.cv2(y)], 1)
class GhostBottleneck(nn.Module):
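    """Ghost Bottleneck (https://github.com/huawei-noah/ghostnet): GhostConv -> optional stride-2 DWConv ->
    GhostConv, with a matching downsample shortcut when s == 2.
    """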
def __init__(self, c1, c2, k=3, s=1):
super().__init__()
c_ = c2 // 2
self.conv = nn.Sequential(GhostConv(c1, c_, 1, 1),
DWConv(c_, c_, k, s, act=False) if s == 2 else nn.Identity(),
GhostConv(c_, c2, 1, 1, act=False))
self.shortcut = nn.Sequential(DWConv(c1, c1, k, s, act=False),
Conv(c1, c2, 1, 1, act=False)) if s == 2 else nn.Identity()
def forward(self, x):
return self.conv(x) + self.shortcut(x)
class Contract(nn.Module):
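    """Contract width-height into channels, e.g. x(1,64,80,80) -> x(1,256,40,40) for gain=2."""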
def __init__(self, gain=2):
super().__init__()
self.gain = gain
def forward(self, x):
        b, c, h, w = x.size()  # h and w must be divisible by s
        s = self.gain
        x = x.view(b, c, h // s, s, w // s, s)  # x(1,64,40,2,40,2)
        x = x.permute(0, 3, 5, 1, 2, 4).contiguous()  # x(1,2,2,64,40,40)
        return x.view(b, c * s * s, h // s, w // s)  # x(1,256,40,40)
class Expand(nn.Module):
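    """Expand channels into width-height, e.g. x(1,64,80,80) -> x(1,16,160,160) for gain=2."""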
def __init__(self, gain=2):
super().__init__()
self.gain = gain
def forward(self, x):
        b, c, h, w = x.size()  # c must be divisible by s ** 2
        s = self.gain
        x = x.view(b, s, s, c // s ** 2, h, w)  # x(1,2,2,16,80,80)
        x = x.permute(0, 3, 4, 1, 5, 2).contiguous()  # x(1,16,80,2,80,2)
        return x.view(b, c // s ** 2, h * s, w * s)  # x(1,16,160,160)
class Concat(nn.Module):
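    """Concatenate a list of tensors along the given dimension."""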
def __init__(self, dimension=1):
super().__init__()
self.d = dimension
def forward(self, x):
return torch.cat(x, self.d)
class DetectMultiBackend(nn.Module):
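    """YOLOv5 MultiBackend class for Python inference on various backends, selected by weights suffix:
    PyTorch (.pt), TorchScript (.torchscript), ONNX (.onnx, via ONNX Runtime or OpenCV DNN with dnn=True),
    TensorRT (.engine), CoreML (.mlmodel), TensorFlow (.pb, .tflite, or a saved_model directory).
    """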
def __init__(self, weights='yolov5s.pt', device=None, dnn=False):
from models.experimental import attempt_download, attempt_load
super().__init__()
        w = str(weights[0] if isinstance(weights, list) else weights)
        suffix = Path(w).suffix.lower()
        suffixes = ['.pt', '.torchscript', '.onnx', '.engine', '.tflite', '.pb', '', '.mlmodel']
        check_suffix(w, suffixes)  # check weights have acceptable suffix
        pt, jit, onnx, engine, tflite, pb, saved_model, coreml = (suffix == x for x in suffixes)  # backend booleans
        stride, names = 64, [f'class{i}' for i in range(1000)]  # assign defaults
        w = attempt_download(w)  # download if not local
if jit:
LOGGER.info(f'Loading {w} for TorchScript inference...')
extra_files = {'config.txt': ''}
model = torch.jit.load(w, _extra_files=extra_files)
if extra_files['config.txt']:
d = json.loads(extra_files['config.txt'])
stride, names = int(d['stride']), d['names']
elif pt:
model = attempt_load(weights if isinstance(weights, list) else w, map_location=device)
stride = int(model.stride.max())
names = model.module.names if hasattr(model, 'module') else model.names
            self.model = model  # explicitly assign for to(), cpu(), cuda(), half()
elif coreml:
LOGGER.info(f'Loading {w} for CoreML inference...')
import coremltools as ct
model = ct.models.MLModel(w)
elif dnn:
LOGGER.info(f'Loading {w} for ONNX OpenCV DNN inference...')
check_requirements(('opencv-python>=4.5.4',))
net = cv2.dnn.readNetFromONNX(w)
elif onnx:
LOGGER.info(f'Loading {w} for ONNX Runtime inference...')
cuda = torch.cuda.is_available()
check_requirements(('onnx', 'onnxruntime-gpu' if cuda else 'onnxruntime'))
import onnxruntime
providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] if cuda else ['CPUExecutionProvider']
session = onnxruntime.InferenceSession(w, providers=providers)
elif engine:
LOGGER.info(f'Loading {w} for TensorRT inference...')
import tensorrt as trt
check_version(trt.__version__, '8.0.0', verbose=True)
Binding = namedtuple('Binding', ('name', 'dtype', 'shape', 'data', 'ptr'))
logger = trt.Logger(trt.Logger.INFO)
with open(w, 'rb') as f, trt.Runtime(logger) as runtime:
model = runtime.deserialize_cuda_engine(f.read())
bindings = OrderedDict()
for index in range(model.num_bindings):
name = model.get_binding_name(index)
dtype = trt.nptype(model.get_binding_dtype(index))
shape = tuple(model.get_binding_shape(index))
data = torch.from_numpy(np.empty(shape, dtype=np.dtype(dtype))).to(device)
bindings[name] = Binding(name, dtype, shape, data, int(data.data_ptr()))
binding_addrs = OrderedDict((n, d.ptr) for n, d in bindings.items())
context = model.create_execution_context()
batch_size = bindings['images'].shape[0]
else:
if pb:
LOGGER.info(f'Loading {w} for TensorFlow *.pb inference...')
import tensorflow as tf
def wrap_frozen_graph(gd, inputs, outputs):
x = tf.compat.v1.wrap_function(lambda: tf.compat.v1.import_graph_def(gd, name=""), [])
return x.prune(tf.nest.map_structure(x.graph.as_graph_element, inputs),
tf.nest.map_structure(x.graph.as_graph_element, outputs))
                graph_def = tf.Graph().as_graph_def()
                with open(w, 'rb') as f:  # read the frozen graph, closing the file afterwards
                    graph_def.ParseFromString(f.read())
frozen_func = wrap_frozen_graph(gd=graph_def, inputs="x:0", outputs="Identity:0")
elif saved_model:
LOGGER.info(f'Loading {w} for TensorFlow saved_model inference...')
import tensorflow as tf
model = tf.keras.models.load_model(w)
elif tflite:
if 'edgetpu' in w.lower():
LOGGER.info(f'Loading {w} for TensorFlow Lite Edge TPU inference...')
import tflite_runtime.interpreter as tfli
delegate = {'Linux': 'libedgetpu.so.1',
'Darwin': 'libedgetpu.1.dylib',
'Windows': 'edgetpu.dll'}[platform.system()]
interpreter = tfli.Interpreter(model_path=w, experimental_delegates=[tfli.load_delegate(delegate)])
else:
LOGGER.info(f'Loading {w} for TensorFlow Lite inference...')
import tensorflow as tf
interpreter = tf.lite.Interpreter(model_path=w)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
        self.__dict__.update(locals())  # assign all variables to self
    def forward(self, im, augment=False, visualize=False, val=False):
        # YOLOv5 MultiBackend inference
        b, ch, h, w = im.shape  # batch, channel, height, width
if self.pt or self.jit:
y = self.model(im) if self.jit else self.model(im, augment=augment, visualize=visualize)
return y if val else y[0]
elif self.coreml:
im = im.permute(0, 2, 3, 1).cpu().numpy()
im = Image.fromarray((im[0] * 255).astype('uint8'))
y = self.model.predict({'image': im})
            box = xywh2xyxy(y['coordinates'] * [[w, h, w, h]])  # xyxy pixels
            conf, cls = y['confidence'].max(1), y['confidence'].argmax(1).astype(np.float32)  # np.float is removed in modern NumPy
            y = np.concatenate((box, conf.reshape(-1, 1), cls.reshape(-1, 1)), 1)
elif self.onnx:
im = im.cpu().numpy()
if self.dnn:
self.net.setInput(im)
y = self.net.forward()
else:
y = self.session.run([self.session.get_outputs()[0].name], {self.session.get_inputs()[0].name: im})[0]
elif self.engine:
assert im.shape == self.bindings['images'].shape, (im.shape, self.bindings['images'].shape)
self.binding_addrs['images'] = int(im.data_ptr())
self.context.execute_v2(list(self.binding_addrs.values()))
y = self.bindings['output'].data
else:
im = im.permute(0, 2, 3, 1).cpu().numpy()
if self.pb:
y = self.frozen_func(x=self.tf.constant(im)).numpy()
elif self.saved_model:
y = self.model(im, training=False).numpy()
elif self.tflite:
input, output = self.input_details[0], self.output_details[0]
int8 = input['dtype'] == np.uint8
if int8:
scale, zero_point = input['quantization']
im = (im / scale + zero_point).astype(np.uint8)
self.interpreter.set_tensor(input['index'], im)
self.interpreter.invoke()
y = self.interpreter.get_tensor(output['index'])
if int8:
scale, zero_point = output['quantization']
y = (y.astype(np.float32) - zero_point) * scale
            y[..., :4] *= [w, h, w, h]  # denormalize xywh from 0-1 to pixels
y = torch.tensor(y) if isinstance(y, np.ndarray) else y
return (y, []) if val else y
    def warmup(self, imgsz=(1, 3, 640, 640), half=False):
        # Warmup model by running one dummy inference (PyTorch, TensorRT and ONNX backends only)
        if self.pt or self.engine or self.onnx:
if isinstance(self.device, torch.device) and self.device.type != 'cpu':
im = torch.zeros(*imgsz).to(self.device).type(torch.half if half else torch.float)
self.forward(im)
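# Usage sketch (illustrative; assumes weights are available locally or downloadable):
#   model = DetectMultiBackend('yolov5s.pt', device=torch.device('cpu'))
#   model.warmup(imgsz=(1, 3, 640, 640))  # run once to initialize
#   y = model(torch.zeros(1, 3, 640, 640))  # raw predictions, pre-NMS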
class AutoShape(nn.Module):
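    """YOLOv5 input-robust model wrapper accepting cv2/np/PIL/torch inputs. Includes preprocessing, inference
    and NMS. The class attributes below are NMS settings and may be overridden per instance.
    """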
    conf = 0.25  # NMS confidence threshold
    iou = 0.45  # NMS IoU threshold
    agnostic = False  # NMS class-agnostic
    multi_label = False  # NMS multiple labels per box
    classes = None  # (optional list) filter by class, i.e. = [0, 15, 16] for COCO persons, cats and dogs
    max_det = 1000  # maximum number of detections per image
    amp = False  # Automatic Mixed Precision (AMP) inference
def __init__(self, model):
super().__init__()
LOGGER.info('Adding AutoShape... ')
copy_attr(self, model, include=('yaml', 'nc', 'hyp', 'names', 'stride', 'abc'), exclude=())
self.dmb = isinstance(model, DetectMultiBackend)
self.pt = not self.dmb or model.pt
self.model = model.eval()
    def _apply(self, fn):
        # Apply to(), cpu(), cuda(), half() to model tensors that are not parameters or registered buffers
        self = super()._apply(fn)
if self.pt:
m = self.model.model.model[-1] if self.dmb else self.model.model[-1]
m.stride = fn(m.stride)
m.grid = list(map(fn, m.grid))
if isinstance(m.anchor_grid, list):
m.anchor_grid = list(map(fn, m.anchor_grid))
return self
@torch.no_grad()
def forward(self, imgs, size=640, augment=False, profile=False):
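        # Inference from various sources. For height=640, width=1280, RGB images example inputs are:
        #   file:       imgs = 'data/images/zidane.jpg'  # str or PosixPath
        #   URI:             = 'https://ultralytics.com/images/zidane.jpg'
        #   OpenCV:          = cv2.imread('image.jpg')[:, :, ::-1]  # HWC BGR to RGB x(640,1280,3)
        #   PIL:             = Image.open('image.jpg') or ImageGrab.grab()  # HWC x(640,1280,3)
        #   numpy:           = np.zeros((640,1280,3))  # HWC
        #   torch:           = torch.zeros(16,3,320,640)  # BCHW (scaled to size=640, 0-1 values)
        #   multiple:        = [Image.open('image1.jpg'), Image.open('image2.jpg'), ...]  # list of images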
t = [time_sync()]
        p = next(self.model.parameters()) if self.pt else torch.zeros(1)  # for device and type
        autocast = self.amp and (p.device.type != 'cpu')  # Automatic Mixed Precision (AMP) inference
if isinstance(imgs, torch.Tensor):
with amp.autocast(enabled=autocast):
return self.model(imgs.to(p.device).type_as(p), augment, profile)
        n, imgs = (len(imgs), imgs) if isinstance(imgs, list) else (1, [imgs])  # number of images, list of images
        shape0, shape1, files = [], [], []  # image shapes, inference shapes, filenames
for i, im in enumerate(imgs):
f = f'image{i}'
if isinstance(im, (str, Path)):
im, f = Image.open(requests.get(im, stream=True).raw if str(im).startswith('http') else im), im
im = np.asarray(exif_transpose(im))
elif isinstance(im, Image.Image):
im, f = np.asarray(exif_transpose(im)), getattr(im, 'filename', f) or f
files.append(Path(f).with_suffix('.jpg').name)
            if im.shape[0] < 5:  # image in CHW
                im = im.transpose((1, 2, 0))  # reverse dataloader .transpose(2, 0, 1)
            im = im[..., :3] if im.ndim == 3 else np.tile(im[..., None], 3)  # enforce 3ch input
            s = im.shape[:2]  # HWC
            shape0.append(s)  # image shape
            g = (size / max(s))  # gain
            shape1.append([y * g for y in s])
            imgs[i] = im if im.data.contiguous else np.ascontiguousarray(im)  # update
        shape1 = [make_divisible(x, self.stride) for x in np.stack(shape1, 0).max(0)]  # inference shape
        x = [letterbox(im, new_shape=shape1 if self.pt else size, auto=False)[0] for im in imgs]  # pad
        x = np.stack(x, 0) if n > 1 else x[0][None]  # stack
        x = np.ascontiguousarray(x.transpose((0, 3, 1, 2)))  # BHWC to BCHW
        x = torch.from_numpy(x).to(p.device).type_as(p) / 255  # uint8 to fp16/32
t.append(time_sync())
with amp.autocast(enabled=autocast):
y = self.model(x, augment, profile)
t.append(time_sync())
y = non_max_suppression(y if self.dmb else y[0], self.conf, iou_thres=self.iou, classes=self.classes,
agnostic=self.agnostic, multi_label=self.multi_label, max_det=self.max_det)
for i in range(n):
scale_coords(shape1, y[i][:, :4], shape0[i])
t.append(time_sync())
return Detections(imgs, y, files, t, self.names, x.shape)
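# Usage sketch (illustrative; AutoShape is normally attached to a loaded model, e.g. via torch.hub):
#   model = AutoShape(DetectMultiBackend('yolov5s.pt'))
#   results = model('https://ultralytics.com/images/zidane.jpg')  # path, URL, PIL, OpenCV, numpy or torch input
#   results.print()  # also .show(), .save(), .crop(), .render(), .pandas(), .tolist()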
class Detections:
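    """YOLOv5 detections class for inference results, with boxes exposed as .xyxy/.xywh (pixels) and
    .xyxyn/.xywhn (normalized 0-1), one tensor per image.
    """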
def __init__(self, imgs, pred, files, times=(0, 0, 0, 0), names=None, shape=None):
super().__init__()
        d = pred[0].device  # device
        gn = [torch.tensor([*(im.shape[i] for i in [1, 0, 1, 0]), 1, 1], device=d) for im in imgs]  # normalization gains (w, h, w, h, 1, 1)
self.imgs = imgs
self.pred = pred
self.names = names
self.files = files
self.times = times
        self.xyxy = pred  # xyxy pixels
        self.xywh = [xyxy2xywh(x) for x in pred]  # xywh pixels
        self.xyxyn = [x / g for x, g in zip(self.xyxy, gn)]  # xyxy normalized
        self.xywhn = [x / g for x, g in zip(self.xywh, gn)]  # xywh normalized
        self.n = len(self.pred)  # number of images (batch size)
        self.t = tuple((times[i + 1] - times[i]) * 1000 / self.n for i in range(3))  # timestamps (ms)
self.s = shape
def display(self, pprint=False, show=False, save=False, crop=False, render=False, save_dir=Path('')):
crops = []
for i, (im, pred) in enumerate(zip(self.imgs, self.pred)):
s = f'image {i + 1}/{len(self.pred)}: {im.shape[0]}x{im.shape[1]} '
if pred.shape[0]:
for c in pred[:, -1].unique():
n = (pred[:, -1] == c).sum()
s += f"{n} {self.names[int(c)]}{'s' * (n > 1)}, "
if show or save or render or crop:
annotator = Annotator(im, example=str(self.names))
for *box, conf, cls in reversed(pred):
label = f'{self.names[int(cls)]} {conf:.2f}'
if crop:
file = save_dir / 'crops' / self.names[int(cls)] / self.files[i] if save else None
crops.append({'box': box, 'conf': conf, 'cls': cls, 'label': label,
'im': save_one_box(box, im, file=file, save=save)})
else:
annotator.box_label(box, label, color=colors(cls))
im = annotator.im
else:
s += '(no detections)'
im = Image.fromarray(im.astype(np.uint8)) if isinstance(im, np.ndarray) else im
if pprint:
LOGGER.info(s.rstrip(', '))
if show:
im.show(self.files[i])
if save:
f = self.files[i]
im.save(save_dir / f)
if i == self.n - 1:
LOGGER.info(f"Saved {self.n} image{'s' * (self.n > 1)} to {colorstr('bold', save_dir)}")
if render:
self.imgs[i] = np.asarray(im)
if crop:
if save:
LOGGER.info(f'Saved results to {save_dir}\n')
return crops
def print(self):
self.display(pprint=True)
LOGGER.info(f'Speed: %.1fms pre-process, %.1fms inference, %.1fms NMS per image at shape {tuple(self.s)}' %
self.t)
def show(self):
self.display(show=True)
def save(self, save_dir='runs/detect/exp'):
save_dir = increment_path(save_dir, exist_ok=save_dir != 'runs/detect/exp', mkdir=True)
self.display(save=True, save_dir=save_dir)
def crop(self, save=True, save_dir='runs/detect/exp'):
save_dir = increment_path(save_dir, exist_ok=save_dir != 'runs/detect/exp', mkdir=True) if save else None
return self.display(crop=True, save=save, save_dir=save_dir)
def render(self):
self.display(render=True)
return self.imgs
    def pandas(self):
        # return detections as pandas DataFrames, i.e. print(results.pandas().xyxy[0])
        new = copy(self)  # return copy
ca = 'xmin', 'ymin', 'xmax', 'ymax', 'confidence', 'class', 'name'
cb = 'xcenter', 'ycenter', 'width', 'height', 'confidence', 'class', 'name'
for k, c in zip(['xyxy', 'xyxyn', 'xywh', 'xywhn'], [ca, ca, cb, cb]):
a = [[x[:5] + [int(x[5]), self.names[int(x[5])]] for x in x.tolist()] for x in getattr(self, k)]
setattr(new, k, [pd.DataFrame(x, columns=c) for x in a])
return new
    def tolist(self):
        # return a list of Detections objects, i.e. 'for result in results.tolist():'
        r = range(self.n)  # iterable
x = [Detections([self.imgs[i]], [self.pred[i]], [self.files[i]], self.times, self.names, self.s) for i in r]
return x
def __len__(self):
return self.n
class Classify(nn.Module):
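    """Classification head, i.e. x(b,c1,20,20) -> x(b,c2)."""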
def __init__(self, c1, c2, k=1, s=1, p=None, g=1):
super().__init__()
self.aap = nn.AdaptiveAvgPool2d(1)
self.conv = nn.Conv2d(c1, c2, k, s, autopad(k, p), groups=g)
self.flat = nn.Flatten()
def forward(self, x):
z = torch.cat([self.aap(y) for y in (x if isinstance(x, list) else [x])], 1)
return self.flat(self.conv(z))