Lidar Object detection

3d object detection的一般的pipeline

在这里插入图片描述
在这里插入图片描述

Anchor based vs Center based RPN

当前比较流行的3d目标检测pipeline,或是通过pillar,对3d点云进行编码,特征提取,压缩到2d,或是通过voxel,3D sparseConv,特征提取,压缩到2d,转化为2d后就可以通过2D的目标检测的方法去实现目标检测的任务。本文详细解读和对比,经典的anchor based的SSD(single short detection)和最近在3d objectection中较为流行的anchor free center based的CenterNet的原理以及在OpenPCDet框架下的具体实现。

如下是截止到2021-10-26前的,NuScenes和Waymo real time 3D object detection challenge的排行榜。比较靠前的,都采用了anchor free的模型。
在这里插入图片描述
在这里插入图片描述

SECOND Voxelization, Voxel Feature Extraction,3D SpconvConv, Map to BEV

在这里插入图片描述
我们以waymo dataset为例,推断从点云,经过网格化、网格特征提取、稀疏3D卷积之后的特征图的维度,
point cloud range : [-75.2, -75.2, -2, 75.2, 75.2, 4]
voxel size: [0.1, 0.1, 0.15]
经过网格化和特征提取之后,维度为[40,1504,1504, 5]
5维的特征分别是:x,y,z,intensity,elongation
经过多层的稀疏3D卷积之后,维度变为[2,188,188, 128]
通过维度变化,压缩到BEV,维度变为[188,188,256]
输入给2D Backbone和RPN的维度也就是 [188, 188, 256] HxWxC

SECOND backbone 2d

backbone2d
SECOND的官方论文的2d backbone如上图,
Conv2D(Cout,k,s)代表Conv2D-BN-ReLU的组合
DeConv2D(Cout,k,s)代表DeConv2D-BN-ReLU的组合
3个 Conv2D(128,3,1(2)) 经过1个Deconv2D(128,3,1)
5个Conv2D(128,3,1(2))经过1个Deconv2D(128,3,2)
5个Conv2D(256,3,1(2))经过1个Deconv2D(128,3,4)
然后叠加到一起,完成不同尺度的特征提取,输入到RPN head。

在后面SECOND的官方github,以及在OpenPCdet常用的backbone2D,简化为了下面的结构:
配置文件为:
LAYER_NUMS: [5, 5]
LAYER_STRIDES: [1, 2]
NUM_FILTERS: [128, 256]
UPSAMPLE_STRIDES: [1, 2]
NUM_UPSAMPLE_FILTERS: [256, 256]
网络结构为:
6个Conv2D(128,3,1) 经过1个Deconv2D(256,3,1),此时的HxWxC为[188,188,256]
6个Conv2D(256,3,1(2))经过1个Deconv2D(256,3,2),此时的HxWxC为[188,188,256]
concate到一起,HxWxC为[188,188,512]

或者更简单的:
LAYER_NUMS: [5]
LAYER_STRIDES: [1]
NUM_FILTERS: [128]
UPSAMPLE_STRIDES: [2]
NUM_UPSAMPLE_FILTERS: [256]
网络结构为:
6个Conv2D(128,3,1)经过1个Deconv2D(256,3,2),此时HxWxC为[376,376,256]

SECOND RPN

在这里插入图片描述
通过Conv2D得到三个预测头,class、box_size、direction,三个预测头的网络分别为:
Conv2D(512,2x3,1,1),2个anchor,3个class的置信度
Conv2D(512,2x7,1,1),2个anchor,7个box相对于anchor的位置和尺寸的参数,dx,dy,dz,l,w,h,a
Conv2D(512,2x2,1,1),2个anchor,box相对于anchor的角度差

SECOND training

