Lidar Object detection

3d object detection的一般的pipeline

在这里插入图片描述
在这里插入图片描述

Anchor based vs Center based RPN

当前比较流行的3d目标检测pipeline,或是通过pillar,对3d点云进行编码,特征提取,压缩到2d,或是通过voxel,3D sparseConv,特征提取,压缩到2d,转化为2d后就可以通过2D的目标检测的方法去实现目标检测的任务。本文详细解读和对比,经典的anchor based的SSD(single short detection)和最近在3d objectection中较为流行的anchor free center based的CenterNet的原理以及在OpenPCDet框架下的具体实现。

如下是截止到2021-10-26前的,NuScenes和Waymo real time 3D object detection challenge的排行榜。比较靠前的,都采用了anchor free的模型。
在这里插入图片描述
在这里插入图片描述

SECOND Voxelization, Voxel Feature Extraction,3D SpconvConv, Map to BEV

在这里插入图片描述
我们以waymo dataset为例,推断从点云,经过网格化、网格特征提取、稀疏3D卷积之后的特征图的维度,
point cloud range : [-75.2, -75.2, -2, 75.2, 75.2, 4]
voxel size: [0.1, 0.1, 0.15]
经过网格化和特征提取之后,维度为[40,1504,1504, 5]
5维的特征分别是:x,y,z,intensity,elongation
经过多层的稀疏3D卷积之后,维度变为[2,188,188, 128]
通过维度变化,压缩到BEV,维度变为[188,188,256]
输入给2D Backbone和RPN的维度也就是 [188, 188, 256] HxWxC

SECOND backbone 2d

backbone2d
SECOND的官方论文的2d backbone如上图,
Conv2D(Cout,k,s)代表Conv2D-BN-ReLU的组合
DeConv2D(Cout,k,s)代表DeConv2D-BN-ReLU的组合
3个 Conv2D(128,3,1(2)) 经过1个Deconv2D(128,3,1)
5个Conv2D(128,3,1(2))经过1个Deconv2D(128,3,2)
5个Conv2D(256,3,1(2))经过1个Deconv2D(128,3,4)
然后叠加到一起,完成不同尺度的特征提取,输入到RPN head。

在后面SECOND的官方github,以及在OpenPCdet常用的backbone2D,简化为了下面的结构:
配置文件为:
LAYER_NUMS: [5, 5]
LAYER_STRIDES: [1, 2]
NUM_FILTERS: [128, 256]
UPSAMPLE_STRIDES: [1, 2]
NUM_UPSAMPLE_FILTERS: [256, 256]
网络结构为:
6个Conv2D(128,3,1) 经过1个Deconv2D(256,3,1),此时的HxWxC为[188,188,256]
6个Conv2D(256,3,1(2))经过1个Deconv2D(256,3,2),此时的HxWxC为[188,188,256]
concate到一起,HxWxC为[188,188,512]

或者更简单的:
LAYER_NUMS: [5]
LAYER_STRIDES: [1]
NUM_FILTERS: [128]
UPSAMPLE_STRIDES: [2]
NUM_UPSAMPLE_FILTERS: [256]
网络结构为:
6个Conv2D(128,3,1)经过1个Deconv2D(256,3,2),此时HxWxC为[376,376,256]

SECOND RPN

在这里插入图片描述
通过Conv2D得到三个预测头,class、box_size、direction,三个预测头的网络分别为:
Conv2D(512,2x3,1,1),2个anchor,3个class的置信度
Conv2D(512,2x7,1,1),2个anchor,7个box相对于anchor的位置和尺寸的参数,dx,dy,dz,l,w,h,a
Conv2D(512,2x2,1,1),2个anchor,box相对于anchor的角度差

SECOND training

