文章目录[隐藏]
鉴于之前这篇文章问问题的小伙伴比较多,参考了很多大佬的代码,重新发布yolov3进行目标检测的文章,本来也是看着很多大佬的代码学的,全部开源,包含所有python代码,也符合互联网的分享精神,就这点东西,有些人还付费才能看,真是服了,希望大家好好研究。至于yolov3的网络结构我是看了这篇文章和这篇文章组织代码的。
一、文件结构
解释:
- font: 字体,写结果到图片上。
- logs: 日志保存文件,如果没有该文件夹,会在训练的时候自动创建
- model_data:
- ××_classes.txt:你的类别文件,需要自己修改
- mask_anchor.txt:先验框(anchor)尺寸文件,第一行为逗号分隔的宽、高数值,每两个数构成一个anchor。
- yolov3.h5:预训练权重。(很多小伙伴问,自己不是会训练出权重文件吗,为什么还需要这个权重文件呢?这是为了初始化参数更合理。如果你不用它来初始化,而是随机初始化参数,那么大概率训练效果你不会满意。搜索一下下载放到该文件夹下)
- netImages:有关网络结构的图片,可以对着代码看,代码对应的结构我都在图片中标识出来了。
- predict文件夹:有关预测的文件;predict.py,yolo.py
- train文件夹:有关训练的文件;train.py,util.py
- VOCdevkit文件夹:存放数据集,里面是
  ---VOC2021
  -------------Annotations (存放.xml文件)
  -------------ImageSets/Main(存放生成的txt文件)
  -------------JPEGImages(存放.jpg文件)
- weights文件夹:训练生成的权重文件,用于预测。如果没有该文件夹会自动生成
- yolov3文件夹:有关网络模型的代码
- voc_annotation.py:生成训练需要使用的2021_train.txt,2021_val.txt。
二、代码
从第5个开始涉及代码,下面一一列出
2.1 predict.py
from PIL import Image
from yolo import YOLO
from matplotlib import pyplot as plt
import os
# Build the detector once at import time; predict() below reuses this instance.
yolo = YOLO()
def predict(detectMode='image'):
    """Run detection with the module-level ``yolo`` instance.

    Args:
        detectMode: ``'image'`` prompts for image paths in an endless loop and
            shows each annotated result with matplotlib; ``'video'`` reads
            from ``video_path`` (0 here) and writes the annotated stream to
            ``output_path``. Any other value does nothing.
    """
    if detectMode == "image":
        while True:
            imgPath = input('Input obsolute image filename:')
            try:
                image = Image.open(imgPath)
            except (OSError, ValueError):
                # Missing/unreadable/unidentifiable file: ask again instead of
                # swallowing every exception (a bare except would also hide
                # KeyboardInterrupt raised inside the call).
                print('Open Image Error! Please Try Again')
                continue
            out_image = yolo.detect_image(image)
            plt.imshow(out_image)
            plt.axis('off')
            plt.show()
    elif detectMode == "video":
        video_path = 0  # change as needed
        output_path = '../outVideo/result.mp4'  # change as needed
        # Create the *parent* directory only. Calling makedirs on output_path
        # itself would create a directory named "result.mp4" and the video
        # writer could then never create the file.
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        yolo.detect_video(video_path, output_path)
if __name__ == '__main__':
    # Script entry point: start the interactive image-detection loop.
    predict(detectMode='image')