生成anchor:
anchor的尺寸根据不同的类别,分别定义
在feature_map上,每一个位置放置2个正交的anchor
最终得到feature_map_size[0]xfeature_map[_size[1]x2xclass_num个anchors
预测的目标:
把真实的3d box,分配到anchor上
N_anchors,N_groudtruth,进行IOU计算,针对每一个anchor,挑出IOU最大的GT box,设置一个threshold,大于threshold,设为前景,小于一个threshold,设为背景。
前景的anchor,标签为对应的class;
前景的anchor和对应的gt box进行计算,得到所需要的regression的目标。
如果使用direction layer,同样也要计算一个direction的目标。(因为sin(a-b)无法区分0和pi)
预测的结果:
预测得到的结果为,对应每一个anchor,有一个预测的置信度,一个相对于anchor的位置和尺寸的偏移量,一个相对于anchor的角度的偏移量

在这里插入图片描述
loss的计算:
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

SECOND inference

得到预测的结果后,可以通过anchor的信息对预测的结果进行解码,得到对应的boxes。
针对每一种class,设定一定的阈值,对结果继续筛选。
然后进行NMS,极大值抑制的循环筛选,留下最终的boxes。

Anchorbased和Anchorfree的对比

anchorbased的detector的问题

  1. 在二维平面,目标物体的框都是和图像坐标对齐的,也和布置的anchor的坐标系对齐,因此很有效果。但是到了3D的空间,目标物的框的方向是任意的,anchor数量太少,无法完全覆盖。anchor数量太多,非常消耗计算资源。当前anchorbase的网络一般放置2个垂直的anchor。(0,90°)
  2. anchor的选取,anchor的尺寸如何定义,一般都是根据数据集中进行统计得到的,如果新增种类,size也得重新挑选,调试得到一个比较理想的值。
  3. 每个位置两个anchor,也会带来很多的overlapped的boxes,需要NMS对预测的结果进行后处理,在部署端,进行推理运算的时候,需要NMS(非极大值抑制)进行后处理,也非常耗费计算资源。

centerbased的detector优势

CenterNet是19年提出的anchor free的2D的目标检测方法,他不用anchor进行目标检测,而是直接预测目标的中心,以及box的尺寸。具体来说:
针对每种类别,分别预测一个热点图,通过挑选热点图的peak,可以得到物体初步的中心位置。
通过regression,预测得到box的尺寸,以及中心点相对于grid中心的偏移量。
在这里插入图片描述
随后这个2D detector被引入到3D目标检测的模型中。(CenterPoint,AFDet),在各大排行榜上都比较靠前。

  1. 无需设定anchor,因此也无需特殊选定anchor的size和方向。
  2. 针对同一个类别,同一个位置只有一个positive的结果,因此不需要耗时的NMS去过滤overlap的boxes.

在这里插入图片描述

CenterPoint backbone 2d

Centerpoint在前面的点云编码和bev backbone上没有太多的创新,基本沿用了SECOND/point pillar的思路。
github repo上的2d backbone
配置文件为:
LAYER_NUMS: [5, 5]
LAYER_STRIDES: [1, 2]
NUM_FILTERS: [128, 256]
UPSAMPLE_STRIDES: [1, 2]
NUM_UPSAMPLE_FILTERS: [256, 256]
网络结构为:
6个Conv2D(128,3,1) 经过1个Deconv2D(256,3,1),此时的HxWxC为[188,188,256]
6个Conv2D(256,3,1(2))经过1个Deconv2D(256,3,2),此时的HxWxC为[188,188,256]
concate到一起,HxWxC为[188,188,512]

CenterPoint RPN

Centerpoint的创新点在于其RPN head,引入了anchor free center based的detector(参考了CenterNet)。
在训练的时候,直接预测如下参数:

  1. 每种类别,预测center heatmap
  2. 预测box的位置:dx,dy,用以修正中心点的位置
  3. 预测box的高度:z
  4. 预测box的尺寸:w,h,l
  5. 预测box的方向:sin(a),cos(a),直接确定heading

paper和github repo上具体的网络结构:
共享卷积层:Conv2D(512,64,3,1) + BN +ReLu
检测头:common_heads={‘hm’:(3,2),‘reg’: (2, 2), ‘height’: (1, 2), ‘dim’:(3, 2), ‘rot’:(2, 2)} # (output_channel, num_conv)
以’hm’检测头为例:Conv2D(64,64,3,1)+BN+ReLu+Conv2D(64,3,3,1)

CenterPoint training

同anchor based的方法,最关键的一步也是如何定义目标。
hm的目标,是把真实的box,通过高斯图投射到map_view中。

CenterPoint loss

分类的loss, Focal loss
regression的loss, L1 loss

CenterPoint inference

根据hm和box的预测结果,进行解码,输出,box和class.
根据CenterNet的论文,直接使用max pooling就可以确定某一个目标,而不需要nms来处理。但是在bev视角下的feature map下,物体都很小,可能多个物体在同一个grid下,因此centerpoint并没有使用maxpooling,还是用到了nms的方法。

CenterPoint ++

在21年,CenterPoint 进行升级,在稍微提高原有的精度的前提下,进一步的提高了推理速度,主要引入了如下的变化点:

IOU aware

参照CIA-SSD的方法。
在单阶段的模型中,localization accuracy and classification con
fidence是两个检测头,分别训练和预测,相互之间并没有直接联系,但是事实上和GT box的IOU越大的box,classification的confidence应该也越高。考虑到这一点,可以增加一个IOU预测头。
在训练阶段,去检测box和gt的iou大小作为target,计算l1 loss,并修正IOU的预测模型。
在推理的时候,得到这个IOU,并把这个IOU去加载到classification confidence上,提高预测的准确度。

在这里插入图片描述

temporal multi frame input

参照nuScene提出的方法。
在当前点云的特征上,增加时间维度;把前几帧的点云,通过坐标变换,也变换到当前点云的坐标系下。
点云更密,有历史信息,但是计算会更加耗时,也需要多帧输入的准确的坐标变换关系。

hard voxelization-> dynamic voxelization

hard voxelization,最先由voxelnet提出,设定了voxel的总数量,以及voxel内点的上限要求,因此在做voxelization的时候,会丢弃掉voxel内点的信息,而且也会丢掉部分的voxel,降低一定的精度。
在这里插入图片描述
参考MVF提出的 dynamic voxelization的概念,在GPU上,实现dynamic voxelization。可以保留完整的点的信息,保证了一定的精度。同时CenterPoint在GPU上实现了dynamic voxelization,相对于其在CPU上的实现,大大提高了速度。50ms->2ms

two stage

针对第一阶段预测得到的box,提取box的四个面的中心点和box的中心点。
然后从featue map上提取这5个点的特征,并叠加到一起,送入第二个stage。
第二个stage,通过MLP,预测box的confidence和localization。作为对第一个阶段结果的refinment.

各个模块对精度的影响,作者做了对比实验。
在这里插入图片描述

详细的代码:
backbone_2d

'''
输入backbone 2d的类型,model_cfg.BACKBONE_2D
输入input_channels,model_info_dict['num_bev_features HxWxC,上一层网络的输出]
输出 backbone_2d_module 的模型
输出model_info_dict,增加了经过bev backbone得到的特征
'''
def build_backbone_2d(self, model_info_dict):
        if self.model_cfg.get('BACKBONE_2D', None) is None:
            return None, model_info_dict

        backbone_2d_module = backbones_2d.__all__[self.model_cfg.BACKBONE_2D.NAME](
            model_cfg=self.model_cfg.BACKBONE_2D,
            input_channels=model_info_dict['num_bev_features']
        )
        model_info_dict['module_list'].append(backbone_2d_module)
        model_info_dict['num_bev_features'] = backbone_2d_module.num_bev_features
        return backbone_2d_module, model_info_dict

BaseBEVBackbone

'''
init,中初始化2D的模型
forward,导入模型,导入上一层的特征,计算得到本层的特征
'''
class BaseBEVBackbone(nn.Module):
    def __init__(self, model_cfg, input_channels):
        super().__init__()
        self.model_cfg = model_cfg
		#读取2dbackbone的基本参数
        if self.model_cfg.get('LAYER_NUMS', None) is not None:
            assert len(self.model_cfg.LAYER_NUMS) == len(self.model_cfg.LAYER_STRIDES) == len(self.model_cfg.NUM_FILTERS)
            layer_nums = self.model_cfg.LAYER_NUMS
            layer_strides = self.model_cfg.LAYER_STRIDES
            num_filters = self.model_cfg.NUM_FILTERS
        else:
            layer_nums = layer_strides = num_filters = []
		#读取上采样的参数
        if self.model_cfg.get('UPSAMPLE_STRIDES', None) is not None:
            assert len(self.model_cfg.UPSAMPLE_STRIDES) == len(self.model_cfg.NUM_UPSAMPLE_FILTERS)
            num_upsample_filters = self.model_cfg.NUM_UPSAMPLE_FILTERS
            upsample_strides = self.model_cfg.UPSAMPLE_STRIDES
        else:
            upsample_strides = num_upsample_filters = []

        num_levels = len(layer_nums)
        c_in_list = [input_channels, *num_filters[:-1]]
        self.blocks = nn.ModuleList()
        self.deblocks = nn.ModuleList()
        for idx in range(num_levels):
            cur_layers = [
                nn.ZeroPad2d(1),
                nn.Conv2d(
                    c_in_list[idx], num_filters[idx], kernel_size=3,
                    stride=layer_strides[idx], padding=0, bias=False
                ),
                nn.BatchNorm2d(num_filters[idx], eps=1e-3, momentum=0.01),
                nn.ReLU()
            ]
            for k in range(layer_nums[idx]):
                cur_layers.extend([
                    nn.Conv2d(num_filters[idx], num_filters[idx], kernel_size=3, padding=1, bias=False),
                    nn.BatchNorm2d(num_filters[idx], eps=1e-3, momentum=0.01),
                    nn.ReLU()
                ])
            self.blocks.append(nn.Sequential(*cur_layers))
            if len(upsample_strides) > 0:
                stride = upsample_strides[idx]
                if stride >= 1:
                    self.deblocks.append(nn.Sequential(
                        nn.ConvTranspose2d(
                            num_filters[idx], num_upsample_filters[idx],
                            upsample_strides[idx],
                            stride=upsample_strides[idx], bias=False
                        ),
                        nn.BatchNorm2d(num_upsample_filters[idx], eps=1e-3, momentum=0.01),
                        nn.ReLU()
                    ))
                else:
                    stride = np.round(1 / stride).astype(np.int)
                    self.deblocks.append(nn.Sequential(
                        nn.Conv2d(
                            num_filters[idx], num_upsample_filters[idx],
                            stride,
                            stride=stride, bias=False
                        ),
                        nn.BatchNorm2d(num_upsample_filters[idx], eps=1e-3, momentum=0.01),
                        nn.ReLU()
                    ))

        c_in = sum(num_upsample_filters)
        if len(upsample_strides) > num_levels:
            self.deblocks.append(nn.Sequential(
                nn.ConvTranspose2d(c_in, c_in, upsample_strides[-1], stride=upsample_strides[-1], bias=False),
                nn.BatchNorm2d(c_in, eps=1e-3, momentum=0.01),
                nn.ReLU(),
            ))

        self.num_bev_features = c_in

    def forward(self, data_dict):
        """
        Args:
            data_dict:
                spatial_features
        Returns:
        """
        spatial_features = data_dict['spatial_features']
        ups = []
        ret_dict = {}
        x = spatial_features
        for i in range(len(self.blocks)):
            x = self.blocks[i](x)

            stride = int(spatial_features.shape[2] / x.shape[2])
            ret_dict['spatial_features_%dx' % stride] = x
            if len(self.deblocks) > 0:
                ups.append(self.deblocks[i](x))
            else:
                ups.append(x)

        if len(ups) > 1:
            x = torch.cat(ups, dim=1)
        elif len(ups) == 1:
            x = ups[0]

        if len(self.deblocks) > len(self.blocks):
            x = self.deblocks[-1](x)

        data_dict['spatial_features_2d'] = x

        return data_dict

anchor_head_single

class AnchorHeadSingle(AnchorHeadTemplate):
    def __init__(self, model_cfg, input_channels, num_class, class_names, grid_size, point_cloud_range,
                 predict_boxes_when_training=True, **kwargs):
        super().__init__(
            model_cfg=model_cfg, num_class=num_class, class_names=class_names, grid_size=grid_size, point_cloud_range=point_cloud_range,
            predict_boxes_when_training=predict_boxes_when_training
        )

        self.num_anchors_per_location = sum(self.num_anchors_per_location)
		#定义classification_head,输出每个位置的,anchor(x)xclass
        self.conv_cls = nn.Conv2d(
            input_channels, self.num_anchors_per_location * self.num_class,
            kernel_size=1
        )
        #定义regression_head,输出每个位置的,anchor(x)xbox尺寸的数量
        #比如每个位置两个正交的anchor, 预测的box和anchor的相对位置关系
        self.conv_box = nn.Conv2d(
            input_channels, self.num_anchors_per_location * self.box_coder.code_size,
            kernel_size=1
        )
		#定义direction_head,输出每个位置的,anchor(x)x角度编码
        if self.model_cfg.get('USE_DIRECTION_CLASSIFIER', None) is not None:
            self.conv_dir_cls = nn.Conv2d(
                input_channels,
                self.num_anchors_per_location * self.model_cfg.NUM_DIR_BINS,
                kernel_size=1
            )
        else:
            self.conv_dir_cls = None
        self.init_weights()

    def init_weights(self):
        pi = 0.01
        nn.init.constant_(self.conv_cls.bias, -np.log((1 - pi) / pi))
        nn.init.normal_(self.conv_box.weight, mean=0, std=0.001)

    def forward(self, data_dict):
    	#推理计算,预测得到结果
        spatial_features_2d = data_dict['spatial_features_2d']

        cls_preds = self.conv_cls(spatial_features_2d)
        box_preds = self.conv_box(spatial_features_2d)

        cls_preds = cls_preds.permute(0, 2, 3, 1).contiguous()  # [N, H, W, C]
        box_preds = box_preds.permute(0, 2, 3, 1).contiguous()  # [N, H, W, C]

        self.forward_ret_dict['cls_preds'] = cls_preds
        self.forward_ret_dict['box_preds'] = box_preds

        if self.conv_dir_cls is not None:
            dir_cls_preds = self.conv_dir_cls(spatial_features_2d)
            dir_cls_preds = dir_cls_preds.permute(0, 2, 3, 1).contiguous()
            self.forward_ret_dict['dir_cls_preds'] = dir_cls_preds
        else:
            dir_cls_preds = None
		#如果训练,计算目标
        if self.training:
            targets_dict = self.assign_targets(
                gt_boxes=data_dict['gt_boxes']
            )
            self.forward_ret_dict.update(targets_dict)
		#如果没训练,但是需要得到box,那就解码预测的结果,得到box的结果
        if not self.training or self.predict_boxes_when_training:
            batch_cls_preds, batch_box_preds = self.generate_predicted_boxes(
                batch_size=data_dict['batch_size'],
                cls_preds=cls_preds, box_preds=box_preds, dir_cls_preds=dir_cls_preds
            )
            data_dict['batch_cls_preds'] = batch_cls_preds
            data_dict['batch_box_preds'] = batch_box_preds
            data_dict['cls_preds_normalized'] = False

        return data_dict

anchor_head_template

class AnchorHeadTemplate(nn.Module):
    def __init__(self, model_cfg, num_class, class_names, grid_size, point_cloud_range, predict_boxes_when_training):
        super().__init__()
        self.model_cfg = model_cfg
        self.num_class = num_class
        self.class_names = class_names
        self.predict_boxes_when_training = predict_boxes_when_training
        self.use_multihead = self.model_cfg.get('USE_MULTIHEAD', False)
        
        anchor_target_cfg = self.model_cfg.TARGET_ASSIGNER_CONFIG
        self.box_coder = getattr(box_coder_utils, anchor_target_cfg.BOX_CODER)(
            num_dir_bins=anchor_target_cfg.get('NUM_DIR_BINS', 6),
            **anchor_target_cfg.get('BOX_CODER_CONFIG', {})
        )
        #生成anchors
        anchor_generator_cfg = self.model_cfg.ANCHOR_GENERATOR_CONFIG
        anchors, self.num_anchors_per_location = self.generate_anchors(
            anchor_generator_cfg, grid_size=grid_size, point_cloud_range=point_cloud_range,
            anchor_ndim=self.box_coder.code_size
        )
        self.anchors = [x.cuda() for x in anchors]
        #分配anchor和gtbox,得到预测的目标,class和box(box和anchor的偏移)
        self.target_assigner = self.get_target_assigner(anchor_target_cfg)

        self.forward_ret_dict = {}
        self.build_losses(self.model_cfg.LOSS_CONFIG)

    @staticmethod
    def generate_anchors(anchor_generator_cfg, grid_size, point_cloud_range, anchor_ndim=7):
        anchor_generator = AnchorGenerator(
            anchor_range=point_cloud_range,
            anchor_generator_config=anchor_generator_cfg
        )
        feature_map_size = [grid_size[:2] // config['feature_map_stride'] for config in anchor_generator_cfg]
        anchors_list, num_anchors_per_location_list = anchor_generator.generate_anchors(feature_map_size)

        if anchor_ndim != 7:
            for idx, anchors in enumerate(anchors_list):
                pad_zeros = anchors.new_zeros([*anchors.shape[0:-1], anchor_ndim - 7])
                new_anchors = torch.cat((anchors, pad_zeros), dim=-1)
                anchors_list[idx] = new_anchors

        return anchors_list, num_anchors_per_location_list

    def get_target_assigner(self, anchor_target_cfg):
        if anchor_target_cfg.NAME == 'ATSS':
            target_assigner = ATSSTargetAssigner(
                topk=anchor_target_cfg.TOPK,
                box_coder=self.box_coder,
                use_multihead=self.use_multihead,
                match_height=anchor_target_cfg.MATCH_HEIGHT
            )
        elif anchor_target_cfg.NAME == 'AxisAlignedTargetAssigner':
            target_assigner = AxisAlignedTargetAssigner(
                model_cfg=self.model_cfg,
                class_names=self.class_names,
                box_coder=self.box_coder,
                match_height=anchor_target_cfg.MATCH_HEIGHT
            )
        else:
            raise NotImplementedError
        return target_assigner

    def build_losses(self, losses_cfg):
        self.add_module(
            'cls_loss_func',
            loss_utils.SigmoidFocalClassificationLoss(alpha=0.25, gamma=2.0)
        )
        reg_loss_name = 'WeightedSmoothL1Loss' if losses_cfg.get('REG_LOSS_TYPE', None) is None \
            else losses_cfg.REG_LOSS_TYPE
        self.add_module(
            'reg_loss_func',
            getattr(loss_utils, reg_loss_name)(code_weights=losses_cfg.LOSS_WEIGHTS['code_weights'])
        )
        self.add_module(
            'dir_loss_func',
            loss_utils.WeightedCrossEntropyLoss()
        )

    def assign_targets(self, gt_boxes):
        """
        Args:
            gt_boxes: (B, M, 8)
        Returns:

        """
        targets_dict = self.target_assigner.assign_targets(
            self.anchors, gt_boxes
        )
        return targets_dict

    def get_cls_layer_loss(self):
        cls_preds = self.forward_ret_dict['cls_preds']
        box_cls_labels = self.forward_ret_dict['box_cls_labels']
        batch_size = int(cls_preds.shape[0])
        cared = box_cls_labels >= 0  # [N, num_anchors]
        positives = box_cls_labels > 0
        negatives = box_cls_labels == 0
        negative_cls_weights = negatives * 1.0
        cls_weights = (negative_cls_weights + 1.0 * positives).float()
        reg_weights = positives.float()
        if self.num_class == 1:
            # class agnostic
            box_cls_labels[positives] = 1

        pos_normalizer = positives.sum(1, keepdim=True).float()
        reg_weights /= torch.clamp(pos_normalizer, min=1.0)
        cls_weights /= torch.clamp(pos_normalizer, min=1.0)
        cls_targets = box_cls_labels * cared.type_as(box_cls_labels)
        cls_targets = cls_targets.unsqueeze(dim=-1)

        cls_targets = cls_targets.squeeze(dim=-1)
        one_hot_targets = torch.zeros(
            *list(cls_targets.shape), self.num_class + 1, dtype=cls_preds.dtype, device=cls_targets.device
        )
        one_hot_targets.scatter_(-1, cls_targets.unsqueeze(dim=-1).long(), 1.0)
        cls_preds = cls_preds.view(batch_size, -1, self.num_class)
        one_hot_targets = one_hot_targets[..., 1:]
        cls_loss_src = self.cls_loss_func(cls_preds, one_hot_targets, weights=cls_weights)  # [N, M]
        cls_loss = cls_loss_src.sum() / batch_size

        cls_loss = cls_loss * self.model_cfg.LOSS_CONFIG.LOSS_WEIGHTS['cls_weight']
        tb_dict = {
            'rpn_loss_cls': cls_loss.item()
        }
        return cls_loss, tb_dict

    @staticmethod
    def add_sin_difference(boxes1, boxes2, dim=6):
        assert dim != -1
        rad_pred_encoding = torch.sin(boxes1[..., dim:dim + 1]) * torch.cos(boxes2[..., dim:dim + 1])
        rad_tg_encoding = torch.cos(boxes1[..., dim:dim + 1]) * torch.sin(boxes2[..., dim:dim + 1])
        boxes1 = torch.cat([boxes1[..., :dim], rad_pred_encoding, boxes1[..., dim + 1:]], dim=-1)
        boxes2 = torch.cat([boxes2[..., :dim], rad_tg_encoding, boxes2[..., dim + 1:]], dim=-1)
        return boxes1, boxes2

    @staticmethod
    def get_direction_target(anchors, reg_targets, one_hot=True, dir_offset=0, num_bins=2):
        batch_size = reg_targets.shape[0]
        anchors = anchors.view(batch_size, -1, anchors.shape[-1])
        rot_gt = reg_targets[..., 6] + anchors[..., 6]
        offset_rot = common_utils.limit_period(rot_gt - dir_offset, 0, 2 * np.pi)
        dir_cls_targets = torch.floor(offset_rot / (2 * np.pi / num_bins)).long()
        dir_cls_targets = torch.clamp(dir_cls_targets, min=0, max=num_bins - 1)

        if one_hot:
            dir_targets = torch.zeros(*list(dir_cls_targets.shape), num_bins, dtype=anchors.dtype,
                                      device=dir_cls_targets.device)
            dir_targets.scatter_(-1, dir_cls_targets.unsqueeze(dim=-1).long(), 1.0)
            dir_cls_targets = dir_targets
        return dir_cls_targets

    def get_box_reg_layer_loss(self):
        box_preds = self.forward_ret_dict['box_preds']
        box_dir_cls_preds = self.forward_ret_dict.get('dir_cls_preds', None)
        box_reg_targets = self.forward_ret_dict['box_reg_targets']
        box_cls_labels = self.forward_ret_dict['box_cls_labels']
        batch_size = int(box_preds.shape[0])

        positives = box_cls_labels > 0
        reg_weights = positives.float()
        pos_normalizer = positives.sum(1, keepdim=True).float()
        reg_weights /= torch.clamp(pos_normalizer, min=1.0)

        if isinstance(self.anchors, list):
            if self.use_multihead:
                anchors = torch.cat(
                    [anchor.permute(3, 4, 0, 1, 2, 5).contiguous().view(-1, anchor.shape[-1]) for anchor in
                     self.anchors], dim=0)
            else:
                anchors = torch.cat(self.anchors, dim=-3)
        else:
            anchors = self.anchors
        anchors = anchors.view(1, -1, anchors.shape[-1]).repeat(batch_size, 1, 1)
        box_preds = box_preds.view(batch_size, -1,
                                   box_preds.shape[-1] // self.num_anchors_per_location if not self.use_multihead else
                                   box_preds.shape[-1])
        # sin(a - b) = sinacosb-cosasinb
        box_preds_sin, reg_targets_sin = self.add_sin_difference(box_preds, box_reg_targets)
        loc_loss_src = self.reg_loss_func(box_preds_sin, reg_targets_sin, weights=reg_weights)  # [N, M]
        loc_loss = loc_loss_src.sum() / batch_size

        loc_loss = loc_loss * self.model_cfg.LOSS_CONFIG.LOSS_WEIGHTS['loc_weight']
        box_loss = loc_loss
        tb_dict = {
            'rpn_loss_loc': loc_loss.item()
        }

        if box_dir_cls_preds is not None:
            dir_targets = self.get_direction_target(
                anchors, box_reg_targets,
                dir_offset=self.model_cfg.DIR_OFFSET,
                num_bins=self.model_cfg.NUM_DIR_BINS
            )

            dir_logits = box_dir_cls_preds.view(batch_size, -1, self.model_cfg.NUM_DIR_BINS)
            weights = positives.type_as(dir_logits)
            weights /= torch.clamp(weights.sum(-1, keepdim=True), min=1.0)
            dir_loss = self.dir_loss_func(dir_logits, dir_targets, weights=weights)
            dir_loss = dir_loss.sum() / batch_size
            dir_loss = dir_loss * self.model_cfg.LOSS_CONFIG.LOSS_WEIGHTS['dir_weight']
            box_loss += dir_loss
            tb_dict['rpn_loss_dir'] = dir_loss.item()

        return box_loss, tb_dict

    def get_loss(self):
        cls_loss, tb_dict = self.get_cls_layer_loss()
        box_loss, tb_dict_box = self.get_box_reg_layer_loss()
        tb_dict.update(tb_dict_box)
        rpn_loss = cls_loss + box_loss

        tb_dict['rpn_loss'] = rpn_loss.item()
        return rpn_loss, tb_dict

    def generate_predicted_boxes(self, batch_size, cls_preds, box_preds, dir_cls_preds=None):
        """
        Args:
            batch_size:
            cls_preds: (N, H, W, C1)
            box_preds: (N, H, W, C2)
            dir_cls_preds: (N, H, W, C3)

        Returns:
            batch_cls_preds: (B, num_boxes, num_classes)
            batch_box_preds: (B, num_boxes, 7+C)

        """
        if isinstance(self.anchors, list):
            if self.use_multihead:
                anchors = torch.cat([anchor.permute(3, 4, 0, 1, 2, 5).contiguous().view(-1, anchor.shape[-1])
                                     for anchor in self.anchors], dim=0)
            else:
                anchors = torch.cat(self.anchors, dim=-3)
        else:
            anchors = self.anchors
        num_anchors = anchors.view(-1, anchors.shape[-1]).shape[0]
        batch_anchors = anchors.view(1, -1, anchors.shape[-1]).repeat(batch_size, 1, 1)
        batch_cls_preds = cls_preds.view(batch_size, num_anchors, -1).float() \
            if not isinstance(cls_preds, list) else cls_preds
        batch_box_preds = box_preds.view(batch_size, num_anchors, -1) if not isinstance(box_preds, list) \
            else torch.cat(box_preds, dim=1).view(batch_size, num_anchors, -1)
        batch_box_preds = self.box_coder.decode_torch(batch_box_preds, batch_anchors)

        if dir_cls_preds is not None:
            dir_offset = self.model_cfg.DIR_OFFSET
            dir_limit_offset = self.model_cfg.DIR_LIMIT_OFFSET
            dir_cls_preds = dir_cls_preds.view(batch_size, num_anchors, -1) if not isinstance(dir_cls_preds, list) \
                else torch.cat(dir_cls_preds, dim=1).view(batch_size, num_anchors, -1)
            dir_labels = torch.max(dir_cls_preds, dim=-1)[1]

            period = (2 * np.pi / self.model_cfg.NUM_DIR_BINS)
            dir_rot = common_utils.limit_period(
                batch_box_preds[..., 6] - dir_offset, dir_limit_offset, period
            )
            batch_box_preds[..., 6] = dir_rot + dir_offset + period * dir_labels.to(batch_box_preds.dtype)

        if isinstance(self.box_coder, box_coder_utils.PreviousResidualDecoder):
            batch_box_preds[..., 6] = common_utils.limit_period(
                -(batch_box_preds[..., 6] + np.pi / 2), offset=0.5, period=np.pi * 2
            )

        return batch_cls_preds, batch_box_preds

    def forward(self, **kwargs):
        raise NotImplementedError

CenterHead in OpenPCDet

class CenterHead(nn.Module):
    def __init__(self, model_cfg, input_channels, num_class, class_names, grid_size, point_cloud_range,
                 predict_boxes_when_training=True):
        super().__init__()
        self.model_cfg = model_cfg
        self.num_class = num_class
        self.class_names = [class_names]
        self.predict_boxes_when_training = predict_boxes_when_training
        self.use_multihead = self.model_cfg.get('USE_MULTIHEAD', False)

        target_cfg = self.model_cfg.TARGET_ASSIGNER_CONFIG

        self.target_cfg = target_cfg 
        self.grid_size = grid_size
        self.point_cloud_range = point_cloud_range

        self.forward_ret_dict = {}
		#相对于原paper,用了非常简单的检测头
        self.conv_cls = nn.Conv2d(
            input_channels, self.num_class,
            kernel_size=1
        )
        self.conv_box = nn.Conv2d(
            input_channels, 8,
            kernel_size=1
        )

        self.loss_cls = GaussianFocalLoss(reduction='mean')

        self.init_weights()

    def init_weights(self):
        pi = 0.01
        nn.init.constant_(self.conv_cls.bias, -np.log((1 - pi) / pi))
        nn.init.normal_(self.conv_box.weight, mean=0, std=0.001)

    def forward(self, data_dict):
        spatial_features_2d = data_dict['spatial_features_2d']

        cls_preds = self.conv_cls(spatial_features_2d)
        box_preds = self.conv_box(spatial_features_2d)

        cls_preds = cls_preds.permute(0, 2, 3, 1).contiguous()  # [N, H, W, C]
        box_preds = box_preds.permute(0, 2, 3, 1).contiguous()  # [N, H, W, C]

        self.forward_ret_dict['cls_preds'] = cls_preds
        self.forward_ret_dict['box_preds'] = box_preds
		#如果训练,获得预测的目标
        if self.training:
            targets_dict = self.assign_targets(
                gt_boxes=data_dict['gt_boxes']
            )
            self.forward_ret_dict.update(targets_dict)
		#如果推理或者训练时要预测box,解码box
        if not self.training or self.predict_boxes_when_training:
            batch_cls_preds, batch_box_preds = self.generate_predicted_boxes(
                batch_size=data_dict['batch_size'],
                cls_preds=cls_preds, box_preds=box_preds, dir_cls_preds=None
            )
            data_dict['batch_cls_preds'] = batch_cls_preds
            data_dict['batch_box_preds'] = batch_box_preds
            data_dict['cls_preds_normalized'] = False

        return data_dict

    def _gather_feat(self, feat, ind, mask=None):
        """Gather feature map.

        Given feature map and index, return indexed feature map.

        Args:
            feat (torch.tensor): Feature map with the shape of [B, H*W, 10].
            ind (torch.Tensor): Index of the ground truth boxes with the
                shape of [B, max_obj].
            mask (torch.Tensor): Mask of the feature map with the shape
                of [B, max_obj]. Default: None.

        Returns:
            torch.Tensor: Feature map after gathering with the shape
                of [B, max_obj, 10].
        """
        dim = feat.size(2)
        ind = ind.unsqueeze(2).expand(ind.size(0), ind.size(1), dim)
        feat = feat.gather(1, ind)
        if mask is not None:
            mask = mask.unsqueeze(2).expand_as(feat)
            feat = feat[mask]
            feat = feat.view(-1, dim)
        return feat

    def assign_targets(self, gt_boxes):
        """Generate targets.

        Args:
            gt_boxes: (B, M, 8) box + cls 

        Returns:
            Returns:
                tuple
]: Tuple of target including \ the following results in order. - list[torch.Tensor]: Heatmap scores. - list[torch.Tensor]: Ground truth boxes. - list[torch.Tensor]: Indexes indicating the \ position of the valid boxes. - list[torch.Tensor]: Masks indicating which \ boxes are valid. """
gt_bboxes_3d, gt_labels_3d = gt_boxes[..., :-1], gt_boxes[..., -1] heatmaps, anno_boxes, inds, masks = multi_apply( self.get_targets_single, gt_bboxes_3d, gt_labels_3d) # transpose heatmaps, because the dimension of tensors in each task is # different, we have to use numpy instead of torch to do the transpose. heatmaps = np.array(heatmaps).transpose(1, 0).tolist() heatmaps = [torch.stack(hms_) for hms_ in heatmaps] # transpose anno_boxes anno_boxes = np.array(anno_boxes).transpose(1, 0).tolist() anno_boxes = [torch.stack(anno_boxes_) for anno_boxes_ in anno_boxes] # transpose inds inds = np.array(inds).transpose(1, 0).tolist() inds = [torch.stack(inds_) for inds_ in inds] # transpose inds masks = np.array(masks).transpose(1, 0).tolist() masks = [torch.stack(masks_) for masks_ in masks] all_targets_dict = { 'heatmaps': heatmaps, 'anno_boxes': anno_boxes, 'inds': inds, 'masks': masks } return all_targets_dict def get_targets_single(self, gt_bboxes_3d, gt_labels_3d): """Generate training targets for a single sample. Args: gt_bboxes_3d (:obj:`LiDARInstance3DBoxes`): Ground truth gt boxes. gt_labels_3d (torch.Tensor): Labels of boxes. Returns: tuple
]: Tuple of target including \ the following results in order. - list[torch.Tensor]: Heatmap scores. - list[torch.Tensor]: Ground truth boxes. - list[torch.Tensor]: Indexes indicating the position \ of the valid boxes. - list[torch.Tensor]: Masks indicating which boxes \ are valid. """
device = gt_labels_3d.device """gt_bboxes_3d = torch.cat( (gt_bboxes_3d.gravity_center, gt_bboxes_3d.tensor[:, 3:]), dim=1).to(device) """ max_objs = self.target_cfg.MAX_OBJS grid_size = torch.tensor(self.grid_size) pc_range = torch.tensor(self.point_cloud_range) voxel_size = torch.tensor(self.target_cfg.VOXEL_SIZE) feature_map_size = grid_size[:2] // self.target_cfg.OUT_SIZE_FACTOR """ # reorganize the gt_dict by tasks task_masks = [] flag = 0 for class_name in self.class_names: print(gt_labels_3d) task_masks.append([ torch.where(gt_labels_3d == class_name.index(i) + flag) for i in class_name ]) flag += len(class_name) task_boxes = [] task_classes = [] flag2 = 0 for idx, mask in enumerate(task_masks): task_box = [] task_class = [] for m in mask: task_box.append(gt_bboxes_3d[m]) # 0 is background for each task, so we need to add 1 here. task_class.append(gt_labels_3d[m] - flag2) task_boxes.append(torch.cat(task_box, axis=0).to(device)) task_classes.append(torch.cat(task_class).long().to(device)) flag2 += len(mask) """ task_boxes = [gt_bboxes_3d] task_classes = [gt_labels_3d] draw_gaussian = draw_heatmap_gaussian heatmaps, anno_boxes, inds, masks = [], [], [], [] for idx in range(1): heatmap = gt_bboxes_3d.new_zeros( (len(self.class_names[idx]), feature_map_size[1], feature_map_size[0])) anno_box = gt_bboxes_3d.new_zeros((max_objs, 8), dtype=torch.float32) ind = gt_labels_3d.new_zeros((max_objs), dtype=torch.int64) mask = gt_bboxes_3d.new_zeros((max_objs), dtype=torch.uint8) num_objs = min(task_boxes[idx].shape[0], max_objs) for k in range(num_objs): cls_id = (task_classes[idx][k] - 1).int() width = task_boxes[idx][k][3] length = task_boxes[idx][k][4] width = width / voxel_size[0] / self.target_cfg.OUT_SIZE_FACTOR length = length / voxel_size[1] / self.target_cfg.OUT_SIZE_FACTOR if width > 0 and length > 0: radius = gaussian_radius( (length, width), min_overlap=self.target_cfg.GAUSSIAN_OVERLAP) radius = max(self.target_cfg.MIN_RADIUS, int(radius)) # be really careful for the coordinate system of # your box annotation. x, y, z = task_boxes[idx][k][0], task_boxes[idx][k][ 1], task_boxes[idx][k][2] coor_x = ( x - pc_range[0] ) / voxel_size[0] / self.target_cfg.OUT_SIZE_FACTOR coor_y = ( y - pc_range[1] ) / voxel_size[1] / self.target_cfg.OUT_SIZE_FACTOR center = torch.tensor([coor_x, coor_y], dtype=torch.float32, device=device) center_int = center.to(torch.int32) # throw out not in range objects to avoid out of array # area when creating the heatmap if not (0 <= center_int[0] < feature_map_size[0] and 0 <= center_int[1] < feature_map_size[1]): continue draw_gaussian(heatmap[cls_id], center_int, radius) new_idx = k x, y = center_int[0], center_int[1] assert (y * feature_map_size[0] + x < feature_map_size[0] * feature_map_size[1]) ind[new_idx] = y * feature_map_size[0] + x mask[new_idx] = 1 rot = task_boxes[idx][k][6] box_dim = task_boxes[idx][k][3:6] box_dim = box_dim.log() anno_box[new_idx] = torch.cat([ center - torch.tensor([x, y], device=device), z.unsqueeze(0), box_dim, torch.sin(rot).unsqueeze(0), torch.cos(rot).unsqueeze(0), ]) heatmaps.append(heatmap) anno_boxes.append(anno_box) masks.append(mask) inds.append(ind) return heatmaps, anno_boxes, inds, masks def generate_predicted_boxes(self, batch_size, cls_preds, box_preds, dir_cls_preds=None): """ Args: batch_size: cls_preds: (N, H, W, C1) box_preds: (N, H, W, C2) dir_cls_preds: (N, H, W, C3) Returns: batch_cls_preds: (B, num_boxes, num_classes) batch_box_preds: (B, num_boxes, 7+C) """ batch, H, W, code_size = box_preds.size() box_preds = box_preds.reshape(batch, H*W, code_size) batch_reg = box_preds[..., 0:2] batch_hei = box_preds[..., 2:3] batch_dim = torch.exp(box_preds[..., 3:6]) batch_rots = box_preds[..., 6:7] batch_rotc = box_preds[..., 7:8] ys, xs = torch.meshgrid([torch.arange(0, H), torch.arange(0, W)]) ys = ys.view(1, H, W).repeat(batch, 1, 1).to(cls_preds.device) xs = xs.view(1, H, W).repeat(batch, 1, 1).to(cls_preds.device) xs = xs.view(batch, -1, 1) + batch_reg[:, :, 0:1] ys = ys.view(batch, -1, 1) + batch_reg[:, :, 1:2] xs = xs * self.target_cfg.OUT_SIZE_FACTOR * self.target_cfg.VOXEL_SIZE[0] + self.point_cloud_range[0] ys = ys * self.target_cfg.OUT_SIZE_FACTOR * self.target_cfg.VOXEL_SIZE[1] + self.point_cloud_range[1] rot = torch.atan2(batch_rots, batch_rotc) batch_box_preds = torch.cat([xs, ys, batch_hei, batch_dim, rot], dim=2) batch_cls_preds = cls_preds.view(batch, H*W, -1) return batch_cls_preds, batch_box_preds def get_loss(self): cls_loss, tb_dict = self.get_cls_layer_loss() box_loss, tb_dict_box = self.get_box_reg_layer_loss() tb_dict.update(tb_dict_box) rpn_loss = cls_loss + box_loss tb_dict['rpn_loss'] = rpn_loss.item() return rpn_loss, tb_dict def get_cls_layer_loss(self): # NHWC -> NCHW pred_heatmaps = clip_sigmoid(self.forward_ret_dict['cls_preds']).permute(0, 3, 1, 2) gt_heatmaps = self.forward_ret_dict['heatmaps'][0] num_pos = gt_heatmaps.eq(1).float().sum().item() cls_loss = self.loss_cls( pred_heatmaps, gt_heatmaps, avg_factor=max(num_pos, 1)) cls_loss = cls_loss * self.model_cfg.LOSS_CONFIG.LOSS_WEIGHTS['cls_weight'] tb_dict = { 'rpn_loss_cls': cls_loss.item() } return cls_loss, tb_dict def get_box_reg_layer_loss(self): # Regression loss for dimension, offset, height, rotation target_box, inds, masks = self.forward_ret_dict['anno_boxes'][0], self.forward_ret_dict['inds'][0], self.forward_ret_dict['masks'][0] ind = inds num = masks.float().sum() pred = self.forward_ret_dict['box_preds'] # N x (HxW) x 7 pred = pred.view(pred.size(0), -1, pred.size(3)) pred = self._gather_feat(pred, ind) mask = masks.unsqueeze(2).expand_as(target_box).float() isnotnan = (~torch.isnan(target_box)).float() mask *= isnotnan code_weights = self.model_cfg.LOSS_CONFIG.LOSS_WEIGHTS['code_weights'] bbox_weights = mask * mask.new_tensor(code_weights) loc_loss = l1_loss( pred, target_box, bbox_weights, avg_factor=(num + 1e-4)) loc_loss = loc_loss * self.model_cfg.LOSS_CONFIG.LOSS_WEIGHTS['loc_weight'] box_loss = loc_loss tb_dict = { 'rpn_loss_loc': loc_loss.item() } return box_loss, tb_dict

版权声明:本文为CSDN博主「huang_victor」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/huang_victor/article/details/120778544

huang_victor

我还没有学会写个人说明!

暂无评论

发表评论

相关推荐

yolov5 检测一类物体

使用yolov5官方框架检测一类物体 yolov5的官方框架可较好的对共80种类进行目标检测,本文介绍一种直接修改源代码来只检测一类物体的方法以及通用的方法(利用数据集训练自己的权重)。 一、直接修

玩转KITTI3D目标检测:KITTI评估工具evaluate的使用

近期因实验需要利用kitti数据集,发现关于评估工具使用的部分网上教程不够详细,特此记录. 文末为了方便对数据结果观看,附上了修改代码. 1. KITTI评估工具来源 官网评估工具 下载后文件目录包含: matlab(2D/3D框显示和