生成anchor:
anchor的尺寸根据不同的类别,分别定义
在feature_map上,每一个位置放置2个正交的anchor
最终得到feature_map_size[0]xfeature_map[_size[1]x2xclass_num个anchors
预测的目标:
把真实的3d box,分配到anchor上
N_anchors,N_groudtruth,进行IOU计算,针对每一个anchor,挑出IOU最大的GT box,设置一个threshold,大于threshold,设为前景,小于一个threshold,设为背景。
前景的anchor,标签为对应的class;
前景的anchor和对应的gt box进行计算,得到所需要的regression的目标。
如果使用direction layer,同样也要计算一个direction的目标。(因为sin(a-b)无法区分0和pi)
预测的结果:
预测得到的结果为,对应每一个anchor,有一个预测的置信度,一个相对于anchor的位置和尺寸的偏移量,一个相对于anchor的角度的偏移量

在这里插入图片描述
loss的计算:
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

SECOND inference

得到预测的结果后,可以通过anchor的信息对预测的结果进行解码,得到对应的boxes。
针对每一种class,设定一定的阈值,对结果继续筛选。
然后进行NMS,极大值抑制的循环筛选,留下最终的boxes。

Anchorbased和Anchorfree的对比

anchorbased的detector的问题

  1. 在二维平面,目标物体的框都是和图像坐标对齐的,也和布置的anchor的坐标系对齐,因此很有效果。但是到了3D的空间,目标物的框的方向是任意的,anchor数量太少,无法完全覆盖。anchor数量太多,非常消耗计算资源。当前anchorbase的网络一般放置2个垂直的anchor。(0,90°)
  2. anchor的选取,anchor的尺寸如何定义,一般都是根据数据集中进行统计得到的,如果新增种类,size也得重新挑选,调试得到一个比较理想的值。
  3. 每个位置两个anchor,也会带来很多的overlapped的boxes,需要NMS对预测的结果进行后处理,在部署端,进行推理运算的时候,需要NMS(非极大值抑制)进行后处理,也非常耗费计算资源。

centerbased的detector优势

CenterNet是19年提出的anchor free的2D的目标检测方法,他不用anchor进行目标检测,而是直接预测目标的中心,以及box的尺寸。具体来说:
针对每种类别,分别预测一个热点图,通过挑选热点图的peak,可以得到物体初步的中心位置。
通过regression,预测得到box的尺寸,以及中心点相对于grid中心的偏移量。
在这里插入图片描述
随后这个2D detector被引入到3D目标检测的模型中。(CenterPoint,AFDet),在各大排行榜上都比较靠前。

  1. 无需设定anchor,因此也无需特殊选定anchor的size和方向。
  2. 针对同一个类别,同一个位置只有一个positive的结果,因此不需要耗时的NMS去过滤overlap的boxes.

在这里插入图片描述

CenterPoint backbone 2d

Centerpoint在前面的点云编码和bev backbone上没有太多的创新,基本沿用了SECOND/point pillar的思路。
github repo上的2d backbone
配置文件为:
LAYER_NUMS: [5, 5]
LAYER_STRIDES: [1, 2]
NUM_FILTERS: [128, 256]
UPSAMPLE_STRIDES: [1, 2]
NUM_UPSAMPLE_FILTERS: [256, 256]
网络结构为:
6个Conv2D(128,3,1) 经过1个Deconv2D(256,3,1),此时的HxWxC为[188,188,256]
6个Conv2D(256,3,1(2))经过1个Deconv2D(256,3,2),此时的HxWxC为[188,188,256]
concate到一起,HxWxC为[188,188,512]

CenterPoint RPN

Centerpoint的创新点在于其RPN head,引入了anchor free center based的detector(参考了CenterNet)。
在训练的时候,直接预测如下参数:

  1. 每种类别,预测center heatmap
  2. 预测box的位置:dx,dy,用以修正中心点的位置
  3. 预测box的高度:z
  4. 预测box的尺寸:w,h,l
  5. 预测box的方向:sin(a),cos(a),直接确定heading

paper和github repo上具体的网络结构:
共享卷积层:Conv2D(512,64,3,1) + BN +ReLu
检测头:common_heads={‘hm’:(3,2),‘reg’: (2, 2), ‘height’: (1, 2), ‘dim’:(3, 2), ‘rot’:(2, 2)} # (output_channel, num_conv)
以’hm’检测头为例:Conv2D(64,64,3,1)+BN+ReLu+Conv2D(64,3,3,1)

CenterPoint training

同anchor based的方法,最关键的一步也是如何定义目标。
hm的目标,是把真实的box,通过高斯图投射到map_view中。

CenterPoint loss

分类的loss, Focal loss
regression的loss, L1 loss

CenterPoint inference

根据hm和box的预测结果,进行解码,输出,box和class.
根据CenterNet的论文,直接使用max pooling就可以确定某一个目标,而不需要nms来处理。但是在bev视角下的feature map下,物体都很小,可能多个物体在同一个grid下,因此centerpoint并没有使用maxpooling,还是用到了nms的方法。

CenterPoint ++

在21年,CenterPoint 进行升级,在稍微提高原有的精度的前提下,进一步的提高了推理速度,主要引入了如下的变化点:

IOU aware

参照CIA-SSD的方法。
在单阶段的模型中,localization accuracy and classification con
fidence是两个检测头,分别训练和预测,相互之间并没有直接联系,但是事实上和GT box的IOU越大的box,classification的confidence应该也越高。考虑到这一点,可以增加一个IOU预测头。
在训练阶段,去检测box和gt的iou大小作为target,计算l1 loss,并修正IOU的预测模型。
在推理的时候,得到这个IOU,并把这个IOU去加载到classification confidence上,提高预测的准确度。

在这里插入图片描述

temporal multi frame input

参照nuScene提出的方法。
在当前点云的特征上,增加时间维度;把前几帧的点云,通过坐标变换,也变换到当前点云的坐标系下。
点云更密,有历史信息,但是计算会更加耗时,也需要多帧输入的准确的坐标变换关系。

hard voxelization-> dynamic voxelization

hard voxelization,最先由voxelnet提出,设定了voxel的总数量,以及voxel内点的上限要求,因此在做voxelization的时候,会丢弃掉voxel内点的信息,而且也会丢掉部分的voxel,降低一定的精度。
在这里插入图片描述
参考MVF提出的 dynamic voxelization的概念,在GPU上,实现dynamic voxelization。可以保留完整的点的信息,保证了一定的精度。同时CenterPoint在GPU上实现了dynamic voxelization,相对于其在CPU上的实现,大大提高了速度。50ms->2ms

two stage

针对第一阶段预测得到的box,提取box的四个面的中心点和box的中心点。
然后从featue map上提取这5个点的特征,并叠加到一起,送入第二个stage。
第二个stage,通过MLP,预测box的confidence和localization。作为对第一个阶段结果的refinment.

各个模块对精度的影响,作者做了对比实验。
在这里插入图片描述

详细的代码:
backbone_2d

'''
输入backbone 2d的类型,model_cfg.BACKBONE_2D
输入input_channels,model_info_dict['num_bev_features HxWxC,上一层网络的输出]
输出 backbone_2d_module 的模型
输出model_info_dict,增加了经过bev backbone得到的特征
'''
def build_backbone_2d(self, model_info_dict):
        if self.model_cfg.get('BACKBONE_2D', None) is None:
            return None, model_info_dict

        backbone_2d_module = backbones_2d.__all__[self.model_cfg.BACKBONE_2D.NAME](
            model_cfg=self.model_cfg.BACKBONE_2D,
            input_channels=model_info_dict['num_bev_features']
        )
        model_info_dict['module_list'].append(backbone_2d_module)
        model_info_dict['num_bev_features'] = backbone_2d_module.num_bev_features
        return backbone_2d_module, model_info_dict

BaseBEVBackbone

'''
init,中初始化2D的模型
forward,导入模型,导入上一层的特征,计算得到本层的特征
'''
class BaseBEVBackbone(nn.Module):
    def __init__(self, model_cfg, input_channels):
        super().__init__()
        self.model_cfg = model_cfg
		#读取2dbackbone的基本参数
        if self.model_cfg.get('LAYER_NUMS', None) is not None:
            assert len(self.model_cfg.LAYER_NUMS) == len(self.model_cfg.LAYER_STRIDES) == len(self.model_cfg.NUM_FILTERS)
            layer_nums = self.model_cfg.LAYER_NUMS
            layer_strides = self.model_cfg.LAYER_STRIDES
            num_filters = self.model_cfg.NUM_FILTERS
        else:
            layer_nums = layer_strides = num_filters = []
		#读取上采样的参数
        if self.model_cfg.get('UPSAMPLE_STRIDES', None) is not None:
            assert len(self.model_cfg.UPSAMPLE_STRIDES) == len(self.model_cfg.NUM_UPSAMPLE_FILTERS)
            num_upsample_filters = self.model_cfg.NUM_UPSAMPLE_FILTERS
            upsample_strides = self.model_cfg.UPSAMPLE_STRIDES
        else:
            upsample_strides = num_upsample_filters = []

        num_levels = len(layer_nums)
        c_in_list = [input_channels, *num_filters[:-1]]
        self.blocks = nn.ModuleList()
        self.deblocks = nn.ModuleList()
        for idx in range(num_levels):
            cur_layers = [
                nn.ZeroPad2d(1),
                nn.Conv2d(
                    c_in_list[idx], num_filters[idx], kernel_size=3,
                    stride=layer_strides[idx], padding=0, bias=False
                ),
                nn.BatchNorm2d(num_filters[idx], eps=1e-3, momentum=0.01),
                nn.ReLU()
            ]
            for k in range(layer_nums[idx]):
                cur_layers.extend([
                    nn.Conv2d(num_filters[idx], num_filters[idx], kernel_size=3, padding=1, bias=False),
                    nn.BatchNorm2d(num_filters[idx], eps=1e-3, momentum=0.01),
                    nn.ReLU()
                ])
            self.blocks.append(nn.Sequential(*cur_layers))
            if len(upsample_strides) > 0:
                stride = upsample_strides[idx]
                if stride >= 1:
                    self.deblocks.append(nn.Sequential(
                        nn.ConvTranspose2d(
                            num_filters[idx], num_upsample_filters[idx],
                            upsample_strides[idx],
                            stride=upsample_strides[idx], bias=False
                        ),
                        nn.BatchNorm2d(num_upsample_filters[idx], eps=1e-3, momentum=0.01),
                        nn.ReLU()
                    ))
                else:
                    stride = np.round(1 / stride).astype(np.int)
                    self.deblocks.append(nn.Sequential(
                        nn.Conv2d(
                            num_filters[idx], num_upsample_filters[idx],
                            stride,
                            stride=stride, bias=False
                        ),
                        nn.BatchNorm2d(num_upsample_filters[idx], eps=1e-3, momentum=0.01),
                        nn.ReLU()
                    ))

        c_in = sum(num_upsample_filters)
        if len(upsample_strides) > num_levels:
            self.deblocks.append(nn.Sequential(
                nn.ConvTranspose2d(c_in, c_in, upsample_strides[-1], stride=upsample_strides[-1], bias=False),
                nn.BatchNorm2d(c_in, eps=1e-3, momentum=0.01),
                nn.ReLU(),
            ))

        self.num_bev_features = c_in

    def forward(self, data_dict):
        """
        Args:
            data_dict:
                spatial_features
        Returns:
        """
        spatial_features = data_dict['spatial_features']
        ups = []
        ret_dict = {}
        x = spatial_features
        for i in range(len(self.blocks)):
            x = self.blocks[i](x)

            stride = int(spatial_features.shape[2] / x.shape[2])
            ret_dict['spatial_features_%dx' % stride] = x
            if len(self.deblocks) > 0:
                ups.append(self.deblocks[i](x))
            else:
                ups.append(x)

        if len(ups) > 1:
            x = torch.cat(ups, dim=1)
        elif len(ups) == 1:
            x = ups[0]

        if len(self.deblocks) > len(self.blocks):
            x = self.deblocks[-1](x)

        data_dict['spatial_features_2d'] = x

        return data_dict

anchor_head_single

class AnchorHeadSingle(AnchorHeadTemplate):
    def __init__(self, model_cfg, input_channels, num_class, class_names, grid_size, point_cloud_range,
                 predict_boxes_when_training=True, **kwargs):
        super().__init__(
            model_cfg=model_cfg, num_class=num_class, class_names=class_names, grid_size=grid_size, point_cloud_range=point_cloud_range,
            predict_boxes_when_training=predict_boxes_when_training
        )

        self.num_anchors_per_location = sum(self.num_anchors_per_location)
		#定义classification_head,输出每个位置的,anchor(x)xclass
        self.conv_cls = nn.Conv2d(
            input_channels, self.num_anchors_per_location * self.num_class,
            kernel_size=1
        )
        #定义regression_head,输出每个位置的,anchor(x)xbox尺寸的数量
        #比如每个位置两个正交的anchor, 预测的box和anchor的相对位置关系
        self.conv_box = nn.Conv2d(
            input_channels, self.num_anchors_per_location * self.box_coder.code_size,
            kernel_size=1
        )
		#定义direction_head,输出每个位置的,anchor(x)x角度编码
        if self.model_cfg.get('USE_DIRECTION_CLASSIFIER', None) is not None:
            self.conv_dir_cls = nn.Conv2d(
                input_channels,
                self.num_anchors_per_location * self.model_cfg.NUM_DIR_BINS,
                kernel_size=1
            )
        else:
            self.conv_dir_cls = None
        self.init_weights()

    def init_weights(self):
        pi = 0.01
        nn.init.constant_(self.conv_cls.bias, -np.log((1 - pi) / pi))
        nn.init.normal_(self.conv_box.weight, mean=0, std=0.001)

    def forward(self, data_dict):
    	#推理计算,预测得到结果
        spatial_features_2d = data_dict['spatial_features_2d']

        cls_preds = self.conv_cls(spatial_features_2d)
        box_preds = self.conv_box(spatial_features_2d)

        cls_preds = cls_preds.permute(0, 2, 3, 1).contiguous()  # [N, H, W, C]
        box_preds = box_preds.permute(0, 2, 3, 1).contiguous()  # [N, H, W, C]

        self.forward_ret_dict['cls_preds'] = cls_preds
        self.forward_ret_dict['box_preds'] = box_preds

        if self.conv_dir_cls is not None:
            dir_cls_preds = self.conv_dir_cls(spatial_features_2d)
            dir_cls_preds = dir_cls_preds.permute(0, 2, 3, 1).contiguous()
            self.forward_ret_dict['dir_cls_preds'] = dir_cls_preds
        else:
            dir_cls_preds = None
		#如果训练,计算目标
        if self.training:
            targets_dict = self.assign_targets(
                gt_boxes=data_dict['gt_boxes']
            )
            self.forward_ret_dict.update(targets_dict)
		#如果没训练,但是需要得到box,那就解码预测的结果,得到box的结果
        if not self.training or self.predict_boxes_when_training:
            batch_cls_preds, batch_box_preds = self.generate_predicted_boxes(
                batch_size=data_dict['batch_size'],
                cls_preds=cls_preds, box_preds=box_preds, dir_cls_preds=dir_cls_preds
            )
            data_dict['batch_cls_preds'] = batch_cls_preds
            data_dict['batch_box_preds'] = batch_box_preds
            data_dict['cls_preds_normalized'] = False

        return data_dict

anchor_head_template

class AnchorHeadTemplate(nn.Module):
    def __init__(self, model_cfg, num_class, class_names, grid_size, point_cloud_range, predict_boxes_when_training):
        super().__init__()
        self.model_cfg = model_cfg
        self.num_class = num_class
        self.class_names = class_names
        self.predict_boxes_when_training = predict_boxes_when_training
        self.use_multihead = self.model_cfg.get('USE_MULTIHEAD', False)
        
        anchor_target_cfg = self.model_cfg.TARGET_ASSIGNER_CONFIG
        self.box_coder = getattr(box_coder_utils, anchor_target_cfg.BOX_CODER)(
            num_dir_bins=anchor_target_cfg.get('NUM_DIR_BINS', 6),
            **anchor_target_cfg.get('BOX_CODER_CONFIG', {})
        )
        #生成anchors
        anchor_generator_cfg = self.model_cfg.ANCHOR_GENERATOR_CONFIG
        anchors, self.num_anchors_per_location = self.generate_anchors(
            anchor_generator_cfg, grid_size=grid_size, point_cloud_range=point_cloud_range,
            anchor_ndim=self.box_coder.code_size
        )
        self.anchors = [x.cuda() for x in anchors]
        #分配anchor和gtbox,得到预测的目标,class和box(box和anchor的偏移)
        self.target_assigner = self.get_target_assigner(anchor_target_cfg)

        self.forward_ret_dict = {}
        self.build_losses(self.model_cfg.LOSS_CONFIG)

    @staticmethod
    def generate_anchors(anchor_generator_cfg, grid_size, point_cloud_range, anchor_ndim=7):
        anchor_generator = AnchorGenerator(
            anchor_range=point_cloud_range,
            anchor_generator_config=anchor_generator_cfg
        )
        feature_map_size = [grid_size[:2] // config['feature_map_stride'] for config in anchor_generator_cfg]
        anchors_list, num_anchors_per_location_list = anchor_generator.generate_anchors(feature_map_size)

        if anchor_ndim != 7:
            for idx, anchors in enumerate(anchors_list):
                pad_zeros = anchors.new_zeros([*anchors.shape[0:-1], anchor_ndim - 7])
                new_anchors = torch.cat((anchors, pad_zeros), dim=-1)
                anchors_list[idx] = new_anchors

        return anchors_list, num_anchors_per_location_list

    def get_target_assigner(self, anchor_target_cfg):
        if anchor_target_cfg.NAME == 'ATSS':
            target_assigner = ATSSTargetAssigner(
                topk=anchor_target_cfg.TOPK,
                box_coder=self.box_coder,
                use_multihead=self.use_multihead,
                match_height=anchor_target_cfg.MATCH_HEIGHT
            )
        elif anchor_target_cfg.NAME == 'AxisAlignedTargetAssigner':
            target_assigner = AxisAlignedTargetAssigner(
                model_cfg=self.model_cfg,
                class_names=self.class_names,
                box_coder=self.box_coder,
                match_height=anchor_target_cfg.MATCH_HEIGHT
            )
        else:
            raise NotImplementedError
        return target_assigner

    def build_losses(self, losses_cfg):
        self.add_module(
            'cls_loss_func',
            loss_utils.SigmoidFocalClassificationLoss(alpha=0.25, gamma=2.0)
        )
        reg_loss_name = 'WeightedSmoothL1Loss' if losses_cfg.get('REG_LOSS_TYPE', None) is None \
            else losses_cfg.REG_LOSS_TYPE
        self.add_module(
            'reg_loss_func',
            getattr(loss_utils, reg_loss_name)(code_weights=losses_cfg.LOSS_WEIGHTS['code_weights'])
        )
        self.add_module(
            'dir_loss_func',
            loss_utils.WeightedCrossEntropyLoss()
        )

    def assign_targets(self, gt_boxes):
        """
        Args:
            gt_boxes: (B, M, 8)
        Returns:

        """
        targets_dict = self.target_assigner.assign_targets(
            self.anchors, gt_boxes
        )
        return targets_dict

    def get_cls_layer_loss(self):
        cls_preds = self.forward_ret_dict['cls_preds']
        box_cls_labels = self.forward_ret_dict['box_cls_labels']
        batch_size = int(cls_preds.shape[0])
        cared = box_cls_labels >= 0  # [N, num_anchors]
        positives = box_cls_labels > 0
        negatives = box_cls_labels == 0
        negative_cls_weights = negatives * 1.0
        cls_weights = (negative_cls_weights + 1.0 * positives).float()
        reg_weights = positives.float()
        if self.num_class == 1:
            # class agnostic
            box_cls_labels[positives] = 1

        pos_normalizer = positives.sum(1, keepdim=True).float()
        reg_weights /= torch.clamp(pos_normalizer, min=1.0)
        cls_weights /= torch.clamp(pos_normalizer, min=1.0)
        cls_targets = box_cls_labels * cared.type_as(box_cls_labels)
        cls_targets = cls_targets.unsqueeze(dim=-1)

        cls_targets = cls_targets.squeeze(dim=-1)
        one_hot_targets = torch.zeros(
            *list(cls_targets.shape), self.num_class + 1, dtype=cls_preds.dtype, device=cls_targets.device
        )
        one_hot_targets.scatter_(-1, cls_targets.unsqueeze(dim=-1).long(), 1.0)
        cls_preds = cls_preds.view(batch_size, -1, self.num_class)
        one_hot_targets = one_hot_targets[..., 1:]
        cls_loss_src = self.cls_loss_func(cls_preds, one_hot_targets, weights=cls_weights)  # [N, M]
        cls_loss = cls_loss_src.sum() / batch_size

        cls_loss = cls_loss * self.model_cfg.LOSS_CONFIG.LOSS_WEIGHTS['cls_weight']
        tb_dict = {
            'rpn_loss_cls': cls_loss.item()
        }
        return cls_loss, tb_dict

    @staticmethod
    def add_sin_difference(boxes1, boxes2, dim=6):
        assert dim != -1
        rad_pred_encoding = torch.sin(boxes1[..., dim:dim + 1]) * torch.cos(boxes2[..., dim:dim + 1])
        rad_tg_encoding = torch.cos(boxes1[..., dim:dim + 1]) * torch.sin(boxes2[..., dim:dim + 1])
        boxes1 = torch.cat([boxes1[..., :dim], rad_pred_encoding, boxes1[..., dim + 1:]], dim=-1)
        boxes2 = torch.cat([boxes2[..., :dim], rad_tg_encoding, boxes2[..., dim + 1:]], dim=-1)
        return boxes1, boxes2

    @staticmethod
    def get_direction_target(anchors, reg_targets, one_hot=True, dir_offset=0, num_bins=2):
        batch_size = reg_targets.shape[0]
        anchors = anchors.view(batch_size, -1, anchors.shape[-1])
        rot_gt = reg_targets[..., 6] + anchors[..., 6]
        offset_rot = common_utils.limit_period(rot_gt - dir_offset, 0, 2 * np.pi)
        dir_cls_targets = torch.floor(offset_rot / (2 * np.pi / num_bins)).long()
        dir_cls_targets = torch.clamp(dir_cls_targets, min=0, max=num_bins - 1)

        if one_hot:
            dir_targets = torch.zeros(*list(dir_cls_targets.shape), num_bins, dtype=anchors.dtype,
                                      device=dir_cls_targets.device)
            dir_targets.scatter_(-1, dir_cls_targets.unsqueeze(dim=-1).long(), 1.0)
            dir_cls_targets = dir_targets
        return dir_cls_targets

    def get_box_reg_layer_loss(self):
        box_preds = self.forward_ret_dict['box_preds']
        box_dir_cls_preds = self.forward_ret_dict.get('dir_cls_preds', None)
        box_reg_targets = self.forward_ret_dict['box_reg_targets']
        box_cls_labels = self.forward_ret_dict['box_cls_labels']
        batch_size = int(box_preds.shape[0])

        positives = box_cls_labels > 0
        reg_weights = positives.float()
        pos_normalizer = positives.sum(1, keepdim=True).float()
        reg_weights /= torch.clamp(pos_normalizer, min=1.0)

        if isinstance(self.anchors, list):
            if self.use_multihead:
                anchors = torch.cat(
                    [anchor.permute(3, 4, 0, 1, 2, 5).contiguous().view(-1, anchor.shape[-1]) for anchor in
                     self.anchors], dim=0)
            else:
                anchors = torch.cat(self.anchors, dim=-3)
        else:
            anchors = self.anchors
        anchors = anchors.view(1, -1, anchors.shape[-1]).repeat(batch_size, 1, 1)
        box_preds = box_preds.view(batch_size, -1,
                                   box_preds.shape[-1] // self.num_anchors_per_location if not self.use_multihead else
                                   box_preds.shape[-1])
        # sin(a - b) = sinacosb-cosasinb
        box_preds_sin, reg_targets_sin = self.add_sin_difference(box_preds, box_reg_targets)
        loc_loss_src = self.reg_loss_func(box_preds_sin, reg_targets_sin, weights=reg_weights)  # [N, M]
        loc_loss = loc_loss_src.sum() / batch_size

        loc_loss = loc_loss * self.model_cfg.LOSS_CONFIG.LOSS_WEIGHTS['loc_weight']
        box_loss = loc_loss
        tb_dict = {
            'rpn_loss_loc': loc_loss.item()
        }

        if box_dir_cls_preds is not None:
            dir_targets = self.get_direction_target(
                anchors, box_reg_targets,
                dir_offset=self.model_cfg.DIR_OFFSET,
                num_bins=self.model_cfg.NUM_DIR_BINS
            )

            dir_logits = box_dir_cls_preds.view(batch_size, -1, self.model_cfg.NUM_DIR_BINS)
            weights = positives.type_as(dir_logits)
            weights /= torch.clamp(weights.sum(-1, keepdim=True), min=1.0)
            dir_loss = self.dir_loss_func(dir_logits, dir_targets, weights=weights)
            dir_loss = dir_loss.sum() / batch_size
            dir_loss = dir_loss * self.model_cfg.LOSS_CONFIG.LOSS_WEIGHTS['dir_weight']
            box_loss += dir_loss
            tb_dict['rpn_loss_dir'] = dir_loss.item()

        return box_loss, tb_dict

    def get_loss(self):
        cls_loss, tb_dict = self.get_cls_layer_loss()
        box_loss, tb_dict_box = self.get_box_reg_layer_loss()
        tb_dict.update(tb_dict_box)
        rpn_loss = cls_loss + box_loss

        tb_dict['rpn_loss'] = rpn_loss.item()
        return rpn_loss, tb_dict

    def generate_predicted_boxes(self, batch_size, cls_preds, box_preds, dir_cls_preds=None):
        """
        Args:
            batch_size:
            cls_preds: (N, H, W, C1)
            box_preds: (N, H, W, C2)
            dir_cls_preds: (N, H, W, C3)

        Returns:
            batch_cls_preds: (B, num_boxes, num_classes)
            batch_box_preds: (B, num_boxes, 7+C)

        """
        if isinstance(self.anchors, list):
            if self.use_multihead:
                anchors = torch.cat([anchor.permute(3, 4, 0, 1, 2, 5).contiguous().view(-1, anchor.shape[-1])
                                     for anchor in self.anchors], dim=0)
            else:
                anchors = torch.cat(self.anchors, dim=-3)
        else:
            anchors = self.anchors
        num_anchors = anchors.view(-1, anchors.shape[-1]).shape[0]
        batch_anchors = anchors.view(1, -1, anchors.shape[-1]).repeat(batch_size, 1, 1)
        batch_cls_preds = cls_preds.view(batch_size, num_anchors, -1).float() \
            if not isinstance(cls_preds, list) else cls_preds
        batch_box_preds = box_preds.view(batch_size, num_anchors, -1) if not isinstance(box_preds, list) \
            else torch.cat(box_preds, dim=1).view(batch_size, num_anchors, -1)
        batch_box_preds = self.box_coder.decode_torch(batch_box_preds, batch_anchors)

        if dir_cls_preds is not None:
            dir_offset = self.model_cfg.DIR_OFFSET
            dir_limit_offset = self.model_cfg.DIR_LIMIT_OFFSET
            dir_cls_preds = dir_cls_preds.view(batch_size, num_anchors, -1) if not isinstance(dir_cls_preds, list) \
                else torch.cat(dir_cls_preds, dim=1).view(batch_size, num_anchors, -1)
            dir_labels = torch.max(dir_cls_preds, dim=-1)[1]

            period = (2 * np.pi / self.model_cfg.NUM_DIR_BINS)
            dir_rot = common_utils.limit_period(
                batch_box_preds[..., 6] - dir_offset, dir_limit_offset, period
            )
            batch_box_preds[..., 6] = dir_rot + dir_offset + period * dir_labels.to(batch_box_preds.dtype)

        if isinstance(self.box_coder, box_coder_utils.PreviousResidualDecoder):
            batch_box_preds[..., 6] = common_utils.limit_period(
                -(batch_box_preds[..., 6] + np.pi / 2), offset=0.5, period=np.pi * 2
            )

        return batch_cls_preds, batch_box_preds

    def forward(self, **kwargs):
        raise NotImplementedError

CenterHead in OpenPCDet

class CenterHead(nn.Module):
    def __init__(self, model_cfg, input_channels, num_class, class_names, grid_size, point_cloud_range,
                 predict_boxes_when_training=True):
        super().__init__()
        self.model_cfg = model_cfg
        self.num_class = num_class
        self.class_names = [class_names]
        self.predict_boxes_when_training = predict_boxes_when_training
        self.use_multihead = self.model_cfg.get('USE_MULTIHEAD', False)

        target_cfg = self.model_cfg.TARGET_ASSIGNER_CONFIG

        self.target_cfg = target_cfg 
        self.grid_size = grid_size
        self.point_cloud_range = point_cloud_range

        self.forward_ret_dict = {}
		#相对于原paper,用了非常简单的检测头
        self.conv_cls = nn.Conv2d(
            input_channels, self.num_class,
            kernel_size=1
        )
        self.conv_box = nn.Conv2d(
            input_channels, 8,
            kernel_size=1
        )

        self.loss_cls = GaussianFocalLoss(reduction='mean')

        self.init_weights()

    def init_weights(self):
        pi = 0.01
        nn.init.constant_(self.conv_cls.bias, -np.log((1 - pi) / pi))
        nn.init.normal_(self.conv_box.weight, mean=0, std=0.001)

    def forward(self, data_dict):
        spatial_features_2d = data_dict['spatial_features_2d']

        cls_preds = self.conv_cls(spatial_features_2d)
        box_preds = self.conv_box(spatial_features_2d)

        cls_preds = cls_preds.permute(0, 2, 3, 1).contiguous()  # [N, H, W, C]
        box_preds = box_preds.permute(0, 2, 3, 1).contiguous()  # [N, H, W, C]

        self.forward_ret_dict['cls_preds'] = cls_preds
        self.forward_ret_dict['box_preds'] = box_preds
		#如果训练,获得预测的目标
        if self.training:
            targets_dict = self.assign_targets(
                gt_boxes=data_dict['gt_boxes']
            )
            self.forward_ret_dict.update(targets_dict)
		#如果推理或者训练时要预测box,解码box
        if not self.training or self.predict_boxes_when_training:
            batch_cls_preds, batch_box_preds = self.generate_predicted_boxes(
                batch_size=data_dict['batch_size'],
                cls_preds=cls_preds, box_preds=box_preds, dir_cls_preds=None
            )
            data_dict['batch_cls_preds'] = batch_cls_preds
            data_dict['batch_box_preds'] = batch_box_preds
            data_dict['cls_preds_normalized'] = False

        return data_dict

    def _gather_feat(self, feat, ind, mask=None):
        """Gather feature map.

        Given feature map and index, return indexed feature map.

        Args:
            feat (torch.tensor): Feature map with the shape of [B, H*W, 10].
            ind (torch.Tensor): Index of the ground truth boxes with the
                shape of [B, max_obj].
            mask (torch.Tensor): Mask of the feature map with the shape
                of [B, max_obj]. Default: None.

        Returns:
            torch.Tensor: Feature map after gathering with the shape
                of [B, max_obj, 10].
        """
        dim = feat.size(2)
        ind = ind.unsqueeze(2).expand(ind.size(0), ind.size(1), dim)
        feat = feat.gather(1, ind)
        if mask is not None:
            mask = mask.unsqueeze(2).expand_as(feat)
            feat = feat[mask]
            feat = feat.view(-1, dim)
        return feat

    def assign_targets(self, gt_boxes):
        """Generate targets.

        Args:
            gt_boxes: (B, M, 8) box + cls 

        Returns:
            Returns:
                tuple
]: Tuple of target including \ the following results in order. - list[torch.Tensor]: Heatmap scores. - list[torch.Tensor]: Ground truth boxes. - list[torch.Tensor]: Indexes indicating the \ position of the valid boxes. - list[torch.Tensor]: Masks indicating which \ boxes are valid. """
gt_bboxes_3d, gt_labels_3d = gt_boxes[..., :-1], gt_boxes[..., -1] heatmaps, anno_boxes, inds, masks = multi_apply( self.get_targets_single, gt_bboxes_3d, gt_labels_3d) # transpose heatmaps, because the dimension of tensors in each task is # different, we have to use numpy instead of torch to do the transpose. heatmaps = np.array(heatmaps).transpose(1, 0).tolist() heatmaps = [torch.stack(hms_) for hms_ in heatmaps] # transpose anno_boxes anno_boxes = np.array(anno_boxes).transpose(1, 0).tolist() anno_boxes = [torch.stack(anno_boxes_) for anno_boxes_ in anno_boxes] # transpose inds inds = np.array(inds).transpose(1, 0).tolist() inds = [torch.stack(inds_) for inds_ in inds] # transpose inds masks = np.array(masks).transpose(1, 0).tolist() masks = [torch.stack(masks_) for masks_ in masks] all_targets_dict = { 'heatmaps': heatmaps, 'anno_boxes': anno_boxes, 'inds': inds, 'masks': masks } return all_targets_dict def get_targets_single(self, gt_bboxes_3d, gt_labels_3d): """Generate training targets for a single sample. Args: gt_bboxes_3d (:obj:`LiDARInstance3DBoxes`): Ground truth gt boxes. gt_labels_3d (torch.Tensor): Labels of boxes. Returns: tuple
]: Tuple of target including \ the following results in order. - list[torch.Tensor]: Heatmap scores. - list[torch.Tensor]: Ground truth boxes. - list[torch.Tensor]: Indexes indicating the position \ of the valid boxes. - list[torch.Tensor]: Masks indicating which boxes \ are valid. """
device = gt_labels_3d.device """gt_bboxes_3d = torch.cat( (gt_bboxes_3d.gravity_center, gt_bboxes_3d.tensor[:, 3:]), dim=1).to(device) """ max_objs = self.target_cfg.MAX_OBJS grid_size = torch.tensor(self.grid_size) pc_range = torch.tensor(self.point_cloud_range) voxel_size = torch.tensor(self.target_cfg.VOXEL_SIZE) feature_map_size = grid_size[:2] // self.target_cfg.OUT_SIZE_FACTOR """ # reorganize the gt_dict by tasks task_masks = [] flag = 0 for class_name in self.class_names: print(gt_labels_3d) task_masks.append([ torch.where(gt_labels_3d == class_name.index(i) + flag) for i in class_name ]) flag += len(class_name) task_boxes = [] task_classes = [] flag2 = 0 for idx, mask in enumerate(task_masks): task_box = [] task_class = [] for m in mask: task_box.append(gt_bboxes_3d[m]) # 0 is background for each task, so we need to add 1 here. task_class.append(gt_labels_3d[m] - flag2) task_boxes.append(torch.cat(task_box, axis=0).to(device)) task_classes.append(torch.cat(task_class).long().to(device)) flag2 += len(mask) """ task_boxes = [gt_bboxes_3d] task_classes = [gt_labels_3d] draw_gaussian = draw_heatmap_gaussian heatmaps, anno_boxes, inds, masks = [], [], [], [] for idx in range(1): heatmap = gt_bboxes_3d.new_zeros( (len(self.class_names[idx]), feature_map_size[1], feature_map_size[0])) anno_box = gt_bboxes_3d.new_zeros((max_objs, 8), dtype=torch.float32) ind = gt_labels_3d.new_zeros((max_objs), dtype=torch.int64) mask = gt_bboxes_3d.new_zeros((max_objs), dtype=torch.uint8) num_objs = min(task_boxes[idx].shape[0], max_objs) for k in range(num_objs): cls_id = (task_classes[idx][k] - 1).int() width = task_boxes[idx][k][3] length = task_boxes[idx][k][4] width = width / voxel_size[0] / self.target_cfg.OUT_SIZE_FACTOR length = length / voxel_size[1] / self.target_cfg.OUT_SIZE_FACTOR if width > 0 and length > 0: radius = gaussian_radius( (length, width), min_overlap=self.target_cfg.GAUSSIAN_OVERLAP) radius = max(self.target_cfg.MIN_RADIUS, int(radius)) # be really careful for the coordinate system of # your box annotation. x, y, z = task_boxes[idx][k][0], task_boxes[idx][k][ 1], task_boxes[idx][k][2] coor_x = ( x - pc_range[0] ) / voxel_size[0] / self.target_cfg.OUT_SIZE_FACTOR coor_y = ( y - pc_range[1] ) / voxel_size[1] / self.target_cfg.OUT_SIZE_FACTOR center = torch.tensor([coor_x, coor_y], dtype=torch.float32, device=device) center_int = center.to(torch.int32) # throw out not in range objects to avoid out of array # area when creating the heatmap if not (0 <= center_int[0] < feature_map_size[0] and 0 <= center_int[1] < feature_map_size[1]): continue draw_gaussian(heatmap[cls_id], center_int, radius) new_idx = k x, y = center_int[0], center_int[1] assert (y * feature_map_size[0] + x < feature_map_size[0] * feature_map_size[1]) ind[new_idx] = y * feature_map_size[0] + x mask[new_idx] = 1 rot = task_boxes[idx][k][6] box_dim = task_boxes[idx][k][3:6] box_dim = box_dim.log() anno_box[new_idx] = torch.cat([ center - torch.tensor([x, y], device=device), z.unsqueeze(0), box_dim, torch.sin(rot).unsqueeze(0), torch.cos(rot).unsqueeze(0), ]) heatmaps.append(heatmap) anno_boxes.append(anno_box) masks.append(mask) inds.append(ind) return heatmaps, anno_boxes, inds, masks def generate_predicted_boxes(self, batch_size, cls_preds, box_preds, dir_cls_preds=None): """ Args: batch_size: cls_preds: (N, H, W, C1) box_preds: (N, H, W, C2) dir_cls_preds: (N, H, W, C3) Returns: batch_cls_preds: (B, num_boxes, num_classes) batch_box_preds: (B, num_boxes, 7+C) """ batch, H, W, code_size = box_preds.size() box_preds = box_preds.reshape(batch, H*W, code_size) batch_reg = box_preds[..., 0:2] batch_hei = box_preds[..., 2:3] batch_dim = torch.exp(box_preds[..., 3:6]) batch_rots = box_preds[..., 6:7] batch_rotc = box_preds[..., 7:8] ys, xs = torch.meshgrid([torch.arange(0, H), torch.arange(0, W)]) ys = ys.view(1, H, W).repeat(batch, 1, 1).to(cls_preds.device) xs = xs.view(1, H, W).repeat(batch, 1, 1).to(cls_preds.device) xs = xs.view(batch, -1, 1) + batch_reg[:, :, 0:1] ys = ys.view(batch, -1, 1) + batch_reg[:, :, 1:2] xs = xs * self.target_cfg.OUT_SIZE_FACTOR * self.target_cfg.VOXEL_SIZE[0] + self.point_cloud_range[0] ys = ys * self.target_cfg.OUT_SIZE_FACTOR * self.target_cfg.VOXEL_SIZE[1] + self.point_cloud_range[1] rot = torch.atan2(batch_rots, batch_rotc) batch_box_preds = torch.cat([xs, ys, batch_hei, batch_dim, rot], dim=2) batch_cls_preds = cls_preds.view(batch, H*W, -1) return batch_cls_preds, batch_box_preds def get_loss(self): cls_loss, tb_dict = self.get_cls_layer_loss() box_loss, tb_dict_box = self.get_box_reg_layer_loss() tb_dict.update(tb_dict_box) rpn_loss = cls_loss + box_loss tb_dict['rpn_loss'] = rpn_loss.item() return rpn_loss, tb_dict def get_cls_layer_loss(self): # NHWC -> NCHW pred_heatmaps = clip_sigmoid(self.forward_ret_dict['cls_preds']).permute(0, 3, 1, 2) gt_heatmaps = self.forward_ret_dict['heatmaps'][0] num_pos = gt_heatmaps.eq(1).float().sum().item() cls_loss = self.loss_cls( pred_heatmaps, gt_heatmaps, avg_factor=max(num_pos, 1)) cls_loss = cls_loss * self.model_cfg.LOSS_CONFIG.LOSS_WEIGHTS['cls_weight'] tb_dict = { 'rpn_loss_cls': cls_loss.item() } return cls_loss, tb_dict def get_box_reg_layer_loss(self): # Regression loss for dimension, offset, height, rotation target_box, inds, masks = self.forward_ret_dict['anno_boxes'][0], self.forward_ret_dict['inds'][0], self.forward_ret_dict['masks'][0] ind = inds num = masks.float().sum() pred = self.forward_ret_dict['box_preds'] # N x (HxW) x 7 pred = pred.view(pred.size(0), -1, pred.size(3)) pred = self._gather_feat(pred, ind) mask = masks.unsqueeze(2).expand_as(target_box).float() isnotnan = (~torch.isnan(target_box)).float() mask *= isnotnan code_weights = self.model_cfg.LOSS_CONFIG.LOSS_WEIGHTS['code_weights'] bbox_weights = mask * mask.new_tensor(code_weights) loc_loss = l1_loss( pred, target_box, bbox_weights, avg_factor=(num + 1e-4)) loc_loss = loc_loss * self.model_cfg.LOSS_CONFIG.LOSS_WEIGHTS['loc_weight'] box_loss = loc_loss tb_dict = { 'rpn_loss_loc': loc_loss.item() } return box_loss, tb_dict

版权声明:本文为CSDN博主「huang_victor」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/huang_victor/article/details/120778544

huang_victor

我还没有学会写个人说明!

暂无评论

发表评论

相关推荐

玩转KITTI3D目标检测:KITTI评估工具evaluate的使用

近期因实验需要利用kitti数据集,发现关于评估工具使用的部分网上教程不够详细,特此记录. 文末为了方便对数据结果观看,附上了修改代码. 1. KITTI评估工具来源 官网评估工具 下载后文件目录包含: matlab(2D/3D框显示和

深度学习之目标检测YOLOv5

一.简介 YOLOV4出现之后不久,YOLOv5横空出世。YOLOv5在YOLOv4算法的基础上做了进一步的改进,检测性能得到进一步的提升。虽然YOLOv5算法并没有与YOLOv4算法进行性能比较与分析&#xff0