The general pipeline of 3D object detection
Today's popular 3D detection pipelines either encode the point cloud with pillars, extract features, and compress them to 2D, or voxelize the cloud, apply 3D sparse convolutions, extract features, and compress to 2D. Once in 2D, standard 2D detection methods can carry out the detection task. This article analyzes and compares, in detail, the classic anchor-based SSD (Single Shot Detector) and the anchor-free, center-based CenterNet approach that has recently become popular in 3D object detection, covering both the principles and the concrete implementations in the OpenPCDet framework.
Anchor based vs Center based RPN
Below are the NuScenes and Waymo real-time 3D object detection leaderboards as of 2021-10-26; the top entries all use anchor-free models.
SECOND: Voxelization, Voxel Feature Extraction, 3D Sparse Conv, Map to BEV
Taking the Waymo dataset as an example, we can trace the feature-map dimensions from the raw point cloud through voxelization, voxel feature extraction, and sparse 3D convolution:
point cloud range: [-75.2, -75.2, -2, 75.2, 75.2, 4]
voxel size: [0.1, 0.1, 0.15]
After voxelization and feature extraction, the dimensions are [40, 1504, 1504, 5].
The 5 features are x, y, z, intensity, and elongation.
After several sparse 3D convolution layers, the dimensions become [2, 188, 188, 128].
A reshape that folds z into the channel dimension compresses this to BEV, giving [188, 188, 256].
The input to the 2D backbone and RPN is therefore [188, 188, 256] (H×W×C).
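As a quick sanity check, these dimensions can be reproduced from the range and voxel size. A minimal sketch, assuming the usual SECOND settings of 8× spatial downsampling and 128 output channels in the sparse-conv stage:
import numpy as np

point_cloud_range = np.array([-75.2, -75.2, -2.0, 75.2, 75.2, 4.0])
voxel_size = np.array([0.1, 0.1, 0.15])

# voxel grid = range extent / voxel size -> 1504 x 1504 x 40 in (x, y, z)
grid_size = np.round((point_cloud_range[3:] - point_cloud_range[:3]) / voxel_size).astype(int)

# after the sparse convs (assumed): 8x downsampling in x/y, z squeezed to 2, 128 channels
nx, ny, nz, c = grid_size[0] // 8, grid_size[1] // 8, 2, 128

# map-to-BEV folds z into channels: [188, 188, 2 * 128] = [188, 188, 256]
print(grid_size, (ny, nx, nz * c))  # [1504 1504 40] (188, 188, 256)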
SECOND backbone 2d
The 2D backbone from the official SECOND paper is shown above.
Conv2D(Cout, k, s) denotes a Conv2D-BN-ReLU block
DeConv2D(Cout, k, s) denotes a DeConv2D-BN-ReLU block
3 × Conv2D(128, 3, 1(2)) (the first conv with stride 2) feed into 1 × DeConv2D(128, 3, 1)
5 × Conv2D(128, 3, 1(2)) (the first conv with stride 2) feed into 1 × DeConv2D(128, 3, 2)
5 × Conv2D(256, 3, 1(2)) (the first conv with stride 2) feed into 1 × DeConv2D(128, 3, 4)
The three upsampled outputs are then concatenated, completing the multi-scale feature extraction, and fed to the RPN head.
In the official SECOND github repo, and as the backbone_2d commonly used in OpenPCDet, this was later simplified to the structure below:
Config:
LAYER_NUMS: [5, 5]
LAYER_STRIDES: [1, 2]
NUM_FILTERS: [128, 256]
UPSAMPLE_STRIDES: [1, 2]
NUM_UPSAMPLE_FILTERS: [256, 256]
Network structure:
6 × Conv2D(128, 3, 1) (one input conv plus LAYER_NUMS = 5) feed into 1 × DeConv2D(256, 3, 1); H×W×C is [188, 188, 256]
6 × Conv2D(256, 3, 1(2)) (the first conv with stride 2) feed into 1 × DeConv2D(256, 3, 2); H×W×C is [188, 188, 256]
Concatenated together, H×W×C is [188, 188, 512]
Or, even simpler:
LAYER_NUMS: [5]
LAYER_STRIDES: [1]
NUM_FILTERS: [128]
UPSAMPLE_STRIDES: [2]
NUM_UPSAMPLE_FILTERS: [256]
Network structure:
6 × Conv2D(128, 3, 1) feed into 1 × DeConv2D(256, 3, 2); H×W×C is [376, 376, 256]
SECOND RPN
Three prediction heads are produced by 1×1 Conv2D layers: class, box_size, and direction:
Conv2D(512, 2×3, 1, 1): confidence scores for 2 anchors × 3 classes
Conv2D(512, 2×7, 1, 1): for each of the 2 anchors, the 7 position/size parameters of the box relative to the anchor: dx, dy, dz, l, w, h, a
Conv2D(512, 2×2, 1, 1): for each of the 2 anchors, a 2-bin classification of the box heading relative to the anchor
SECOND training
Anchor generation:
Anchor sizes are defined separately for each class.
At every feature-map location, 2 orthogonal anchors are placed.
This yields feature_map_size[0] × feature_map_size[1] × 2 × class_num anchors in total; for the 188×188 map with 3 classes, that is 188 × 188 × 2 × 3 = 212,064 anchors (a minimal generation sketch follows below).
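A minimal sketch of this anchor placement for a single class. The car-sized anchor (4.7, 2.1, 1.7) and the fixed anchor height of -1.0 are hypothetical values; OpenPCDet's AnchorGenerator handles this more generally:
import torch

def make_anchors(feature_map_size, pc_range, anchor_size, rotations=(0.0, 1.5708)):
    # place one anchor per rotation at every feature-map cell,
    # each encoded as (x, y, z, l, w, h, ry)
    H, W = feature_map_size
    xs = torch.linspace(pc_range[0], pc_range[3], W)
    ys = torch.linspace(pc_range[1], pc_range[4], H)
    xx = xs.view(1, W).expand(H, W)
    yy = ys.view(H, 1).expand(H, W)
    anchors = []
    for rot in rotations:
        anchors.append(torch.stack([
            xx, yy,
            torch.full_like(xx, -1.0),             # fixed anchor z (assumed)
            torch.full_like(xx, anchor_size[0]),
            torch.full_like(xx, anchor_size[1]),
            torch.full_like(xx, anchor_size[2]),
            torch.full_like(xx, rot),
        ], dim=-1))
    return torch.stack(anchors, dim=2)             # [H, W, num_rot, 7]

anchors = make_anchors((188, 188), [-75.2, -75.2, -2, 75.2, 75.2, 4], (4.7, 2.1, 1.7))
print(anchors.shape)  # [188, 188, 2, 7] -> 70,688 anchors per class, 212,064 for 3 classes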
Prediction targets:
The ground-truth 3D boxes are assigned to the anchors.
Compute the IoU between the N_anchors anchors and the N_groundtruth GT boxes; for each anchor, pick the GT box with the largest IoU. Anchors whose IoU exceeds a threshold become foreground; anchors below a (lower) threshold become background.
Foreground anchors are labeled with the class of their matched GT box.
Each foreground anchor and its matched GT box are compared to compute the required regression targets.
If the direction layer is used, a direction target is computed as well, because sin(a - b) cannot distinguish headings of 0 and π (see the sketch below).
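A small numeric illustration of this direction ambiguity, following the sin(a - b) = sin(a)cos(b) - cos(a)sin(b) identity used by add_sin_difference in the code further below:
import math
import torch

pred, gt = torch.tensor(0.0), torch.tensor(math.pi)   # opposite headings
print(torch.sin(pred - gt))   # ~0: the sine-encoded regression loss cannot tell them apart

def direction_target(angle, num_bins=2, dir_offset=0.0):
    # bin 0 covers [0, pi), bin 1 covers [pi, 2*pi), after subtracting dir_offset
    offset_rot = (angle - dir_offset) % (2 * math.pi)
    return torch.clamp((offset_rot / (2 * math.pi / num_bins)).floor().long(), 0, num_bins - 1)

print(direction_target(torch.tensor(0.1)), direction_target(torch.tensor(math.pi)))  # tensor(0) tensor(1)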
Prediction outputs:
For each anchor, the network predicts a confidence score, position and size offsets relative to the anchor, and an angle offset relative to the anchor.
Loss computation: the classification head uses a sigmoid focal loss, box regression uses a weighted smooth-L1 loss on the offsets (with the sine-difference encoding for the angle term), and the direction head uses a weighted cross-entropy loss; see build_losses in AnchorHeadTemplate below.
SECOND inference
Given the raw predictions, the boxes can be decoded using the anchor information.
For each class, a score threshold filters the results further.
NMS (non-maximum suppression) then iteratively prunes overlapping candidates, leaving the final boxes; a minimal sketch follows.
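A minimal sketch of this inference flow, with a plain greedy NMS over axis-aligned BEV boxes; the real pipeline uses class-wise thresholds and a CUDA rotated-box NMS:
import torch

def simple_nms(boxes, scores, iou_thresh=0.5):
    # greedy NMS over axis-aligned boxes [x1, y1, x2, y2] (rotated IoU in the real code)
    order = scores.argsort(descending=True)
    keep = []
    while order.numel() > 0:
        i = order[0]
        keep.append(i.item())
        if order.numel() == 1:
            break
        rest = boxes[order[1:]]
        lt = torch.maximum(boxes[i, :2], rest[:, :2])
        rb = torch.minimum(boxes[i, 2:], rest[:, 2:])
        inter = (rb - lt).clamp(min=0).prod(dim=1)
        areas = (boxes[i, 2:] - boxes[i, :2]).prod() + (rest[:, 2:] - rest[:, :2]).prod(dim=1)
        order = order[1:][inter / (areas - inter) <= iou_thresh]
    return keep

scores = torch.tensor([0.9, 0.85, 0.3])
boxes = torch.tensor([[0., 0., 4., 2.], [0.2, 0., 4.2, 2.], [10., 10., 14., 12.]])
mask = scores > 0.4                           # per-class score threshold
print(simple_nms(boxes[mask], scores[mask]))  # [0]: only the best of the two overlapping boxes survives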
Comparison of anchor-based and anchor-free detectors
Problems with anchor-based detectors
- In the 2D image plane, object boxes are aligned with the image coordinate system, and so are the placed anchors, which is why anchors work well there. In 3D space, however, box orientations are arbitrary: too few anchors cannot cover them, while too many consume excessive compute. Current anchor-based networks typically place 2 perpendicular anchors (0°, 90°) per location.
- Anchor selection is an issue: anchor sizes are usually derived from dataset statistics, and adding a new class means re-selecting and re-tuning the sizes until reasonable values are found.
- Two anchors per location also produce many overlapping boxes, so NMS is needed to post-process the predictions; at deployment time, running NMS (non-maximum suppression) during inference is itself computationally expensive.
Advantages of center-based detectors
CenterNet, proposed in 2019, is an anchor-free 2D detection method: instead of detecting with anchors, it directly predicts object centers and box sizes. Concretely:
For each class, a heatmap is predicted; picking the peaks of the heatmap gives the approximate object center locations.
Regression heads predict the box size and the offset of the center relative to the grid cell center.
This 2D detector was later brought into 3D detection models (CenterPoint, AFDet), which rank near the top of the major leaderboards.
- No anchors need to be defined, so there is no need to hand-pick anchor sizes and orientations.
- For a given class, each location yields at most one positive result, so no time-consuming NMS is needed to filter overlapping boxes.
CenterPoint backbone 2d
CenterPoint introduces little that is new in the point cloud encoding and BEV backbone; it largely follows the SECOND / PointPillars approach.
The 2D backbone from the github repo:
Config:
LAYER_NUMS: [5, 5]
LAYER_STRIDES: [1, 2]
NUM_FILTERS: [128, 256]
UPSAMPLE_STRIDES: [1, 2]
NUM_UPSAMPLE_FILTERS: [256, 256]
Network structure:
6 × Conv2D(128, 3, 1) feed into 1 × DeConv2D(256, 3, 1); H×W×C is [188, 188, 256]
6 × Conv2D(256, 3, 1(2)) (the first conv with stride 2) feed into 1 × DeConv2D(256, 3, 2); H×W×C is [188, 188, 256]
Concatenated together, H×W×C is [188, 188, 512]
CenterPoint RPN
CenterPoint's innovation lies in its RPN head, which introduces an anchor-free, center-based detector (following CenterNet).
During training, it directly predicts the following:
- A center heatmap for each class
- Box position: dx, dy, refining the center location
- Box height: z
- Box size: w, h, l
- Box orientation: sin(a) and cos(a), which determine the heading directly
The concrete network structure from the paper and github repo:
Shared conv layer: Conv2D(512, 64, 3, 1) + BN + ReLU
Detection heads: common_heads = {'hm': (3, 2), 'reg': (2, 2), 'height': (1, 2), 'dim': (3, 2), 'rot': (2, 2)}  # (output_channel, num_conv)
Taking the 'hm' head as an example: Conv2D(64, 64, 3, 1) + BN + ReLU + Conv2D(64, 3, 3, 1)
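A minimal sketch of building these heads with the channel counts above (it mirrors the structure described here rather than reproducing the repo's code exactly):
import torch
import torch.nn as nn

def make_head(in_ch, out_ch, num_conv, head_ch=64):
    # (num_conv - 1) Conv-BN-ReLU blocks, then a final 3x3 conv to out_ch,
    # e.g. Conv2D(64, 64, 3, 1) + BN + ReLU + Conv2D(64, 3, 3, 1) for 'hm'
    layers = []
    for _ in range(num_conv - 1):
        layers += [nn.Conv2d(in_ch, head_ch, 3, padding=1, bias=False),
                   nn.BatchNorm2d(head_ch), nn.ReLU()]
        in_ch = head_ch
    layers.append(nn.Conv2d(in_ch, out_ch, 3, padding=1))
    return nn.Sequential(*layers)

shared = nn.Sequential(nn.Conv2d(512, 64, 3, padding=1, bias=False),
                       nn.BatchNorm2d(64), nn.ReLU())
common_heads = {'hm': (3, 2), 'reg': (2, 2), 'height': (1, 2), 'dim': (3, 2), 'rot': (2, 2)}
heads = nn.ModuleDict({k: make_head(64, c, n) for k, (c, n) in common_heads.items()})

x = shared(torch.randn(1, 512, 188, 188))
print({k: tuple(h(x).shape) for k, h in heads.items()})  # hm: (1, 3, 188, 188), etc.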
CenterPoint training
As with the anchor-based method, the crucial step is defining the targets.
The heatmap target is built by projecting each ground-truth box into the map view as a Gaussian blob.
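A minimal sketch of splatting one such Gaussian onto a class heatmap; the radius handling is simplified compared with the gaussian_radius / draw_heatmap_gaussian utilities used in the full code at the end:
import torch

def draw_gaussian(heatmap, center, radius):
    # splat a 2D Gaussian at the integer center (x, y), keeping the element-wise
    # max so nearby objects do not overwrite each other's peaks
    d = 2 * radius + 1
    sigma = d / 6.0
    ax = torch.arange(d).float() - radius
    g = torch.exp(-(ax[None, :] ** 2 + ax[:, None] ** 2) / (2 * sigma ** 2))
    x, y = center
    H, W = heatmap.shape
    l, r = min(x, radius), min(W - x, radius + 1)   # clip the patch at the borders
    t, b = min(y, radius), min(H - y, radius + 1)
    heatmap[y - t:y + b, x - l:x + r] = torch.maximum(
        heatmap[y - t:y + b, x - l:x + r],
        g[radius - t:radius + b, radius - l:radius + r])

hm = torch.zeros(188, 188)
draw_gaussian(hm, center=(100, 60), radius=3)
print(hm[60, 100])  # tensor(1.): the peak sits exactly on the object center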
CenterPoint loss
Classification loss: focal loss
Regression loss: L1 loss
CenterPoint inference
The heatmap and box predictions are decoded into the output boxes and classes.
According to the CenterNet paper, a simple max-pooling pass is enough to isolate each object's peak, with no NMS required. In the BEV feature map, however, objects are small and several objects may fall into the same grid cell, so CenterPoint does not rely on max pooling alone and still applies NMS.
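For reference, the max-pooling peak extraction from CenterNet looks like this (a minimal sketch; CenterPoint keeps the idea but, as noted above, still runs NMS afterwards):
import torch
import torch.nn.functional as F

def heatmap_peaks(hm, k=50):
    # hm: [B, C, H, W], assumed sigmoid-activated; a cell is a peak iff it
    # equals its local 3x3 maximum, then take the top-k peaks over all classes
    B, C, H, W = hm.shape
    hmax = F.max_pool2d(hm, 3, stride=1, padding=1)
    peaks = hm * (hmax == hm).float()
    scores, idx = peaks.flatten(1).topk(k)          # [B, k]
    cls = idx // (H * W)
    ys, xs = (idx % (H * W)) // W, (idx % (H * W)) % W
    return scores, cls, ys, xs

hm = torch.rand(1, 3, 188, 188)                     # stand-in for a predicted heatmap
scores, cls, ys, xs = heatmap_peaks(hm, k=5)
print(scores.shape, cls.shape)                      # torch.Size([1, 5]) torch.Size([1, 5])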
CenterPoint++
In 2021, CenterPoint was upgraded: while slightly improving on the original accuracy, it further increased inference speed, mainly by introducing the following changes:
IOU aware
Following the approach of CIA-SSD.
In single-stage models, localization accuracy and classification confidence come from two heads that are trained and predicted separately, with no direct link between them. In reality, though, a box with a larger IoU against its GT box should also have a higher classification confidence. An additional IoU prediction head exploits this.
During training, the IoU between each predicted box and its GT box serves as the target, trained with an L1 loss to fit the IoU prediction head.
At inference, the predicted IoU is folded into the classification confidence, improving the accuracy of the final ranking.
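A sketch of one plausible rectification at inference time. The geometric-mean form below, score^(1-α) · iou^α, is an assumption: it is a common choice for IoU-aware single-stage heads, not necessarily the exact function used by CIA-SSD or CenterPoint++:
import torch

def rectify_scores(cls_scores, iou_preds, alpha=0.68):
    # fold the predicted IoU into the classification score; the exponent form
    # and the value of alpha are assumptions, not the papers' exact recipe
    iou = iou_preds.clamp(min=0, max=1)
    return cls_scores.pow(1 - alpha) * iou.pow(alpha)

scores = torch.tensor([0.9, 0.9])
ious = torch.tensor([0.8, 0.3])      # equal confidence, very different localization
print(rectify_scores(scores, ious))  # the well-localized box now ranks higher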
Temporal multi-frame input
Following the approach introduced with nuScenes.
A time dimension is added to the point features, and the previous few point cloud sweeps are transformed into the current point cloud's coordinate frame.
The point cloud becomes denser and carries history, but computation is more expensive, and accurate pose transforms between the input frames are required; a minimal sketch follows.
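A minimal sketch of folding one previous sweep into the current frame; the 4×4 pose matrix and the appended relative-timestamp channel follow the usual convention and are assumptions here:
import torch

def merge_sweep(cur_points, prev_points, T_cur_from_prev, dt):
    # cur_points/prev_points: [N, C] with x, y, z in the first 3 columns;
    # T_cur_from_prev: [4, 4] rigid transform from the previous LiDAR frame
    # into the current one; dt: time offset of the previous sweep in seconds
    xyz1 = torch.cat([prev_points[:, :3], torch.ones(len(prev_points), 1)], dim=1)
    prev_in_cur = (T_cur_from_prev @ xyz1.T).T[:, :3]
    prev = torch.cat([prev_in_cur, prev_points[:, 3:]], dim=1)
    # per-point time channel: 0 for the current sweep, dt for the older one
    cur = torch.cat([cur_points, torch.zeros(len(cur_points), 1)], dim=1)
    prev = torch.cat([prev, torch.full((len(prev), 1), dt)], dim=1)
    return torch.cat([cur, prev], dim=0)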
Hard voxelization → dynamic voxelization
Hard voxelization, first introduced by VoxelNet, caps both the total number of voxels and the number of points per voxel, so during voxelization some point information inside a voxel is discarded, and some voxels are dropped altogether, costing a certain amount of accuracy.
Following the dynamic voxelization concept proposed by MVF, CenterPoint implements dynamic voxelization on the GPU, retaining all point information and preserving accuracy. The GPU implementation is also much faster than the CPU one: 50 ms → 2 ms.
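A minimal sketch of the idea: every in-range point gets a voxel index and nothing is capped or dropped, which maps naturally onto GPU scatter operations (torch.unique and index_add_ here stand in for the CUDA kernels):
import torch

def dynamic_voxelize(points, pc_range, voxel_size):
    # points: [N, C] with x, y, z first; returns the flat ids of the non-empty
    # voxels and their mean-pooled features, with no cap on voxels or points
    pc_range = torch.as_tensor(pc_range, dtype=points.dtype)
    voxel_size = torch.as_tensor(voxel_size, dtype=points.dtype)
    grid = ((points[:, :3] - pc_range[:3]) / voxel_size).long()
    dims = ((pc_range[3:] - pc_range[:3]) / voxel_size).long()
    keep = ((grid >= 0) & (grid < dims)).all(dim=1)   # only out-of-range points are dropped
    grid, pts = grid[keep], points[keep]
    flat = (grid[:, 2] * dims[1] + grid[:, 1]) * dims[0] + grid[:, 0]
    uniq, inv = torch.unique(flat, return_inverse=True)
    feats = torch.zeros(len(uniq), pts.shape[1], dtype=pts.dtype)
    feats.index_add_(0, inv, pts)                     # mean-pool points into voxels
    counts = torch.zeros(len(uniq)).index_add_(0, inv, torch.ones(len(pts)))
    return uniq, feats / counts.unsqueeze(1)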
Two stage
From each box predicted by the first stage, take the centers of its four side faces plus the box center.
The features of these 5 points are then extracted from the feature map, concatenated together, and fed into the second stage (see the sketch below).
The second stage uses an MLP to predict a box confidence and a localization refinement of the first-stage result.
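A minimal sketch of gathering the 5 point features by bilinear interpolation; F.grid_sample stands in for the repo's own bilinear-interpolation utility, and the BEV map is assumed to store x along W and y along H:
import torch
import torch.nn.functional as F

def box_point_features(bev_feat, boxes, pc_range):
    # bev_feat: [1, C, H, W]; boxes: [N, 7] as (x, y, z, l, w, h, ry)
    # returns [N, 5 * C]: features at the box center and its 4 side-face centers
    x, y = boxes[:, 0], boxes[:, 1]
    l, w, ry = boxes[:, 3], boxes[:, 4], boxes[:, 6]
    zero = torch.zeros_like(l)
    local = torch.stack([                            # offsets in the box frame:
        torch.stack([zero, zero], -1),               # center
        torch.stack([l / 2, zero], -1),              # front face
        torch.stack([-l / 2, zero], -1),             # back face
        torch.stack([zero, w / 2], -1),              # left face
        torch.stack([zero, -w / 2], -1),             # right face
    ], dim=1)                                        # [N, 5, 2]
    cos, sin = torch.cos(ry), torch.sin(ry)
    rot = torch.stack([torch.stack([cos, -sin], -1),
                       torch.stack([sin, cos], -1)], dim=1)       # [N, 2, 2]
    pts = torch.stack([x, y], -1)[:, None, :] + local @ rot.transpose(1, 2)
    gx = (pts[..., 0] - pc_range[0]) / (pc_range[3] - pc_range[0]) * 2 - 1
    gy = (pts[..., 1] - pc_range[1]) / (pc_range[4] - pc_range[1]) * 2 - 1
    grid = torch.stack([gx, gy], -1)[None]                        # [1, N, 5, 2] in [-1, 1]
    feats = F.grid_sample(bev_feat, grid, align_corners=False)    # [1, C, N, 5]
    return feats[0].permute(1, 0, 2).reshape(len(boxes), -1)      # [N, 5 * C]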
The authors ran ablation experiments on how each module affects accuracy.
Detailed code:
backbone_2d
'''
Input: the type of the 2D backbone, model_cfg.BACKBONE_2D
Input: input_channels, model_info_dict['num_bev_features'] (the H×W×C output of the previous stage)
Output: the backbone_2d_module model
Output: model_info_dict, updated with the feature count produced by the BEV backbone
'''
def build_backbone_2d(self, model_info_dict):
if self.model_cfg.get('BACKBONE_2D', None) is None:
return None, model_info_dict
backbone_2d_module = backbones_2d.__all__[self.model_cfg.BACKBONE_2D.NAME](
model_cfg=self.model_cfg.BACKBONE_2D,
input_channels=model_info_dict['num_bev_features']
)
model_info_dict['module_list'].append(backbone_2d_module)
model_info_dict['num_bev_features'] = backbone_2d_module.num_bev_features
return backbone_2d_module, model_info_dict
BaseBEVBackbone
'''
__init__ builds the 2D model.
forward takes the model and the previous stage's features and computes this stage's features.
'''
class BaseBEVBackbone(nn.Module):
def __init__(self, model_cfg, input_channels):
super().__init__()
self.model_cfg = model_cfg
# read the basic parameters of the 2D backbone
if self.model_cfg.get('LAYER_NUMS', None) is not None:
assert len(self.model_cfg.LAYER_NUMS) == len(self.model_cfg.LAYER_STRIDES) == len(self.model_cfg.NUM_FILTERS)
layer_nums = self.model_cfg.LAYER_NUMS
layer_strides = self.model_cfg.LAYER_STRIDES
num_filters = self.model_cfg.NUM_FILTERS
else:
layer_nums = layer_strides = num_filters = []
# read the upsampling parameters
if self.model_cfg.get('UPSAMPLE_STRIDES', None) is not None:
assert len(self.model_cfg.UPSAMPLE_STRIDES) == len(self.model_cfg.NUM_UPSAMPLE_FILTERS)
num_upsample_filters = self.model_cfg.NUM_UPSAMPLE_FILTERS
upsample_strides = self.model_cfg.UPSAMPLE_STRIDES
else:
upsample_strides = num_upsample_filters = []
num_levels = len(layer_nums)
c_in_list = [input_channels, *num_filters[:-1]]
self.blocks = nn.ModuleList()
self.deblocks = nn.ModuleList()
for idx in range(num_levels):
cur_layers = [
nn.ZeroPad2d(1),
nn.Conv2d(
c_in_list[idx], num_filters[idx], kernel_size=3,
stride=layer_strides[idx], padding=0, bias=False
),
nn.BatchNorm2d(num_filters[idx], eps=1e-3, momentum=0.01),
nn.ReLU()
]
for k in range(layer_nums[idx]):
cur_layers.extend([
nn.Conv2d(num_filters[idx], num_filters[idx], kernel_size=3, padding=1, bias=False),
nn.BatchNorm2d(num_filters[idx], eps=1e-3, momentum=0.01),
nn.ReLU()
])
self.blocks.append(nn.Sequential(*cur_layers))
if len(upsample_strides) > 0:
stride = upsample_strides[idx]
if stride >= 1:
self.deblocks.append(nn.Sequential(
nn.ConvTranspose2d(
num_filters[idx], num_upsample_filters[idx],
upsample_strides[idx],
stride=upsample_strides[idx], bias=False
),
nn.BatchNorm2d(num_upsample_filters[idx], eps=1e-3, momentum=0.01),
nn.ReLU()
))
else:
stride = int(np.round(1 / stride))  # np.int is removed in recent NumPy; use the builtin int
self.deblocks.append(nn.Sequential(
nn.Conv2d(
num_filters[idx], num_upsample_filters[idx],
stride,
stride=stride, bias=False
),
nn.BatchNorm2d(num_upsample_filters[idx], eps=1e-3, momentum=0.01),
nn.ReLU()
))
c_in = sum(num_upsample_filters)
if len(upsample_strides) > num_levels:
self.deblocks.append(nn.Sequential(
nn.ConvTranspose2d(c_in, c_in, upsample_strides[-1], stride=upsample_strides[-1], bias=False),
nn.BatchNorm2d(c_in, eps=1e-3, momentum=0.01),
nn.ReLU(),
))
self.num_bev_features = c_in
def forward(self, data_dict):
"""
Args:
data_dict:
spatial_features
Returns:
"""
spatial_features = data_dict['spatial_features']
ups = []
ret_dict = {}
x = spatial_features
for i in range(len(self.blocks)):
x = self.blocks[i](x)
stride = int(spatial_features.shape[2] / x.shape[2])
ret_dict['spatial_features_%dx' % stride] = x
if len(self.deblocks) > 0:
ups.append(self.deblocks[i](x))
else:
ups.append(x)
if len(ups) > 1:
x = torch.cat(ups, dim=1)
elif len(ups) == 1:
x = ups[0]
if len(self.deblocks) > len(self.blocks):
x = self.deblocks[-1](x)
data_dict['spatial_features_2d'] = x
return data_dict
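To see the shapes concretely, a quick check of this backbone with the two-block config from earlier (a sketch assuming the EasyDict-style config objects that OpenPCDet uses, with the BaseBEVBackbone class above in scope):
import torch
from easydict import EasyDict

cfg = EasyDict(LAYER_NUMS=[5, 5], LAYER_STRIDES=[1, 2], NUM_FILTERS=[128, 256],
               UPSAMPLE_STRIDES=[1, 2], NUM_UPSAMPLE_FILTERS=[256, 256])
backbone = BaseBEVBackbone(cfg, input_channels=256)
out = backbone({'spatial_features': torch.randn(1, 256, 188, 188)})
print(out['spatial_features_2d'].shape)  # torch.Size([1, 512, 188, 188])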
anchor_head_single
class AnchorHeadSingle(AnchorHeadTemplate):
def __init__(self, model_cfg, input_channels, num_class, class_names, grid_size, point_cloud_range,
predict_boxes_when_training=True, **kwargs):
super().__init__(
model_cfg=model_cfg, num_class=num_class, class_names=class_names, grid_size=grid_size, point_cloud_range=point_cloud_range,
predict_boxes_when_training=predict_boxes_when_training
)
self.num_anchors_per_location = sum(self.num_anchors_per_location)
# classification head: for each location, num_anchors × num_class outputs
self.conv_cls = nn.Conv2d(
input_channels, self.num_anchors_per_location * self.num_class,
kernel_size=1
)
# regression head: for each location, num_anchors × box-code-size outputs
# e.g. with two orthogonal anchors per location, the predicted box offsets relative to each anchor
self.conv_box = nn.Conv2d(
input_channels, self.num_anchors_per_location * self.box_coder.code_size,
kernel_size=1
)
# direction head: for each location, num_anchors × num_dir_bins outputs
if self.model_cfg.get('USE_DIRECTION_CLASSIFIER', None) is not None:
self.conv_dir_cls = nn.Conv2d(
input_channels,
self.num_anchors_per_location * self.model_cfg.NUM_DIR_BINS,
kernel_size=1
)
else:
self.conv_dir_cls = None
self.init_weights()
def init_weights(self):
pi = 0.01
nn.init.constant_(self.conv_cls.bias, -np.log((1 - pi) / pi))
nn.init.normal_(self.conv_box.weight, mean=0, std=0.001)
def forward(self, data_dict):
# forward pass: compute the raw predictions
spatial_features_2d = data_dict['spatial_features_2d']
cls_preds = self.conv_cls(spatial_features_2d)
box_preds = self.conv_box(spatial_features_2d)
cls_preds = cls_preds.permute(0, 2, 3, 1).contiguous() # [N, H, W, C]
box_preds = box_preds.permute(0, 2, 3, 1).contiguous() # [N, H, W, C]
self.forward_ret_dict['cls_preds'] = cls_preds
self.forward_ret_dict['box_preds'] = box_preds
if self.conv_dir_cls is not None:
dir_cls_preds = self.conv_dir_cls(spatial_features_2d)
dir_cls_preds = dir_cls_preds.permute(0, 2, 3, 1).contiguous()
self.forward_ret_dict['dir_cls_preds'] = dir_cls_preds
else:
dir_cls_preds = None
# during training, assign the prediction targets
if self.training:
targets_dict = self.assign_targets(
gt_boxes=data_dict['gt_boxes']
)
self.forward_ret_dict.update(targets_dict)
# at inference (or when boxes are needed during training), decode the predictions into boxes
if not self.training or self.predict_boxes_when_training:
batch_cls_preds, batch_box_preds = self.generate_predicted_boxes(
batch_size=data_dict['batch_size'],
cls_preds=cls_preds, box_preds=box_preds, dir_cls_preds=dir_cls_preds
)
data_dict['batch_cls_preds'] = batch_cls_preds
data_dict['batch_box_preds'] = batch_box_preds
data_dict['cls_preds_normalized'] = False
return data_dict
anchor_head_template
class AnchorHeadTemplate(nn.Module):
def __init__(self, model_cfg, num_class, class_names, grid_size, point_cloud_range, predict_boxes_when_training):
super().__init__()
self.model_cfg = model_cfg
self.num_class = num_class
self.class_names = class_names
self.predict_boxes_when_training = predict_boxes_when_training
self.use_multihead = self.model_cfg.get('USE_MULTIHEAD', False)
anchor_target_cfg = self.model_cfg.TARGET_ASSIGNER_CONFIG
self.box_coder = getattr(box_coder_utils, anchor_target_cfg.BOX_CODER)(
num_dir_bins=anchor_target_cfg.get('NUM_DIR_BINS', 6),
**anchor_target_cfg.get('BOX_CODER_CONFIG', {})
)
# generate anchors
anchor_generator_cfg = self.model_cfg.ANCHOR_GENERATOR_CONFIG
anchors, self.num_anchors_per_location = self.generate_anchors(
anchor_generator_cfg, grid_size=grid_size, point_cloud_range=point_cloud_range,
anchor_ndim=self.box_coder.code_size
)
self.anchors = [x.cuda() for x in anchors]
# assign anchors to GT boxes to build the targets: class labels and box offsets relative to the anchors
self.target_assigner = self.get_target_assigner(anchor_target_cfg)
self.forward_ret_dict = {}
self.build_losses(self.model_cfg.LOSS_CONFIG)
@staticmethod
def generate_anchors(anchor_generator_cfg, grid_size, point_cloud_range, anchor_ndim=7):
anchor_generator = AnchorGenerator(
anchor_range=point_cloud_range,
anchor_generator_config=anchor_generator_cfg
)
feature_map_size = [grid_size[:2] // config['feature_map_stride'] for config in anchor_generator_cfg]
anchors_list, num_anchors_per_location_list = anchor_generator.generate_anchors(feature_map_size)
if anchor_ndim != 7:
for idx, anchors in enumerate(anchors_list):
pad_zeros = anchors.new_zeros([*anchors.shape[0:-1], anchor_ndim - 7])
new_anchors = torch.cat((anchors, pad_zeros), dim=-1)
anchors_list[idx] = new_anchors
return anchors_list, num_anchors_per_location_list
def get_target_assigner(self, anchor_target_cfg):
if anchor_target_cfg.NAME == 'ATSS':
target_assigner = ATSSTargetAssigner(
topk=anchor_target_cfg.TOPK,
box_coder=self.box_coder,
use_multihead=self.use_multihead,
match_height=anchor_target_cfg.MATCH_HEIGHT
)
elif anchor_target_cfg.NAME == 'AxisAlignedTargetAssigner':
target_assigner = AxisAlignedTargetAssigner(
model_cfg=self.model_cfg,
class_names=self.class_names,
box_coder=self.box_coder,
match_height=anchor_target_cfg.MATCH_HEIGHT
)
else:
raise NotImplementedError
return target_assigner
def build_losses(self, losses_cfg):
self.add_module(
'cls_loss_func',
loss_utils.SigmoidFocalClassificationLoss(alpha=0.25, gamma=2.0)
)
reg_loss_name = 'WeightedSmoothL1Loss' if losses_cfg.get('REG_LOSS_TYPE', None) is None \
else losses_cfg.REG_LOSS_TYPE
self.add_module(
'reg_loss_func',
getattr(loss_utils, reg_loss_name)(code_weights=losses_cfg.LOSS_WEIGHTS['code_weights'])
)
self.add_module(
'dir_loss_func',
loss_utils.WeightedCrossEntropyLoss()
)
def assign_targets(self, gt_boxes):
"""
Args:
gt_boxes: (B, M, 8)
Returns:
"""
targets_dict = self.target_assigner.assign_targets(
self.anchors, gt_boxes
)
return targets_dict
def get_cls_layer_loss(self):
cls_preds = self.forward_ret_dict['cls_preds']
box_cls_labels = self.forward_ret_dict['box_cls_labels']
batch_size = int(cls_preds.shape[0])
cared = box_cls_labels >= 0 # [N, num_anchors]
positives = box_cls_labels > 0
negatives = box_cls_labels == 0
negative_cls_weights = negatives * 1.0
cls_weights = (negative_cls_weights + 1.0 * positives).float()
reg_weights = positives.float()
if self.num_class == 1:
# class agnostic
box_cls_labels[positives] = 1
pos_normalizer = positives.sum(1, keepdim=True).float()
reg_weights /= torch.clamp(pos_normalizer, min=1.0)
cls_weights /= torch.clamp(pos_normalizer, min=1.0)
cls_targets = box_cls_labels * cared.type_as(box_cls_labels)
cls_targets = cls_targets.unsqueeze(dim=-1)
cls_targets = cls_targets.squeeze(dim=-1)
one_hot_targets = torch.zeros(
*list(cls_targets.shape), self.num_class + 1, dtype=cls_preds.dtype, device=cls_targets.device
)
one_hot_targets.scatter_(-1, cls_targets.unsqueeze(dim=-1).long(), 1.0)
cls_preds = cls_preds.view(batch_size, -1, self.num_class)
one_hot_targets = one_hot_targets[..., 1:]
cls_loss_src = self.cls_loss_func(cls_preds, one_hot_targets, weights=cls_weights) # [N, M]
cls_loss = cls_loss_src.sum() / batch_size
cls_loss = cls_loss * self.model_cfg.LOSS_CONFIG.LOSS_WEIGHTS['cls_weight']
tb_dict = {
'rpn_loss_cls': cls_loss.item()
}
return cls_loss, tb_dict
@staticmethod
def add_sin_difference(boxes1, boxes2, dim=6):
assert dim != -1
rad_pred_encoding = torch.sin(boxes1[..., dim:dim + 1]) * torch.cos(boxes2[..., dim:dim + 1])
rad_tg_encoding = torch.cos(boxes1[..., dim:dim + 1]) * torch.sin(boxes2[..., dim:dim + 1])
boxes1 = torch.cat([boxes1[..., :dim], rad_pred_encoding, boxes1[..., dim + 1:]], dim=-1)
boxes2 = torch.cat([boxes2[..., :dim], rad_tg_encoding, boxes2[..., dim + 1:]], dim=-1)
return boxes1, boxes2
@staticmethod
def get_direction_target(anchors, reg_targets, one_hot=True, dir_offset=0, num_bins=2):
batch_size = reg_targets.shape[0]
anchors = anchors.view(batch_size, -1, anchors.shape[-1])
rot_gt = reg_targets[..., 6] + anchors[..., 6]
offset_rot = common_utils.limit_period(rot_gt - dir_offset, 0, 2 * np.pi)
dir_cls_targets = torch.floor(offset_rot / (2 * np.pi / num_bins)).long()
dir_cls_targets = torch.clamp(dir_cls_targets, min=0, max=num_bins - 1)
if one_hot:
dir_targets = torch.zeros(*list(dir_cls_targets.shape), num_bins, dtype=anchors.dtype,
device=dir_cls_targets.device)
dir_targets.scatter_(-1, dir_cls_targets.unsqueeze(dim=-1).long(), 1.0)
dir_cls_targets = dir_targets
return dir_cls_targets
def get_box_reg_layer_loss(self):
box_preds = self.forward_ret_dict['box_preds']
box_dir_cls_preds = self.forward_ret_dict.get('dir_cls_preds', None)
box_reg_targets = self.forward_ret_dict['box_reg_targets']
box_cls_labels = self.forward_ret_dict['box_cls_labels']
batch_size = int(box_preds.shape[0])
positives = box_cls_labels > 0
reg_weights = positives.float()
pos_normalizer = positives.sum(1, keepdim=True).float()
reg_weights /= torch.clamp(pos_normalizer, min=1.0)
if isinstance(self.anchors, list):
if self.use_multihead:
anchors = torch.cat(
[anchor.permute(3, 4, 0, 1, 2, 5).contiguous().view(-1, anchor.shape[-1]) for anchor in
self.anchors], dim=0)
else:
anchors = torch.cat(self.anchors, dim=-3)
else:
anchors = self.anchors
anchors = anchors.view(1, -1, anchors.shape[-1]).repeat(batch_size, 1, 1)
box_preds = box_preds.view(batch_size, -1,
box_preds.shape[-1] // self.num_anchors_per_location if not self.use_multihead else
box_preds.shape[-1])
# sin(a - b) = sinacosb-cosasinb
box_preds_sin, reg_targets_sin = self.add_sin_difference(box_preds, box_reg_targets)
loc_loss_src = self.reg_loss_func(box_preds_sin, reg_targets_sin, weights=reg_weights) # [N, M]
loc_loss = loc_loss_src.sum() / batch_size
loc_loss = loc_loss * self.model_cfg.LOSS_CONFIG.LOSS_WEIGHTS['loc_weight']
box_loss = loc_loss
tb_dict = {
'rpn_loss_loc': loc_loss.item()
}
if box_dir_cls_preds is not None:
dir_targets = self.get_direction_target(
anchors, box_reg_targets,
dir_offset=self.model_cfg.DIR_OFFSET,
num_bins=self.model_cfg.NUM_DIR_BINS
)
dir_logits = box_dir_cls_preds.view(batch_size, -1, self.model_cfg.NUM_DIR_BINS)
weights = positives.type_as(dir_logits)
weights /= torch.clamp(weights.sum(-1, keepdim=True), min=1.0)
dir_loss = self.dir_loss_func(dir_logits, dir_targets, weights=weights)
dir_loss = dir_loss.sum() / batch_size
dir_loss = dir_loss * self.model_cfg.LOSS_CONFIG.LOSS_WEIGHTS['dir_weight']
box_loss += dir_loss
tb_dict['rpn_loss_dir'] = dir_loss.item()
return box_loss, tb_dict
def get_loss(self):
cls_loss, tb_dict = self.get_cls_layer_loss()
box_loss, tb_dict_box = self.get_box_reg_layer_loss()
tb_dict.update(tb_dict_box)
rpn_loss = cls_loss + box_loss
tb_dict['rpn_loss'] = rpn_loss.item()
return rpn_loss, tb_dict
def generate_predicted_boxes(self, batch_size, cls_preds, box_preds, dir_cls_preds=None):
"""
Args:
batch_size:
cls_preds: (N, H, W, C1)
box_preds: (N, H, W, C2)
dir_cls_preds: (N, H, W, C3)
Returns:
batch_cls_preds: (B, num_boxes, num_classes)
batch_box_preds: (B, num_boxes, 7+C)
"""
if isinstance(self.anchors, list):
if self.use_multihead:
anchors = torch.cat([anchor.permute(3, 4, 0, 1, 2, 5).contiguous().view(-1, anchor.shape[-1])
for anchor in self.anchors], dim=0)
else:
anchors = torch.cat(self.anchors, dim=-3)
else:
anchors = self.anchors
num_anchors = anchors.view(-1, anchors.shape[-1]).shape[0]
batch_anchors = anchors.view(1, -1, anchors.shape[-1]).repeat(batch_size, 1, 1)
batch_cls_preds = cls_preds.view(batch_size, num_anchors, -1).float() \
if not isinstance(cls_preds, list) else cls_preds
batch_box_preds = box_preds.view(batch_size, num_anchors, -1) if not isinstance(box_preds, list) \
else torch.cat(box_preds, dim=1).view(batch_size, num_anchors, -1)
batch_box_preds = self.box_coder.decode_torch(batch_box_preds, batch_anchors)
if dir_cls_preds is not None:
dir_offset = self.model_cfg.DIR_OFFSET
dir_limit_offset = self.model_cfg.DIR_LIMIT_OFFSET
dir_cls_preds = dir_cls_preds.view(batch_size, num_anchors, -1) if not isinstance(dir_cls_preds, list) \
else torch.cat(dir_cls_preds, dim=1).view(batch_size, num_anchors, -1)
dir_labels = torch.max(dir_cls_preds, dim=-1)[1]
period = (2 * np.pi / self.model_cfg.NUM_DIR_BINS)
dir_rot = common_utils.limit_period(
batch_box_preds[..., 6] - dir_offset, dir_limit_offset, period
)
batch_box_preds[..., 6] = dir_rot + dir_offset + period * dir_labels.to(batch_box_preds.dtype)
if isinstance(self.box_coder, box_coder_utils.PreviousResidualDecoder):
batch_box_preds[..., 6] = common_utils.limit_period(
-(batch_box_preds[..., 6] + np.pi / 2), offset=0.5, period=np.pi * 2
)
return batch_cls_preds, batch_box_preds
def forward(self, **kwargs):
raise NotImplementedError
CenterHead in OpenPCDet
class CenterHead(nn.Module):
def __init__(self, model_cfg, input_channels, num_class, class_names, grid_size, point_cloud_range,
predict_boxes_when_training=True):
super().__init__()
self.model_cfg = model_cfg
self.num_class = num_class
self.class_names = [class_names]
self.predict_boxes_when_training = predict_boxes_when_training
self.use_multihead = self.model_cfg.get('USE_MULTIHEAD', False)
target_cfg = self.model_cfg.TARGET_ASSIGNER_CONFIG
self.target_cfg = target_cfg
self.grid_size = grid_size
self.point_cloud_range = point_cloud_range
self.forward_ret_dict = {}
# compared with the original paper, a much simpler pair of heads is used here
self.conv_cls = nn.Conv2d(
input_channels, self.num_class,
kernel_size=1
)
self.conv_box = nn.Conv2d(
input_channels, 8,
kernel_size=1
)
self.loss_cls = GaussianFocalLoss(reduction='mean')
self.init_weights()
def init_weights(self):
pi = 0.01
nn.init.constant_(self.conv_cls.bias, -np.log((1 - pi) / pi))
nn.init.normal_(self.conv_box.weight, mean=0, std=0.001)
def forward(self, data_dict):
spatial_features_2d = data_dict['spatial_features_2d']
cls_preds = self.conv_cls(spatial_features_2d)
box_preds = self.conv_box(spatial_features_2d)
cls_preds = cls_preds.permute(0, 2, 3, 1).contiguous() # [N, H, W, C]
box_preds = box_preds.permute(0, 2, 3, 1).contiguous() # [N, H, W, C]
self.forward_ret_dict['cls_preds'] = cls_preds
self.forward_ret_dict['box_preds'] = box_preds
# during training, build the prediction targets
if self.training:
targets_dict = self.assign_targets(
gt_boxes=data_dict['gt_boxes']
)
self.forward_ret_dict.update(targets_dict)
# at inference (or when boxes are needed during training), decode the boxes
if not self.training or self.predict_boxes_when_training:
batch_cls_preds, batch_box_preds = self.generate_predicted_boxes(
batch_size=data_dict['batch_size'],
cls_preds=cls_preds, box_preds=box_preds, dir_cls_preds=None
)
data_dict['batch_cls_preds'] = batch_cls_preds
data_dict['batch_box_preds'] = batch_box_preds
data_dict['cls_preds_normalized'] = False
return data_dict
def _gather_feat(self, feat, ind, mask=None):
"""Gather feature map.
Given feature map and index, return indexed feature map.
Args:
feat (torch.tensor): Feature map with the shape of [B, H*W, 10].
ind (torch.Tensor): Index of the ground truth boxes with the
shape of [B, max_obj].
mask (torch.Tensor): Mask of the feature map with the shape
of [B, max_obj]. Default: None.
Returns:
torch.Tensor: Feature map after gathering with the shape
of [B, max_obj, 10].
"""
dim = feat.size(2)
ind = ind.unsqueeze(2).expand(ind.size(0), ind.size(1), dim)
feat = feat.gather(1, ind)
if mask is not None:
mask = mask.unsqueeze(2).expand_as(feat)
feat = feat[mask]
feat = feat.view(-1, dim)
return feat
def assign_targets(self, gt_boxes):
"""Generate targets.
Args:
gt_boxes: (B, M, 8) box + cls
Returns:
tuple[list[torch.Tensor]]: Tuple of targets containing the following results in order.
- list[torch.Tensor]: Heatmap scores.
- list[torch.Tensor]: Ground truth boxes.
- list[torch.Tensor]: Indexes indicating the position of the valid boxes.
- list[torch.Tensor]: Masks indicating which boxes are valid.
"""
gt_bboxes_3d, gt_labels_3d = gt_boxes[..., :-1], gt_boxes[..., -1]
heatmaps, anno_boxes, inds, masks = multi_apply(
self.get_targets_single, gt_bboxes_3d, gt_labels_3d)
# transpose heatmaps, because the dimension of tensors in each task is
# different, we have to use numpy instead of torch to do the transpose.
heatmaps = np.array(heatmaps).transpose(1, 0).tolist()
heatmaps = [torch.stack(hms_) for hms_ in heatmaps]
# transpose anno_boxes
anno_boxes = np.array(anno_boxes).transpose(1, 0).tolist()
anno_boxes = [torch.stack(anno_boxes_) for anno_boxes_ in anno_boxes]
# transpose inds
inds = np.array(inds).transpose(1, 0).tolist()
inds = [torch.stack(inds_) for inds_ in inds]
# transpose masks
masks = np.array(masks).transpose(1, 0).tolist()
masks = [torch.stack(masks_) for masks_ in masks]
all_targets_dict = {
'heatmaps': heatmaps,
'anno_boxes': anno_boxes,
'inds': inds,
'masks': masks
}
return all_targets_dict
def get_targets_single(self, gt_bboxes_3d, gt_labels_3d):
"""Generate training targets for a single sample.
Args:
gt_bboxes_3d (:obj:`LiDARInstance3DBoxes`): Ground truth gt boxes.
gt_labels_3d (torch.Tensor): Labels of boxes.
Returns:
tuple[list[torch.Tensor]]: Tuple of targets containing the following results in order.
- list[torch.Tensor]: Heatmap scores.
- list[torch.Tensor]: Ground truth boxes.
- list[torch.Tensor]: Indexes indicating the position of the valid boxes.
- list[torch.Tensor]: Masks indicating which boxes are valid.
"""
device = gt_labels_3d.device
"""gt_bboxes_3d = torch.cat(
(gt_bboxes_3d.gravity_center, gt_bboxes_3d.tensor[:, 3:]),
dim=1).to(device)
"""
max_objs = self.target_cfg.MAX_OBJS
grid_size = torch.tensor(self.grid_size)
pc_range = torch.tensor(self.point_cloud_range)
voxel_size = torch.tensor(self.target_cfg.VOXEL_SIZE)
feature_map_size = grid_size[:2] // self.target_cfg.OUT_SIZE_FACTOR
"""
# reorganize the gt_dict by tasks
task_masks = []
flag = 0
for class_name in self.class_names:
print(gt_labels_3d)
task_masks.append([
torch.where(gt_labels_3d == class_name.index(i) + flag)
for i in class_name
])
flag += len(class_name)
task_boxes = []
task_classes = []
flag2 = 0
for idx, mask in enumerate(task_masks):
task_box = []
task_class = []
for m in mask:
task_box.append(gt_bboxes_3d[m])
# 0 is background for each task, so we need to add 1 here.
task_class.append(gt_labels_3d[m] - flag2)
task_boxes.append(torch.cat(task_box, axis=0).to(device))
task_classes.append(torch.cat(task_class).long().to(device))
flag2 += len(mask)
"""
task_boxes = [gt_bboxes_3d]
task_classes = [gt_labels_3d]
draw_gaussian = draw_heatmap_gaussian
heatmaps, anno_boxes, inds, masks = [], [], [], []
for idx in range(1):
heatmap = gt_bboxes_3d.new_zeros(
(len(self.class_names[idx]), feature_map_size[1],
feature_map_size[0]))
anno_box = gt_bboxes_3d.new_zeros((max_objs, 8),
dtype=torch.float32)
ind = gt_labels_3d.new_zeros((max_objs), dtype=torch.int64)
mask = gt_bboxes_3d.new_zeros((max_objs), dtype=torch.uint8)
num_objs = min(task_boxes[idx].shape[0], max_objs)
for k in range(num_objs):
cls_id = (task_classes[idx][k] - 1).int()
width = task_boxes[idx][k][3]
length = task_boxes[idx][k][4]
width = width / voxel_size[0] / self.target_cfg.OUT_SIZE_FACTOR
length = length / voxel_size[1] / self.target_cfg.OUT_SIZE_FACTOR
if width > 0 and length > 0:
radius = gaussian_radius(
(length, width),
min_overlap=self.target_cfg.GAUSSIAN_OVERLAP)
radius = max(self.target_cfg.MIN_RADIUS, int(radius))
# be really careful for the coordinate system of
# your box annotation.
x, y, z = task_boxes[idx][k][0], task_boxes[idx][k][
1], task_boxes[idx][k][2]
coor_x = (
x - pc_range[0]
) / voxel_size[0] / self.target_cfg.OUT_SIZE_FACTOR
coor_y = (
y - pc_range[1]
) / voxel_size[1] / self.target_cfg.OUT_SIZE_FACTOR
center = torch.tensor([coor_x, coor_y],
dtype=torch.float32,
device=device)
center_int = center.to(torch.int32)
# throw out not in range objects to avoid out of array
# area when creating the heatmap
if not (0 <= center_int[0] < feature_map_size[0]
and 0 <= center_int[1] < feature_map_size[1]):
continue
draw_gaussian(heatmap[cls_id], center_int, radius)
new_idx = k
x, y = center_int[0], center_int[1]
assert (y * feature_map_size[0] + x <
feature_map_size[0] * feature_map_size[1])
ind[new_idx] = y * feature_map_size[0] + x
mask[new_idx] = 1
rot = task_boxes[idx][k][6]
box_dim = task_boxes[idx][k][3:6]
box_dim = box_dim.log()
anno_box[new_idx] = torch.cat([
center - torch.tensor([x, y], device=device),
z.unsqueeze(0), box_dim,
torch.sin(rot).unsqueeze(0),
torch.cos(rot).unsqueeze(0),
])
heatmaps.append(heatmap)
anno_boxes.append(anno_box)
masks.append(mask)
inds.append(ind)
return heatmaps, anno_boxes, inds, masks
def generate_predicted_boxes(self, batch_size, cls_preds, box_preds, dir_cls_preds=None):
"""
Args:
batch_size:
cls_preds: (N, H, W, C1)
box_preds: (N, H, W, C2)
dir_cls_preds: (N, H, W, C3)
Returns:
batch_cls_preds: (B, num_boxes, num_classes)
batch_box_preds: (B, num_boxes, 7+C)
"""
batch, H, W, code_size = box_preds.size()
box_preds = box_preds.reshape(batch, H*W, code_size)
batch_reg = box_preds[..., 0:2]
batch_hei = box_preds[..., 2:3]
batch_dim = torch.exp(box_preds[..., 3:6])
batch_rots = box_preds[..., 6:7]
batch_rotc = box_preds[..., 7:8]
ys, xs = torch.meshgrid([torch.arange(0, H), torch.arange(0, W)])
ys = ys.view(1, H, W).repeat(batch, 1, 1).to(cls_preds.device)
xs = xs.view(1, H, W).repeat(batch, 1, 1).to(cls_preds.device)
xs = xs.view(batch, -1, 1) + batch_reg[:, :, 0:1]
ys = ys.view(batch, -1, 1) + batch_reg[:, :, 1:2]
xs = xs * self.target_cfg.OUT_SIZE_FACTOR * self.target_cfg.VOXEL_SIZE[0] + self.point_cloud_range[0]
ys = ys * self.target_cfg.OUT_SIZE_FACTOR * self.target_cfg.VOXEL_SIZE[1] + self.point_cloud_range[1]
rot = torch.atan2(batch_rots, batch_rotc)
batch_box_preds = torch.cat([xs, ys, batch_hei, batch_dim, rot], dim=2)
batch_cls_preds = cls_preds.view(batch, H*W, -1)
return batch_cls_preds, batch_box_preds
def get_loss(self):
cls_loss, tb_dict = self.get_cls_layer_loss()
box_loss, tb_dict_box = self.get_box_reg_layer_loss()
tb_dict.update(tb_dict_box)
rpn_loss = cls_loss + box_loss
tb_dict['rpn_loss'] = rpn_loss.item()
return rpn_loss, tb_dict
def get_cls_layer_loss(self):
# NHWC -> NCHW
pred_heatmaps = clip_sigmoid(self.forward_ret_dict['cls_preds']).permute(0, 3, 1, 2)
gt_heatmaps = self.forward_ret_dict['heatmaps'][0]
num_pos = gt_heatmaps.eq(1).float().sum().item()
cls_loss = self.loss_cls(
pred_heatmaps,
gt_heatmaps,
avg_factor=max(num_pos, 1))
cls_loss = cls_loss * self.model_cfg.LOSS_CONFIG.LOSS_WEIGHTS['cls_weight']
tb_dict = {
'rpn_loss_cls': cls_loss.item()
}
return cls_loss, tb_dict
def get_box_reg_layer_loss(self):
# Regression loss for dimension, offset, height, rotation
target_box, inds, masks = self.forward_ret_dict['anno_boxes'][0], self.forward_ret_dict['inds'][0], self.forward_ret_dict['masks'][0]
ind = inds
num = masks.float().sum()
pred = self.forward_ret_dict['box_preds'] # [N, H, W, 8]: dx, dy, z, log-dims, sin, cos
pred = pred.view(pred.size(0), -1, pred.size(3))
pred = self._gather_feat(pred, ind)
mask = masks.unsqueeze(2).expand_as(target_box).float()
isnotnan = (~torch.isnan(target_box)).float()
mask *= isnotnan
code_weights = self.model_cfg.LOSS_CONFIG.LOSS_WEIGHTS['code_weights']
bbox_weights = mask * mask.new_tensor(code_weights)
loc_loss = l1_loss(
pred, target_box, bbox_weights, avg_factor=(num + 1e-4))
loc_loss = loc_loss * self.model_cfg.LOSS_CONFIG.LOSS_WEIGHTS['loc_weight']
box_loss = loc_loss
tb_dict = {
'rpn_loss_loc': loc_loss.item()
}
return box_loss, tb_dict
Copyright notice: this is an original article by CSDN blogger "huang_victor", licensed under CC 4.0 BY-SA; please include the original source link and this notice when reposting.
Original link: https://blog.csdn.net/huang_victor/article/details/120778544