2.2 yolo.py
import colorsys
from timeit import default_timer as timer
import numpy as np
from keras import backend as K
from tensorflow.keras.models import load_model
from tensorflow.keras.layers import Input
from PIL import Image, ImageFont, ImageDraw
from yolov3.nets import yolo_body,tiny_yolo_body
from yolov3.decode import (yolo_eval,)
from yolov3.utils import letterbox_image
import os
import tensorflow as tf
tf.compat.v1.disable_eager_execution()
import cv2
class YOLO(object):
    """Inference wrapper around a trained YOLOv3 / Tiny YOLOv3 Keras model.

    The constructor reads the class list and the anchors, fetches the Keras
    backend session and builds the detection graph once. ``detect_image`` and
    ``detect_video`` then evaluate that graph through ``sess.run``.
    """

    def __init__(self, **kwargs):
        """Set the hard-coded paths and thresholds, then build the graph.

        ``**kwargs`` is accepted but not read.
        """
        self.model_path = "../weights/ep002-loss5072.751-val_loss5155.355.h5"  # trained model
        self.anchors_path = "../model_data/mask_anchor.txt"
        self.classes_path = "../model_data/hy_classes.txt"
        self.score = 0.3  # forwarded to yolo_eval as score_threshold
        self.iou = 0.3  # forwarded to yolo_eval as iou_threshold
        self.model_image_size = (416, 416)
        self.class_names = self._get_class()
        self.anchors = self._get_anchors()
        self.sess = K.get_session()
        self.boxes, self.scores, self.classes = self.generate()

    def _get_class(self):
        """Return the class names, one stripped line per class."""
        classes_path = os.path.expanduser(self.classes_path)
        with open(classes_path) as f:
            class_names = f.readlines()
        return [c.strip() for c in class_names]

    def _get_anchors(self):
        """Return the anchors from the first line of the file as an (N, 2) array."""
        anchors_path = os.path.expanduser(self.anchors_path)
        with open(anchors_path) as f:
            anchors = f.readline()
        anchors = [float(x) for x in anchors.split(",")]
        return np.array(anchors).reshape(-1, 2)

    def generate(self):
        """Load the model, pick per-class colors and build the output tensors.

        Returns:
            The ``(boxes, scores, classes)`` tensors produced by ``yolo_eval``.
        """
        model_path = os.path.expanduser(self.model_path)
        assert model_path.endswith(".h5"), "Keras model or weights must be a .h5 file."

        # Load model, or construct model and load weights.
        num_anchors = len(self.anchors)
        num_classes = len(self.class_names)
        is_tiny_version = num_anchors == 6  # default setting
        try:
            self.yolo_model = load_model(model_path, compile=False)
        except Exception:
            # Deliberate best-effort fallback: the file holds weights only, so
            # rebuild the architecture and load the weights into it.
            self.yolo_model = (
                tiny_yolo_body(
                    Input(shape=(None, None, 3)), num_anchors // 2, num_classes
                )
                if is_tiny_version
                else yolo_body(
                    Input(shape=(None, None, 3)), num_anchors // 3, num_classes
                )
            )
            # Use the expanded path, same as load_model above;
            # make sure model, anchors and classes match.
            self.yolo_model.load_weights(model_path)
        else:
            assert self.yolo_model.layers[-1].output_shape[-1] == num_anchors / len(
                self.yolo_model.output
            ) * (
                num_classes + 5
            ), "Mismatch between model and given anchor and class sizes"
        print("{} model, anchors, and classes loaded.".format(model_path))

        # Generate colors for drawing bounding boxes: evenly spaced hues.
        hsv_tuples = [
            (x / len(self.class_names), 1.0, 1.0) for x in range(len(self.class_names))
        ]
        self.colors = [colorsys.hsv_to_rgb(*hsv) for hsv in hsv_tuples]
        self.colors = [
            (int(r * 255), int(g * 255), int(b * 255)) for r, g, b in self.colors
        ]
        np.random.seed(10101)  # Fixed seed for consistent colors across runs.
        np.random.shuffle(self.colors)  # Shuffle colors to decorrelate adjacent classes.
        np.random.seed(None)  # Reset seed to default.

        # Generate output tensor targets for filtered bounding boxes.
        self.input_image_shape = K.placeholder(shape=(2,))
        boxes, scores, classes = yolo_eval(
            self.yolo_model.output,
            self.anchors,
            len(self.class_names),
            self.input_image_shape,
            score_threshold=self.score,
            iou_threshold=self.iou,
        )
        return boxes, scores, classes

    def detect_image(self, image):
        """Detect objects in a PIL image and draw boxes and labels onto it.

        Args:
            image: PIL image; it is drawn on in place.

        Returns:
            The same ``image`` object, annotated.
        """
        start = timer()

        if self.model_image_size != (None, None):
            assert self.model_image_size[0] % 32 == 0, "Multiples of 32 required"
            assert self.model_image_size[1] % 32 == 0, "Multiples of 32 required"
            boxed_image = letterbox_image(image, tuple(reversed(self.model_image_size)))
        else:
            new_image_size = (
                image.width - (image.width % 32),
                image.height - (image.height % 32),
            )
            boxed_image = letterbox_image(image, new_image_size)
        image_data = np.array(boxed_image, dtype="float32")

        print(image_data.shape)
        image_data /= 255.0
        # Add batch dimension: [batch, height, width, channel].
        image_data = np.expand_dims(image_data, 0)

        out_boxes, out_scores, out_classes = self.sess.run(
            [self.boxes, self.scores, self.classes],
            feed_dict={
                self.yolo_model.input: image_data,
                self.input_image_shape: [image.size[1], image.size[0]],
                K.learning_phase(): 0,
            },
        )

        print("Found {} boxes for {}".format(len(out_boxes), "img"))

        font = ImageFont.truetype(
            font="../font/times.ttf",
            size=np.floor(3e-2 * image.size[1] + 0.5).astype("int32"),
        )
        thickness = (image.size[0] + image.size[1]) // 300

        for i, c in reversed(list(enumerate(out_classes))):
            predicted_class = self.class_names[c]
            box = out_boxes[i]
            score = out_scores[i]

            label = "{} {:.2f}".format(predicted_class, score)
            draw = ImageDraw.Draw(image)
            if hasattr(draw, "textsize"):
                label_size = draw.textsize(label, font)
            else:
                # ImageDraw.textsize was removed in Pillow 10; derive the
                # size from the text bounding box instead.
                bb_left, bb_top, bb_right, bb_bottom = draw.textbbox(
                    (0, 0), label, font=font
                )
                label_size = (bb_right - bb_left, bb_bottom - bb_top)

            top, left, bottom, right = box
            top = max(0, np.floor(top + 0.5).astype("int32"))
            left = max(0, np.floor(left + 0.5).astype("int32"))
            bottom = min(image.size[1], np.floor(bottom + 0.5).astype("int32"))
            right = min(image.size[0], np.floor(right + 0.5).astype("int32"))
            print(label, (left, top), (right, bottom))

            # Put the label above the box when it fits, otherwise just inside.
            if top - label_size[1] >= 0:
                text_origin = np.array([left, top - label_size[1]])
            else:
                text_origin = np.array([left, top + 1])

            # Draw `thickness` nested 1-px rectangles; a separate loop
            # variable avoids shadowing the detection index `i`.
            for inset in range(thickness):
                draw.rectangle(
                    [left + inset, top + inset, right - inset, bottom - inset],
                    outline=self.colors[c],
                )
            draw.rectangle(
                [tuple(text_origin), tuple(text_origin + label_size)],
                fill=self.colors[c],
            )
            draw.text(text_origin, label, fill=(0, 0, 0), font=font)
            del draw

        end = timer()
        print(end - start)
        return image

    def detect_video(self, video_path, output_path="result.mp4"):
        """Run detection on a video/camera stream, show it and save the result.

        Stops when the stream ends or when ``q`` is pressed in the preview
        window, then closes the session (the instance cannot detect afterwards).

        Args:
            video_path: anything ``cv2.VideoCapture`` accepts.
            output_path: file to write; an empty string prints a message and
                returns without processing.

        Raises:
            IOError: if the capture source cannot be opened.
        """
        vid = cv2.VideoCapture(video_path)
        if not vid.isOpened():
            raise IOError("Couldn't open camera or video, check it again")
        if output_path == "":
            vid.release()
            print("output_path is empty, check it again.")
            return

        video_FourCC = int(vid.get(cv2.CAP_PROP_FOURCC))
        video_fps = vid.get(cv2.CAP_PROP_FPS)
        video_size = (
            int(vid.get(cv2.CAP_PROP_FRAME_WIDTH)),
            int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT)),
        )
        print(
            "!!! TYPE:",
            type(output_path),
            type(video_FourCC),
            type(video_fps),
            type(video_size),
        )
        out = cv2.VideoWriter(output_path, video_FourCC, video_fps, video_size)

        accum_time = 0
        curr_fps = 0
        fps = "FPS: ??"
        prev_time = timer()
        cv2.namedWindow("result", cv2.WINDOW_NORMAL)
        try:
            while True:
                return_value, frame = vid.read()
                if not return_value:
                    # End of stream or read failure: frame is None and
                    # Image.fromarray would raise, so stop cleanly.
                    break
                image = Image.fromarray(frame)
                image = self.detect_image(image)
                result = np.asarray(image)

                # Count frames and refresh the FPS label about once a second.
                curr_time = timer()
                exec_time = curr_time - prev_time
                prev_time = curr_time
                accum_time = accum_time + exec_time
                curr_fps = curr_fps + 1
                if accum_time > 1:
                    accum_time = accum_time - 1
                    fps = "FPS: " + str(curr_fps)
                    curr_fps = 0
                cv2.putText(
                    result,
                    text=fps,
                    org=(3, 15),
                    fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                    fontScale=0.50,
                    color=(255, 0, 0),
                    thickness=2,
                )
                cv2.imshow("result", result)
                out.write(result)
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
        finally:
            # Release the writer so the container is finalized, even on error.
            vid.release()
            out.release()
            cv2.destroyAllWindows()
        self.close_session()

    def close_session(self):
        """Close the backend session held by this instance."""
        self.sess.close()
2.3 train.py
import os
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import (TensorBoard,ModelCheckpoint,ReduceLROnPlateau,
EarlyStopping,)
import tensorflow as tf
from util import *
# Enable on-demand GPU memory growth for every GPU TensorFlow can see.
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
train_annotation_path = "../2021_train.txt"
val_annotation_path = "../2021_val.txt"
anchors_path = "../model_data/mask_anchor.txt"
classes_path = "../model_data/hy_classes.txt" # edit for your own dataset
log_dir = "../logs/" # logs
if not os.path.exists(log_dir):
    os.makedirs(log_dir)
loss_history = LossHistory(log_dir)
weights_dir = "../weights/" # training results
if not os.path.exists(weights_dir):
    os.makedirs(weights_dir)
input_shape = (416, 416) # must be a multiple of 32 (YOLO requirement)
# NOTE(review): the trailing comma makes this a 1-tuple, not a str.
# create_model() currently relies on that (it takes weights_path[0]), so the
# comma must only be removed together with that indexing.
preTrainedWeightsPath="../model_data/yolov3.h5", # pre-trained weights
Freeze_Train = True
def train():
    """Train YOLOv3 in two stages using the module-level configuration.

    Stage 1 (only when ``Freeze_Train`` is true) trains the model as returned
    by ``create_model`` for epochs 0-50 and saves
    ``trained_weights_stage_1.h5``. Stage 2 marks every layer trainable,
    recompiles with a lower learning rate and continues from epoch 50 to 100
    with LR reduction and early stopping, then saves
    ``trained_weights_final.h5``. A checkpoint is written to ``weights_dir``
    after every epoch.
    """
    class_names = get_classes(classes_path)
    num_classes = len(class_names)
    anchors = get_anchors(anchors_path)
    model = create_model(
        input_shape=input_shape,
        anchors=anchors,
        num_classes=num_classes,
        weights_path=preTrainedWeightsPath,
    )
    print('len_layers: ', len(model.layers))

    logging = TensorBoard(log_dir=log_dir)
    # The original passed `priod=1`, a misspelling of the deprecated `period`
    # argument. Saving every epoch is already the default, so it is dropped.
    checkpoint = ModelCheckpoint(
        weights_dir + "ep{epoch:03d}-loss{loss:.3f}-val_loss{val_loss:.3f}.h5",
        monitor="val_loss",
        save_weights_only=True,
        save_best_only=False,
    )
    reduce_lr = ReduceLROnPlateau(monitor="val_loss", factor=0.1, patience=3, verbose=1)
    early_stopping = EarlyStopping(
        monitor="val_loss", min_delta=0, patience=10, verbose=1
    )

    # Read the dataset annotation txt files.
    with open(train_annotation_path) as f:
        train_lines = f.readlines()
    with open(val_annotation_path) as f:
        val_lines = f.readlines()
    num_train = len(train_lines)
    num_val = len(val_lines)

    # The model's single output *is* the loss (yolo_loss Lambda layer), so the
    # Keras "loss function" just passes the prediction through.
    def passthrough_loss(y_true, y_pred):
        return y_pred

    if Freeze_Train:
        batch_size = 4
        model.compile(optimizer=Adam(1e-3), loss={"yolo_loss": passthrough_loss})
        print("Train on {} samples, val on {} samples, with batch size {}.".format(num_train, num_val, batch_size))
        model.fit(
            data_generator_wrapper(train_lines, batch_size, input_shape, anchors, num_classes),
            steps_per_epoch=max(1, num_train // batch_size),
            validation_data=data_generator_wrapper(val_lines, batch_size, input_shape, anchors, num_classes),
            validation_steps=max(1, num_val // batch_size),
            epochs=50,
            initial_epoch=0,
            callbacks=[logging, checkpoint, loss_history],
        )
        model.save_weights(weights_dir + 'trained_weights_stage_1.h5')

    for layer in model.layers:
        layer.trainable = True
    # Recompile to apply the trainable change. The learning rate is passed
    # positionally like in stage 1 (the `lr=` keyword is deprecated).
    model.compile(optimizer=Adam(1e-4), loss={'yolo_loss': passthrough_loss})
    print('Unfreeze all of the layers.')

    batch_size = 4  # note that more GPU memory is required after unfreezing the body
    print('Train on {} samples, val on {} samples, with batch size {}.'.format(num_train, num_val, batch_size))
    # Model.fit accepts generators directly; fit_generator is deprecated and
    # stage 1 already uses fit.
    model.fit(
        data_generator_wrapper(train_lines, batch_size, input_shape, anchors, num_classes),
        steps_per_epoch=max(1, num_train // batch_size),
        validation_data=data_generator_wrapper(val_lines, batch_size, input_shape, anchors, num_classes),
        validation_steps=max(1, num_val // batch_size),
        epochs=100,
        initial_epoch=50,
        callbacks=[logging, checkpoint, reduce_lr, early_stopping, loss_history],
    )
    # Save next to the stage-1 weights and checkpoints (was log_dir).
    model.save_weights(weights_dir + 'trained_weights_final.h5')
if __name__ == '__main__':
    # Script entry point: run the training routine.
    train()
2.4 util.py
from yolov3.decode import preprocess_true_boxes,yolo_loss
from yolov3.nets import yolo_body, tiny_yolo_body
from yolov3.utils import get_random_data
import numpy as np
import tensorflow.keras.backend as K
from tensorflow.keras.layers import Input, Lambda
from tensorflow.keras.models import Model
import datetime
from tensorflow import keras
import os
def get_classes(classes_path):
    """Read class names from a text file.

    Every line of the file becomes one entry, with surrounding whitespace
    stripped; list order is file order.
    """
    with open(classes_path) as handle:
        return [raw_line.strip() for raw_line in handle.readlines()]
def get_anchors(anchors_path):
    """Read anchors from the first line of a text file.

    The line holds comma-separated numbers; they are parsed as floats and
    returned as an array of shape (N, 2). Later lines are ignored.
    """
    with open(anchors_path) as handle:
        first_line = handle.readline()
    values = np.array([float(token) for token in first_line.split(',')])
    return values.reshape(-1, 2)
def create_model(input_shape, anchors, num_classes, load_pretrained=True, freeze_body=2,
                 weights_path='../model_data/yolov3.h5'):
    """Create the YOLOv3 training model whose single output is the loss.

    Args:
        input_shape: (height, width) of the network input.
        anchors: array of anchors; ``len(anchors) // 3`` anchors per scale.
        num_classes: number of object classes.
        load_pretrained: load ``weights_path`` into the body (by layer name,
            skipping mismatches) when true.
        freeze_body: 1 freezes the first 185 layers, 2 freezes all but the
            last 3 layers; any other value freezes nothing. Only applied
            when ``load_pretrained`` is true.
        weights_path: path of the pre-trained weights, either a string or a
            tuple/list whose first element is the path.

    Returns:
        A Keras ``Model`` taking ``[image, *y_true]`` and producing the
        ``yolo_loss`` Lambda output.
    """
    K.clear_session()  # get a new session
    image_input = Input(shape=(None, None, 3))
    h, w = input_shape
    num_anchors = len(anchors)

    # One ground-truth input per output scale (strides 32, 16, 8).
    y_true = [Input(shape=(h // {0: 32, 1: 16, 2: 8}[l], w // {0: 32, 1: 16, 2: 8}[l],
                           num_anchors // 3, num_classes + 5)) for l in range(3)]

    model_body = yolo_body(image_input, num_anchors // 3, num_classes)
    print('Create YOLOv3 model with {} anchors and {} classes.'.format(num_anchors, num_classes))

    if load_pretrained:
        # Unwrap only real sequences. Indexing unconditionally turned a plain
        # string path (including this function's own default) into its first
        # character, '.'.
        if isinstance(weights_path, (tuple, list)):
            weights_path = weights_path[0]
        model_body.load_weights(weights_path, by_name=True, skip_mismatch=True)
        print('Load weights {}.'.format(weights_path))
        if freeze_body in [1, 2]:
            # Freeze darknet53 body or freeze all but 3 output layers.
            num = (185, len(model_body.layers) - 3)[freeze_body - 1]
            for i in range(num):
                model_body.layers[i].trainable = False
            print('Freeze the first {} layers of total {} layers.'.format(num, len(model_body.layers)))

    model_loss = Lambda(yolo_loss, output_shape=(1,), name='yolo_loss',
                        arguments={'anchors': anchors, 'num_classes': num_classes, 'ignore_thresh': 0.5})(
        [*model_body.output, *y_true])
    model = Model([model_body.input, *y_true], model_loss)
    return model
def create_tiny_model(input_shape, anchors, num_classes, load_pretrained=True, freeze_body=2,
                      weights_path='../model_data/yolov3-tiny.h5'):
    """Create the Tiny YOLOv3 training model whose single output is the loss.

    Mirrors ``create_model`` for the two-scale tiny network:
    ``len(anchors) // 2`` anchors per scale, ground-truth inputs at strides
    32 and 16, and ``ignore_thresh`` 0.7 for the loss layer.
    """
    K.clear_session()  # get a new session
    image_input = Input(shape=(None, None, 3))
    h, w = input_shape
    num_anchors = len(anchors)
    anchors_per_scale = num_anchors // 2

    stride_of = {0: 32, 1: 16}
    y_true = []
    for scale_idx in range(2):
        stride = stride_of[scale_idx]
        y_true.append(Input(shape=(h // stride, w // stride,
                                   anchors_per_scale, num_classes + 5)))

    model_body = tiny_yolo_body(image_input, anchors_per_scale, num_classes)
    print('Create Tiny YOLOv3 model with {} anchors and {} classes.'.format(num_anchors, num_classes))

    if load_pretrained:
        model_body.load_weights(weights_path, by_name=True, skip_mismatch=True)
        print('Load weights {}.'.format(weights_path))
        if freeze_body in [1, 2]:
            # Freeze the darknet body or freeze all but 2 output layers.
            total_layers = len(model_body.layers)
            num = (20, total_layers - 2)[freeze_body - 1]
            for layer in model_body.layers[:max(num, 0)]:
                layer.trainable = False
            print('Freeze the first {} layers of total {} layers.'.format(num, len(model_body.layers)))

    loss_layer = Lambda(yolo_loss, output_shape=(1,), name='yolo_loss',
                        arguments={'anchors': anchors, 'num_classes': num_classes, 'ignore_thresh': 0.7})
    model_loss = loss_layer([*model_body.output, *y_true])
    return Model([model_body.input, *y_true], model_loss)
def data_generator(annotation_lines, batch_size, input_shape, anchors, num_classes):
    """Endlessly yield training batches built from annotation lines.

    Walks ``annotation_lines`` cyclically, reshuffling the list in place each
    time the cursor wraps to the start. Every sample is augmented with
    ``get_random_data(..., random=True)``. Each yielded item is
    ``([images, *y_true], zeros(batch_size))``; the zeros are a dummy target
    array of length ``batch_size``.
    """
    total = len(annotation_lines)
    cursor = 0
    while True:
        images, boxes = [], []
        for _ in range(batch_size):
            if cursor == 0:
                np.random.shuffle(annotation_lines)
            sample_image, sample_boxes = get_random_data(
                annotation_lines[cursor], input_shape, random=True)
            images.append(sample_image)
            boxes.append(sample_boxes)
            cursor = (cursor + 1) % total
        image_batch = np.array(images)
        box_batch = np.array(boxes)
        targets = preprocess_true_boxes(box_batch, input_shape, anchors, num_classes)
        yield [image_batch, *targets], np.zeros(batch_size)
def data_generator_wrapper(annotation_lines, batch_size, input_shape, anchors, num_classes):
    """Return a ``data_generator``, or None when there is nothing to generate.

    None is returned for an empty ``annotation_lines`` or a non-positive
    ``batch_size``.
    """
    sample_count = len(annotation_lines)
    if sample_count == 0 or batch_size <= 0:
        return None
    return data_generator(annotation_lines, batch_size, input_shape, anchors, num_classes)
class LossHistory(keras.callbacks.Callback):
    """Keras callback that prepares a timestamped directory for loss records.

    The part of the class visible here only sets up state: it creates
    ``<log_dir>/loss_<YYYY_mm_dd_HH_MM_SS>`` if missing and starts two empty
    lists, ``losses`` and ``val_loss``.
    """

    def __init__(self, log_dir):
        super(LossHistory, self).__init__()
        self.log_dir = log_dir
        self.time_str = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
        self.save_path = os.path.join(self.log_dir, "loss_" + str(self.time_str))
        self.losses = []
        self.val_loss = []
        already_there = os.path.exists(self.save_path)
        if not already_there:
            os.makedirs(self.save_path)
2.5 decode.py
from tensorflow.keras import backend as K
import tensorflow as tf
import numpy as np
def yolo_head(feats, anchors, num_classes, input_shape, calc_loss=False):
    """Decode one output layer's raw features into box parameters.

    Args:
        feats: raw feature tensor of one output layer; it is reshaped to
            (batch, grid_h, grid_w, num_anchors, num_classes + 5).
        anchors: the anchors (width, height) used by this layer.
        num_classes: number of classes.
        input_shape: network input size as (height, width).
        calc_loss: when equal to True, return the values the loss needs.

    Returns:
        ``(grid, feats, box_xy, box_wh)`` when ``calc_loss`` is True, else
        ``(box_xy, box_wh, box_confidence, box_class_probs)``. ``box_xy`` and
        ``box_wh`` are divided by the grid size and the input size
        respectively.
    """
    anchor_count = len(anchors)
    # Shaped (1, 1, 1, anchor_count, 2) so it broadcasts over batch and grid.
    anchors_tensor = K.reshape(K.constant(anchors), [1, 1, 1, anchor_count, 2])

    grid_shape = K.shape(feats)[1:3]  # height, width
    rows = grid_shape[0]
    cols = grid_shape[1]
    row_ids = K.reshape(K.arange(0, stop=rows), [-1, 1, 1, 1])
    col_ids = K.reshape(K.arange(0, stop=cols), [1, -1, 1, 1])
    grid_y = K.tile(row_ids, [1, cols, 1, 1])
    grid_x = K.tile(col_ids, [rows, 1, 1, 1])
    # Last axis holds (x, y) cell offsets.
    grid = K.cast(K.concatenate([grid_x, grid_y]), K.dtype(feats))

    feats = K.reshape(feats, [-1, rows, cols, anchor_count, num_classes + 5])
    feat_dtype = K.dtype(feats)

    # Adjust predictions to each spatial grid point and anchor size.
    grid_wh = K.cast(grid_shape[..., ::-1], feat_dtype)
    input_wh = K.cast(input_shape[..., ::-1], feat_dtype)
    box_xy = (K.sigmoid(feats[..., :2]) + grid) / grid_wh
    box_wh = K.exp(feats[..., 2:4]) * anchors_tensor / input_wh
    box_confidence = K.sigmoid(feats[..., 4:5])
    box_class_probs = K.sigmoid(feats[..., 5:])

    if calc_loss == True:
        return grid, feats, box_xy, box_wh
    return box_xy, box_wh, box_confidence, box_class_probs
def yolo_correct_boxes(box_xy, box_wh, input_shape, image_shape):
    """Map boxes from the letterboxed network input back to the source image.

    ``input_shape`` and ``image_shape`` are (height, width). The result's last
    axis is (y_min, x_min, y_max, x_max), scaled by ``image_shape``.
    """
    centers_yx = box_xy[..., ::-1]
    sizes_hw = box_wh[..., ::-1]
    input_shape = K.cast(input_shape, K.dtype(centers_yx))
    image_shape = K.cast(image_shape, K.dtype(centers_yx))

    # Size of the resized image inside the padded input, and the relative
    # padding on each side.
    fitted_shape = K.round(image_shape * K.min(input_shape / image_shape))
    offset = (input_shape - fitted_shape) / 2. / input_shape
    scale = input_shape / fitted_shape

    centers_yx = (centers_yx - offset) * scale
    sizes_hw = sizes_hw * scale
    half_hw = sizes_hw / 2.
    mins_yx = centers_yx - half_hw
    maxes_yx = centers_yx + half_hw

    boxes = K.concatenate([
        mins_yx[..., 0:1],   # y_min
        mins_yx[..., 1:2],   # x_min
        maxes_yx[..., 0:1],  # y_max
        maxes_yx[..., 1:2],  # x_max
    ])
    # Scale boxes back to original image shape.
    return boxes * K.concatenate([image_shape, image_shape])
def yolo_boxes_and_scores(feats, anchors, num_classes, input_shape, image_shape):
    """Turn one output layer into flat box and per-class score tensors.

    Returns ``(boxes, box_scores)`` reshaped to (-1, 4) and (-1, num_classes);
    each score is objectness confidence times class probability.
    """
    decoded = yolo_head(feats, anchors, num_classes, input_shape)
    box_xy, box_wh, box_confidence, box_class_probs = decoded
    corrected = yolo_correct_boxes(box_xy, box_wh, input_shape, image_shape)
    flat_boxes = K.reshape(corrected, [-1, 4])
    flat_scores = K.reshape(box_confidence * box_class_probs, [-1, num_classes])
    return flat_boxes, flat_scores
def yolo_eval(yolo_outputs,
              anchors,
              num_classes,
              image_shape,
              max_boxes=20,
              score_threshold=.6,
              iou_threshold=.5):
    """Evaluate the YOLO outputs and return the filtered detections.

    Boxes from every output layer are pooled. For each class, boxes scoring
    below ``score_threshold`` are dropped and non-max suppression keeps at
    most ``max_boxes`` of the rest.

    Returns:
        ``(boxes, scores, classes)`` tensors concatenated over all classes.
    """
    layer_count = len(yolo_outputs)
    if layer_count == 3:
        anchor_mask = [[6, 7, 8], [3, 4, 5], [0, 1, 2]]
    else:
        anchor_mask = [[3, 4, 5], [1, 2, 3]]  # default setting
    # The first output has stride 32, so its grid times 32 is the input size.
    input_shape = K.shape(yolo_outputs[0])[1:3] * 32

    layer_boxes = []
    layer_scores = []
    for layer_idx in range(layer_count):
        one_boxes, one_scores = yolo_boxes_and_scores(
            yolo_outputs[layer_idx], anchors[anchor_mask[layer_idx]],
            num_classes, input_shape, image_shape)
        layer_boxes.append(one_boxes)
        layer_scores.append(one_scores)
    boxes = K.concatenate(layer_boxes, axis=0)
    box_scores = K.concatenate(layer_scores, axis=0)

    keep = box_scores >= score_threshold
    max_boxes_tensor = K.constant(max_boxes, dtype='int32')
    kept_boxes, kept_scores, kept_classes = [], [], []
    for class_id in range(num_classes):
        # TODO: use keras backend instead of tf.
        class_keep = keep[:, class_id]
        candidates = tf.boolean_mask(boxes, class_keep)
        candidate_scores = tf.boolean_mask(box_scores[:, class_id], class_keep)
        selected = tf.image.non_max_suppression(
            candidates, candidate_scores, max_boxes_tensor, iou_threshold=iou_threshold)
        candidates = K.gather(candidates, selected)
        candidate_scores = K.gather(candidate_scores, selected)
        kept_boxes.append(candidates)
        kept_scores.append(candidate_scores)
        kept_classes.append(K.ones_like(candidate_scores, 'int32') * class_id)

    boxes_ = K.concatenate(kept_boxes, axis=0)
    scores_ = K.concatenate(kept_scores, axis=0)
    classes_ = K.concatenate(kept_classes, axis=0)
    return boxes_, scores_, classes_
def preprocess_true_boxes(true_boxes, input_shape, anchors, num_classes):
    """Convert ground-truth boxes into per-scale training targets.

    Parameters
    ----------
    true_boxes: array, shape=(m, T, 5)
        Absolute x_min, y_min, x_max, y_max, class_id relative to input_shape.
    input_shape: array-like, hw, multiples of 32
    anchors: array, shape=(N, 2), wh
    num_classes: integer

    Returns
    -------
    y_true: list of array, shape like yolo_outputs, xywh are relative values
    """
    assert (true_boxes[..., 4]<num_classes).all(), 'class id must be less than num_classes'
    layer_count = len(anchors) // 3  # default setting
    if layer_count == 3:
        anchor_mask = [[6, 7, 8], [3, 4, 5], [0, 1, 2]]
    else:
        anchor_mask = [[3, 4, 5], [1, 2, 3]]

    true_boxes = np.array(true_boxes, dtype='float32')
    input_shape = np.array(input_shape, dtype='int32')
    input_wh = input_shape[::-1]

    # Corners -> (center, size), then normalise by the input size.
    boxes_xy = (true_boxes[..., 0:2] + true_boxes[..., 2:4]) // 2
    boxes_wh = true_boxes[..., 2:4] - true_boxes[..., 0:2]
    true_boxes[..., 0:2] = boxes_xy / input_wh
    true_boxes[..., 2:4] = boxes_wh / input_wh

    batch = true_boxes.shape[0]
    stride_of = {0: 32, 1: 16, 2: 8}
    grid_shapes = [input_shape // stride_of[layer] for layer in range(layer_count)]
    y_true = [
        np.zeros((batch, grid[0], grid[1], len(anchor_mask[layer]), 5 + num_classes),
                 dtype='float32')
        for layer, grid in enumerate(grid_shapes)
    ]

    # Expand dim to apply broadcasting.
    anchors = np.expand_dims(anchors, 0)
    anchor_maxes = anchors / 2.
    anchor_mins = -anchor_maxes
    valid_mask = boxes_wh[..., 0] > 0

    for b in range(batch):
        # Discard zero rows.
        wh = boxes_wh[b, valid_mask[b]]
        if len(wh) == 0:
            continue
        # Expand dim to apply broadcasting.
        wh = np.expand_dims(wh, -2)
        box_maxes = wh / 2.
        box_mins = -box_maxes

        # IoU of every box with every anchor, both centred on the origin.
        overlap_mins = np.maximum(box_mins, anchor_mins)
        overlap_maxes = np.minimum(box_maxes, anchor_maxes)
        overlap_wh = np.maximum(overlap_maxes - overlap_mins, 0.)
        overlap_area = overlap_wh[..., 0] * overlap_wh[..., 1]
        box_area = wh[..., 0] * wh[..., 1]
        anchor_area = anchors[..., 0] * anchors[..., 1]
        iou = overlap_area / (box_area + anchor_area - overlap_area)

        # Find best anchor for each true box
        best_anchor = np.argmax(iou, axis=-1)

        for t, n in enumerate(best_anchor):
            for layer in range(layer_count):
                if n not in anchor_mask[layer]:
                    continue
                col = np.floor(true_boxes[b, t, 0] * grid_shapes[layer][1]).astype('int32')
                row = np.floor(true_boxes[b, t, 1] * grid_shapes[layer][0]).astype('int32')
                slot = anchor_mask[layer].index(n)
                class_id = true_boxes[b, t, 4].astype('int32')
                y_true[layer][b, row, col, slot, 0:4] = true_boxes[b, t, 0:4]
                y_true[layer][b, row, col, slot, 4] = 1
                y_true[layer][b, row, col, slot, 5 + class_id] = 1

    return y_true
def box_iou(b1, b2):
    """Return the pairwise IoU tensor of two sets of boxes.

    Parameters
    ----------
    b1: tensor, shape=(i1,...,iN, 4), xywh
    b2: tensor, shape=(j, 4), xywh

    Returns
    -------
    iou: tensor, shape=(i1,...,iN, j)
    """
    # Add a trailing "other box" axis to b1 and a leading one to b2 so the
    # two sets broadcast against each other.
    first = K.expand_dims(b1, -2)
    second = K.expand_dims(b2, 0)

    first_xy, first_wh = first[..., :2], first[..., 2:4]
    first_half = first_wh / 2.
    first_mins = first_xy - first_half
    first_maxes = first_xy + first_half

    second_xy, second_wh = second[..., :2], second[..., 2:4]
    second_half = second_wh / 2.
    second_mins = second_xy - second_half
    second_maxes = second_xy + second_half

    overlap_wh = K.maximum(
        K.minimum(first_maxes, second_maxes) - K.maximum(first_mins, second_mins), 0.)
    overlap_area = overlap_wh[..., 0] * overlap_wh[..., 1]
    first_area = first_wh[..., 0] * first_wh[..., 1]
    second_area = second_wh[..., 0] * second_wh[..., 1]
    return overlap_area / (first_area + second_area - overlap_area)
def yolo_loss(args, anchors, num_classes, ignore_thresh=.5, print_loss=False):
    """Return the yolo_loss tensor.

    Parameters
    ----------
    args: list of tensors, the model outputs followed by the y_true tensors
        yolo_outputs: the output of yolo_body or tiny_yolo_body
        y_true: the output of preprocess_true_boxes
    anchors: array, shape=(N, 2), wh
    num_classes: integer
    ignore_thresh: float, the iou threshold whether to ignore object confidence loss
    print_loss: bool, print the running loss and its components for every layer

    Returns
    -------
    loss: tensor, the sum over all layers of the xy, wh, confidence and class
        losses, each divided by the batch size
    """
    num_layers = len(anchors)//3  # default setting
    yolo_outputs = args[:num_layers]
    y_true = args[num_layers:]
    anchor_mask = [[6,7,8], [3,4,5], [0,1,2]] if num_layers==3 else [[3,4,5], [1,2,3]]
    input_shape = K.cast(K.shape(yolo_outputs[0])[1:3] * 32, K.dtype(y_true[0]))
    grid_shapes = [K.cast(K.shape(yolo_outputs[l])[1:3], K.dtype(y_true[0])) for l in range(num_layers)]
    loss = 0
    m = K.shape(yolo_outputs[0])[0]  # batch size, as a tensor
    mf = K.cast(m, K.dtype(yolo_outputs[0]))

    for l in range(num_layers):
        object_mask = y_true[l][..., 4:5]
        true_class_probs = y_true[l][..., 5:]

        grid, raw_pred, pred_xy, pred_wh = yolo_head(yolo_outputs[l],
            anchors[anchor_mask[l]], num_classes, input_shape, calc_loss=True)
        pred_box = K.concatenate([pred_xy, pred_wh])

        # Darknet raw box to calculate loss.
        raw_true_xy = y_true[l][..., :2]*grid_shapes[l][::-1] - grid
        raw_true_wh = K.log(y_true[l][..., 2:4] / anchors[anchor_mask[l]] * input_shape[::-1])
        raw_true_wh = K.switch(object_mask, raw_true_wh, K.zeros_like(raw_true_wh))  # avoid log(0)=-inf
        # Weight small boxes more: 2 - w*h with w, h relative to the input.
        box_loss_scale = 2 - y_true[l][...,2:3]*y_true[l][...,3:4]

        # Find ignore mask, iterate over each of batch.
        ignore_mask = tf.TensorArray(K.dtype(y_true[0]), size=1, dynamic_size=True)
        object_mask_bool = K.cast(object_mask, 'bool')

        def loop_body(b, ignore_mask):
            # A prediction is kept as a negative only if its best IoU with
            # the ground-truth boxes of sample b is below ignore_thresh.
            true_box = tf.boolean_mask(y_true[l][b,...,0:4], object_mask_bool[b,...,0])
            iou = box_iou(pred_box[b], true_box)
            best_iou = K.max(iou, axis=-1)
            ignore_mask = ignore_mask.write(b, K.cast(best_iou<ignore_thresh, K.dtype(true_box)))
            return b+1, ignore_mask

        _, ignore_mask = tf.while_loop(lambda b,*args: b<m, loop_body, [0, ignore_mask])
        ignore_mask = ignore_mask.stack()
        ignore_mask = K.expand_dims(ignore_mask, -1)

        # K.binary_crossentropy is helpful to avoid exp overflow.
        xy_loss = object_mask * box_loss_scale * K.binary_crossentropy(raw_true_xy, raw_pred[...,0:2], from_logits=True)
        wh_loss = object_mask * box_loss_scale * 0.5 * K.square(raw_true_wh-raw_pred[...,2:4])
        confidence_loss = object_mask * K.binary_crossentropy(object_mask, raw_pred[...,4:5], from_logits=True)+ \
            (1-object_mask) * K.binary_crossentropy(object_mask, raw_pred[...,4:5], from_logits=True) * ignore_mask
        class_loss = object_mask * K.binary_crossentropy(true_class_probs, raw_pred[...,5:], from_logits=True)

        xy_loss = K.sum(xy_loss) / mf
        wh_loss = K.sum(wh_loss) / mf
        confidence_loss = K.sum(confidence_loss) / mf
        class_loss = K.sum(class_loss) / mf
        loss += xy_loss + wh_loss + confidence_loss + class_loss
        if print_loss:
            # tf.print takes its values positionally, rejects a `message`
            # keyword and returns a print op (None when eager), not the
            # tensor. So it must not be assigned to `loss`.
            print_op = tf.print('loss: ', loss, xy_loss, wh_loss, confidence_loss,
                                class_loss, K.sum(ignore_mask))
            if print_op is not None:
                # Graph mode: tie the print to the loss so it actually runs.
                with tf.control_dependencies([print_op]):
                    loss = tf.identity(loss)
    return loss
2.6 nets.py
from tensorflow.keras.regularizers import l2
from tensorflow.keras.layers import (Conv2D,BatchNormalization,LeakyReLU,ZeroPadding2D,
Add,UpSampling2D,Concatenate,MaxPooling2D)
from yolov3.utils import compose
from tensorflow.keras.models import Model
def DarknetConv2D(*args, **kwargs):
    """Build a Conv2D with Darknet defaults that the caller may override.

    Defaults: L2 kernel regularisation (5e-4) and 'same' padding, or 'valid'
    padding when ``strides`` is exactly ``(2, 2)``.
    """
    default_padding = 'valid' if kwargs.get('strides') == (2, 2) else 'same'
    conv_kwargs = {
        'kernel_regularizer': l2(5e-4),
        'padding': default_padding,
    }
    conv_kwargs.update(kwargs)
    return Conv2D(*args, **conv_kwargs)
def DarknetConv2D_BN_Leaky(*args, **kwargs):
    """Return DarknetConv2D -> BatchNormalization -> LeakyReLU(0.1) as one callable.

    The convolution has no bias unless the caller passes ``use_bias``.
    """
    conv_kwargs = dict(use_bias=False)
    conv_kwargs.update(kwargs)
    conv = DarknetConv2D(*args, **conv_kwargs)
    norm = BatchNormalization()
    activation = LeakyReLU(alpha=0.1)
    return compose(conv, norm, activation)
def resblock_body(x, num_filters, num_blocks):
    """Downsample by 2, then apply ``num_blocks`` residual units.

    Downsampling is a top/left zero pad followed by a stride-2 3x3 conv. Each
    residual unit is a 1x1 conv (half the filters) and a 3x3 conv, added back
    to its input.
    """
    padded = ZeroPadding2D(((1, 0), (1, 0)))(x)
    out = DarknetConv2D_BN_Leaky(num_filters, (3, 3), strides=(2, 2))(padded)
    for _ in range(num_blocks):
        bottleneck = compose(
            DarknetConv2D_BN_Leaky(num_filters // 2, (1, 1)),
            DarknetConv2D_BN_Leaky(num_filters, (3, 3)),
        )
        residual = bottleneck(out)
        out = Add()([out, residual])
    return out
def darknet_body(x):
    """Build the Darknet backbone and return three feature maps.

    Returns ``(x, x1, x2)``: the outputs of the 1024-, 512- and 256-filter
    stages. Each ``resblock_body`` stage halves the resolution.
    """
    stem = DarknetConv2D_BN_Leaky(32, (3, 3))(x)
    stage = stem
    for filters, blocks in ((64, 1), (128, 2)):
        stage = resblock_body(stage, filters, blocks)
    feat_256 = resblock_body(stage, 256, 8)
    feat_512 = resblock_body(feat_256, 512, 8)
    feat_1024 = resblock_body(feat_512, 1024, 4)
    return feat_1024, feat_512, feat_256
def make_last_layers(x, num_filters, out_filters):
    """Build one detection head: five conv blocks plus the prediction branch.

    Args:
        x: input feature map.
        num_filters: width of the 1x1 convs; the 3x3 convs use twice as many.
        out_filters: channels of the prediction, i.e.
            ``num_anchors * (num_classes + 5)`` at the call sites.

    Returns:
        ``(x, y)``: the feature map after the five conv blocks (fed to the
        next, finer scale) and the raw prediction tensor.
    """
    x = compose(
        DarknetConv2D_BN_Leaky(num_filters, (1, 1)),
        DarknetConv2D_BN_Leaky(num_filters * 2, (3, 3)),
        DarknetConv2D_BN_Leaky(num_filters, (1, 1)),
        DarknetConv2D_BN_Leaky(num_filters * 2, (3, 3)),
        DarknetConv2D_BN_Leaky(num_filters, (1, 1)))(x)
    # The prediction layer must be a plain linear convolution with bias, as in
    # tiny_yolo_body. yolo_head/yolo_loss treat its output as raw logits, and
    # BatchNorm + LeakyReLU would distort them.
    # NOTE(review): weight files saved from the previous BN+LeakyReLU head do
    # not match this layer; retrain or reload with skip_mismatch.
    y = compose(
        DarknetConv2D_BN_Leaky(num_filters * 2, (3, 3)),
        DarknetConv2D(out_filters, (1, 1)))(x)
    return x, y
def yolo_body(inputs, num_anchors, num_classes):
    """Create the YOLOv3 model body.

    Args:
        inputs: Keras input tensor for the image.
        num_anchors: anchors per output scale.
        num_classes: number of classes.

    Returns:
        A ``Model`` mapping ``inputs`` to ``[y1, y2, y3]``, the raw
        predictions from the coarsest to the finest scale, each with
        ``num_anchors * (num_classes + 5)`` channels.
    """
    # Use the backbone tensors directly. Wrapping each one in a throwaway
    # Model only to read `.output` back returned the very same tensors.
    feat_1024, feat_512, feat_256 = darknet_body(inputs)
    out_filters = num_anchors * (num_classes + 5)

    x, y1 = make_last_layers(feat_1024, 512, out_filters)

    # Upsample and merge with the next finer backbone feature map.
    x = compose(DarknetConv2D_BN_Leaky(256, (1, 1)),
                UpSampling2D(2))(x)
    x = Concatenate()([x, feat_512])
    x, y2 = make_last_layers(x, 256, out_filters)

    x = compose(DarknetConv2D_BN_Leaky(128, (1, 1)),
                UpSampling2D(2))(x)
    x = Concatenate()([x, feat_256])
    x, y3 = make_last_layers(x, 128, out_filters)

    return Model(inputs, [y1, y2, y3])
def tiny_yolo_body(inputs, num_anchors, num_classes):
    """Create the Tiny YOLOv3 model body.

    Returns a ``Model`` mapping ``inputs`` to ``[y1, y2]``, two raw prediction
    tensors with ``num_anchors * (num_classes + 5)`` channels each. ``y2`` is
    computed at twice the resolution of ``y1``.
    """
    out_filters = num_anchors * (num_classes + 5)

    def halving_pool():
        return MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same')

    shallow_stack = compose(
        DarknetConv2D_BN_Leaky(16, (3, 3)),
        halving_pool(),
        DarknetConv2D_BN_Leaky(32, (3, 3)),
        halving_pool(),
        DarknetConv2D_BN_Leaky(64, (3, 3)),
        halving_pool(),
        DarknetConv2D_BN_Leaky(128, (3, 3)),
        halving_pool(),
        DarknetConv2D_BN_Leaky(256, (3, 3)))
    shallow = shallow_stack(inputs)

    deep_stack = compose(
        halving_pool(),
        DarknetConv2D_BN_Leaky(512, (3, 3)),
        MaxPooling2D(pool_size=(2, 2), strides=(1, 1), padding='same'),
        DarknetConv2D_BN_Leaky(1024, (3, 3)),
        DarknetConv2D_BN_Leaky(256, (1, 1)))
    deep = deep_stack(shallow)

    coarse_head = compose(
        DarknetConv2D_BN_Leaky(512, (3, 3)),
        DarknetConv2D(out_filters, (1, 1)))
    y1 = coarse_head(deep)

    upsampled = compose(
        DarknetConv2D_BN_Leaky(128, (1, 1)),
        UpSampling2D(2))(deep)

    fine_head = compose(
        Concatenate(),
        DarknetConv2D_BN_Leaky(256, (3, 3)),
        DarknetConv2D(out_filters, (1, 1)))
    y2 = fine_head([upsampled, shallow])

    return Model(inputs, [y1, y2])
2.7 utils.py
from functools import reduce
from PIL import Image
import numpy as np
from matplotlib.colors import rgb_to_hsv, hsv_to_rgb
def compose(*funcs):
    """Compose callables left to right.

    ``compose(f, g, h)(*a, **kw)`` equals ``h(g(f(*a, **kw)))``. Only the
    first callable receives the original arguments. A single callable is
    returned unchanged.

    Raises:
        ValueError: if no callables are given.
    """
    if not funcs:
        raise ValueError("Composition of empty sequence not supported.")

    def chain(inner, outer):
        return lambda *a, **kw: outer(inner(*a, **kw))

    return reduce(chain, funcs)
def letterbox_image(image, size):
    """Resize ``image`` to fit ``size`` without changing its aspect ratio.

    The image is scaled (bicubic) by the largest factor that fits inside
    ``size`` = (width, height), then pasted centred on a grey (128, 128, 128)
    RGB canvas of exactly ``size``.
    """
    src_w, src_h = image.size
    dst_w, dst_h = size
    ratio = min(dst_w / src_w, dst_h / src_h)
    fit_w = int(src_w * ratio)
    fit_h = int(src_h * ratio)

    resized = image.resize((fit_w, fit_h), Image.BICUBIC)
    canvas = Image.new('RGB', size, (128, 128, 128))
    top_left = ((dst_w - fit_w) // 2, (dst_h - fit_h) // 2)
    canvas.paste(resized, top_left)
    return canvas
def get_random_data(annotation_line, input_shape, random=True, max_boxes=20, jitter=.3, hue=.1, sat=1.5, val=1.5, proc_img=True):
    '''Load one annotated sample and preprocess it, optionally with random augmentation.

    annotation_line is whitespace-separated: an image path followed by boxes,
    each written as five comma-separated integers. The first four are treated
    as x_min, y_min, x_max, y_max; the fifth is carried through unchanged
    (presumably the class id -- confirm against the annotation writer).
    input_shape is (height, width).

    With random=False the image is letterboxed onto a grey canvas (only when
    proc_img is true; otherwise image_data is 0) and the boxes are shifted and
    scaled to match. With random=True the image gets a random aspect ratio,
    scale, position, horizontal flip and HSV distortion, and boxes that end up
    no larger than 1 px in either direction are dropped.

    Returns (image_data, box_data): the image as an array scaled to 0..1 and a
    (max_boxes, 5) array holding the kept boxes, zero-padded.
    '''
    line = annotation_line.split()
    image = Image.open(line[0])
    iw, ih = image.size
    h, w = input_shape
    box = np.array([np.array(list(map(int,box.split(',')))) for box in line[1:]])

    if not random:
        # resize image
        scale = min(w/iw, h/ih)
        nw = int(iw*scale)
        nh = int(ih*scale)
        dx = (w-nw)//2
        dy = (h-nh)//2
        image_data=0
        if proc_img:
            image = image.resize((nw,nh), Image.BICUBIC)
            new_image = Image.new('RGB', (w,h), (128,128,128))
            new_image.paste(image, (dx, dy))
            image_data = np.array(new_image)/255.

        # correct boxes
        box_data = np.zeros((max_boxes,5))
        if len(box)>0:
            np.random.shuffle(box)
            if len(box)>max_boxes: box = box[:max_boxes]
            # box is an integer array, so the scaled values are truncated on assignment
            box[:, [0,2]] = box[:, [0,2]]*scale + dx
            box[:, [1,3]] = box[:, [1,3]]*scale + dy
            box_data[:len(box)] = box

        return image_data, box_data

    # resize image
    # NOTE: the order of the rand() calls below determines the result for a
    # given numpy seed; keep it unchanged.
    new_ar = w/h * rand(1-jitter,1+jitter)/rand(1-jitter,1+jitter)
    scale = rand(.25, 2)
    if new_ar < 1:
        nh = int(scale*h)
        nw = int(nh*new_ar)
    else:
        nw = int(scale*w)
        nh = int(nw/new_ar)
    image = image.resize((nw,nh), Image.BICUBIC)

    # place image
    dx = int(rand(0, w-nw))
    dy = int(rand(0, h-nh))
    new_image = Image.new('RGB', (w,h), (128,128,128))
    new_image.paste(image, (dx, dy))
    image = new_image

    # flip image or not
    flip = rand()<.5
    if flip: image = image.transpose(Image.FLIP_LEFT_RIGHT)

    # distort image
    hue = rand(-hue, hue)
    sat = rand(1, sat) if rand()<.5 else 1/rand(1, sat)
    val = rand(1, val) if rand()<.5 else 1/rand(1, val)
    x = rgb_to_hsv(np.array(image)/255.)
    x[..., 0] += hue
    # wrap hue back into [0, 1]
    x[..., 0][x[..., 0]>1] -= 1
    x[..., 0][x[..., 0]<0] += 1
    x[..., 1] *= sat
    x[..., 2] *= val
    x[x>1] = 1
    x[x<0] = 0
    image_data = hsv_to_rgb(x) # numpy array, 0 to 1

    # correct boxes
    box_data = np.zeros((max_boxes,5))
    if len(box)>0:
        np.random.shuffle(box)
        box[:, [0,2]] = box[:, [0,2]]*nw/iw + dx
        box[:, [1,3]] = box[:, [1,3]]*nh/ih + dy
        # mirror x coordinates; columns are swapped so x_min stays the smaller one
        if flip: box[:, [0,2]] = w - box[:, [2,0]]
        # clip boxes to the canvas
        box[:, 0:2][box[:, 0:2]<0] = 0
        box[:, 2][box[:, 2]>w] = w
        box[:, 3][box[:, 3]>h] = h
        box_w = box[:, 2] - box[:, 0]
        box_h = box[:, 3] - box[:, 1]
        box = box[np.logical_and(box_w>1, box_h>1)] # discard invalid box
        if len(box)>max_boxes: box = box[:max_boxes]
        box_data[:len(box)] = box

    return image_data, box_data
def rand(a=0.0, b=1.0):
    """Return one random float: NumPy's [0, 1) sample rescaled to span a..b."""
    span = b - a
    sample = np.random.rand()
    return sample * span + a
2.8 voc_annotation.py
"""
todo: 将数据集转成yolo可以训练的格式
"""
import os
import random
import xml.etree.ElementTree as ET
from train.util import get_classes
from os import getcwd
""" 训练集、验证集划分 """
# Split ratios used by generate_index():
#   trainval_percent - fraction of all annotation files sampled into trainval
#                      (the remainder is written to test.txt);
#   train_percent    - fraction of the trainval sample that goes to train
#                      (the remainder of trainval is written to val.txt).
train_percent=0.9
trainval_percent=0.9
# Root directory of the dataset; the code below reads and writes under
# <VOCdevkit_path>/VOC2021/... and <VOCdevkit_path>/VOC<year>/...
VOCdevkit_path = 'VOCdevkit'
# Class-name file handed to get_classes(); change it to your own class file.
classes_path='model_data/hy_classes.txt'
# (year, image_set) pairs; the __main__ block writes one
# '<year>_<image_set>.txt' annotation list per pair.
VOCdevkit_sets = [('2021', 'train'), ('2021', 'val')]
# Loaded at import time. convert_annotation() uses `cls in classes` and
# classes.index(cls), so this is presumably a list of class-name strings
# (get_classes lives in train.util and is not shown here - TODO confirm).
classes=get_classes(classes_path)
print("your classes: ",classes)
def convert_annotation(year, image_id, list_file):
    """Append the usable boxes of one VOC annotation file to ``list_file``.

    Parses ``<VOCdevkit_path>/VOC<year>/Annotations/<image_id>.xml`` and, for
    every ``<object>`` whose ``<name>`` is in the module-level ``classes`` and
    which is not flagged difficult, writes ``" xmin,ymin,xmax,ymax,class_id"``
    (with the leading space) to ``list_file``. No newline is written; the
    caller is responsible for terminating the line.

    Args:
        year: Value substituted into the ``VOC<year>`` directory name.
        image_id: Annotation file name without the ``.xml`` extension.
        list_file: Object with a ``write(str)`` method that receives the boxes.
    """
    xml_path = VOCdevkit_path + '/VOC%s/Annotations/%s.xml' % (year, image_id)
    # Passing the path lets ElementTree open the file in binary mode, honour
    # the encoding declared in the XML, and close the handle when done.
    root = ET.parse(xml_path).getroot()
    box_tags = ('xmin', 'ymin', 'xmax', 'ymax')
    for obj in root.iter('object'):
        # Some labelling tools omit <difficult>; a missing or empty tag
        # is treated as "not difficult" instead of raising.
        difficult = obj.findtext('difficult') or '0'
        cls = obj.find('name').text
        if cls not in classes or int(difficult) == 1:
            continue
        cls_id = classes.index(cls)
        xmlbox = obj.find('bndbox')
        # Coordinates are sometimes stored as floats ("12.0"); go through
        # float() so they are truncated to int rather than rejected.
        b = tuple(int(float(xmlbox.find(tag).text)) for tag in box_tags)
        list_file.write(" " + ",".join(str(a) for a in b) + ',' + str(cls_id))
def generate_index():
    """Split the VOC2021 annotations into trainval/train/val/test index files.

    Lists the ``.xml`` files in ``<VOCdevkit_path>/VOC2021/Annotations``,
    samples ``int(num * trainval_percent)`` of them as trainval and
    ``int(tv * train_percent)`` of those as train, then writes the file names
    (without extension, one per line) to ``trainval.txt``, ``train.txt``,
    ``val.txt`` and ``test.txt`` in ``<VOCdevkit_path>/VOC2021/ImageSets/Main``.
    Files not sampled into trainval go to ``test.txt``; trainval files not
    sampled into train go to ``val.txt``.
    """
    random.seed(0)
    print("Generate txt in ImageSets.")
    xmlfilepath = os.path.join(VOCdevkit_path, 'VOC2021/Annotations')
    saveBasePath = os.path.join(VOCdevkit_path, 'VOC2021/ImageSets/Main')

    # os.listdir() returns entries in arbitrary order; sorting makes the
    # fixed seed above actually yield the same split on every machine/run.
    total_xml = sorted(
        entry for entry in os.listdir(xmlfilepath) if entry.endswith(".xml")
    )

    num = len(total_xml)
    tv = int(num * trainval_percent)
    tr = int(tv * train_percent)
    trainval = random.sample(range(num), tv)
    train = random.sample(trainval, tr)
    # Sets give O(1) membership tests in the loop below.
    trainval_ids = set(trainval)
    train_ids = set(train)
    print("train and val size", tv)
    print("train size", tr)

    # Every split file is always written, even when its list is empty.
    splits = {'trainval.txt': [], 'test.txt': [], 'train.txt': [], 'val.txt': []}
    for i, xml_name in enumerate(total_xml):
        name = xml_name[:-4] + '\n'  # strip the ".xml" suffix
        if i in trainval_ids:
            splits['trainval.txt'].append(name)
            if i in train_ids:
                splits['train.txt'].append(name)
            else:
                splits['val.txt'].append(name)
        else:
            splits['test.txt'].append(name)

    # Create the output directory on demand instead of failing on open().
    os.makedirs(saveBasePath, exist_ok=True)
    for filename, lines in splits.items():
        # The context manager closes each file even if a write fails.
        with open(os.path.join(saveBasePath, filename), 'w') as split_file:
            split_file.writelines(lines)
    print("Generate txt in ImageSets done.")
if __name__ == '__main__':
    # Script entry point: regenerate the ImageSets index files, then write one
    # '<year>_<image_set>.txt' list per entry of VOCdevkit_sets. Each line is
    # an image path under the current working directory followed by the boxes
    # appended by convert_annotation().
    wd = getcwd()
    generate_index()
    for year, image_set in VOCdevkit_sets:
        ids_path = VOCdevkit_path + '/VOC%s/ImageSets/Main/%s.txt' % (year, image_set)
        # Read the ids inside a context manager so the handle is closed
        # instead of being left to the garbage collector.
        with open(ids_path) as ids_file:
            image_ids = ids_file.read().strip().split()
        with open('%s_%s.txt' % (year, image_set), 'w') as list_file:
            for image_id in image_ids:
                list_file.write('%s/%s/VOC%s/JPEGImages/%s.jpg' % (wd, VOCdevkit_path, year, image_id))
                convert_annotation(year, image_id, list_file)
                list_file.write('\n')
总共也就8个文件hhh。
三、训练步骤
- 标注并准备数据:按照第一节的文件结构,把 .xml 标注文件放到 VOCdevkit/VOC2021/Annotations,把 .jpg 图片放到 VOCdevkit/VOC2021/JPEGImages
- 在model_data中新建你自己的类别文件例如test_classes.txt
- 将 voc_annotation.py 中的 classes_path 修改为你自己的类别文件路径(默认是 model_data/hy_classes.txt),然后运行 voc_annotation.py,会在当前目录生成 2021_train.txt 和 2021_val.txt
- 在 train.py 中修改classes_path为你自己的test_classes.txt的路径
- 运行train.py就可以训练了
四、预测步骤
- 将 yolo.py 中的 self.model_path 修改为你训练之后保存在weights文件夹下的模型。
- 将 yolo.py 中的 self.classes_path 修改为你自己的类别路径。
- 运行 predict.py 即可开始预测。(默认是预测图片,输入图片路径就可以了)
终于写完了,如果对您有帮助,就点个赞吧,谢谢!
版权声明:本文为CSDN博主「吴天德少侠」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/sdhdsf132452/article/details/121051620
暂无评论