AI Development for Sensor Fusion: LiDAR + Camera + RADAR
Sensor fusion combines data from disparate sensors into a picture of the surroundings that no single sensor can provide:

- Camera: high resolution, color, readable text; no depth.
- LiDAR: accurate 3D map, 360° coverage, works at night; degrades in precipitation.
- RADAR: direct velocity measurement (Doppler), works in any weather; low angular resolution.

Fusion happens at three levels: early (raw data), feature-level (intermediate representations), and late (outputs of independent per-sensor models). The sketch below contrasts the three.
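A minimal schematic of the three levels, assuming nothing beyond NumPy; the function names and shapes are illustrative, not from any real library:

```python
import numpy as np

# Early fusion: merge raw data before any model runs,
# e.g. "paint" each LiDAR point with the RGB value it projects onto.
def early_fusion(lidar_xyz: np.ndarray, rgb_per_point: np.ndarray) -> np.ndarray:
    return np.concatenate([lidar_xyz, rgb_per_point], axis=-1)  # [N, 6]

# Feature fusion: each sensor has its own encoder; merge intermediate
# feature maps in a shared space (e.g. BEV), as BEVFusion does below.
def feature_fusion(cam_bev: np.ndarray, lidar_bev: np.ndarray) -> np.ndarray:
    return np.concatenate([cam_bev, lidar_bev], axis=0)  # [C1+C2, H, W]

# Late fusion: each sensor runs its full pipeline; merge final detections
# by association (see RadarCameraFusion below for a concrete version).
def late_fusion(cam_dets: list[dict], radar_dets: list[dict]) -> list[dict]:
    return cam_dets + radar_dets  # placeholder: real code associates, not concatenates
```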
BEVFusion: Camera + LiDAR Fusion
```python
import torch
import torch.nn as nn
import numpy as np
import timm


class CameraLiDARFusion(nn.Module):
    """
    BEV (Bird's Eye View) fusion of cameras and LiDAR:
    - Cameras: perspective features -> BEV via LSS (Lift-Splat-Shoot)
    - LiDAR: pillar features from a PointPillars-style encoder
    - Fusion in BEV space -> unified detection head
    """
    def __init__(self, n_cameras: int = 6,
                 bev_h: int = 200, bev_w: int = 200):
        super().__init__()
        self.n_cameras = n_cameras
        self.bev_h = bev_h
        self.bev_w = bev_w
        # Camera backbone (shared across all cameras)
        self.cam_backbone = timm.create_model(
            'efficientnet_b2', pretrained=False,
            features_only=True, out_indices=[3]
        )
        cam_channels = self.cam_backbone.feature_info.channels()[-1]
        # LSS: lift features into 3D via a predicted depth distribution.
        # Unused by the simplified projection below; kept for the full LSS variant.
        self.depth_net = nn.Sequential(
            nn.Conv2d(cam_channels, 128, 3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 64, 1)  # 64 depth bins
        )
        self.cam_feat_net = nn.Conv2d(cam_channels, 64, 1)
        # LiDAR PointPillars encoder
        self.lidar_encoder = PointPillarEncoder(out_channels=128)
        # BEV fusion head
        self.fusion_conv = nn.Sequential(
            nn.Conv2d(128 + 64, 256, 3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, 3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True)
        )
        # Detection head (simplified CenterPoint)
        self.det_head = nn.Sequential(
            nn.Conv2d(256, 128, 3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 10, 1)  # heatmap for 10 object classes
        )

    def forward(self, camera_imgs: torch.Tensor,
                lidar_points: torch.Tensor,
                cam_intrinsics: torch.Tensor,
                cam_extrinsics: torch.Tensor) -> dict:
        B, N, C, H, W = camera_imgs.shape
        # Process all cameras as one batch
        imgs_flat = camera_imgs.view(B * N, C, H, W)
        cam_feats = self.cam_backbone(imgs_flat)[0]  # [B*N, C', H', W']
        cam_feats = cam_feats.view(B, N, *cam_feats.shape[1:])
        # LSS projection (simplified)
        bev_cam = self._lss_project(cam_feats, cam_intrinsics, cam_extrinsics)
        # LiDAR BEV features
        bev_lidar = self.lidar_encoder(lidar_points)
        # Resize both maps to a common BEV resolution
        bev_cam_r = nn.functional.interpolate(
            bev_cam, size=(self.bev_h, self.bev_w),
            mode='bilinear', align_corners=False
        )
        bev_lidar_r = nn.functional.interpolate(
            bev_lidar, size=(self.bev_h, self.bev_w),
            mode='bilinear', align_corners=False
        )
        # Concatenate and fuse
        fused = torch.cat([bev_cam_r, bev_lidar_r], dim=1)
        fused = self.fusion_conv(fused)
        heatmap = self.det_head(fused)
        return {
            'bev_features': fused,
            'detection_heatmap': heatmap
        }

    def _lss_project(self, cam_feats, intrinsics, extrinsics):
        """Simplified LSS projection to BEV: reduce channels with cam_feat_net,
        then max-pool across cameras as a baseline. Intrinsics/extrinsics are
        ignored here; the full LSS would use them for geometric lifting."""
        B, N = cam_feats.shape[:2]
        feats = self.cam_feat_net(cam_feats.flatten(0, 1))  # [B*N, 64, H', W']
        feats = feats.view(B, N, *feats.shape[1:])
        return feats.max(dim=1).values  # [B, 64, H', W']


class PointPillarEncoder(nn.Module):
    def __init__(self, out_channels: int = 128):
        super().__init__()
        self.pillar_net = nn.Sequential(
            nn.Linear(9, 64),
            nn.ReLU(),
            nn.Linear(64, out_channels)
        )

    def forward(self, points: torch.Tensor) -> torch.Tensor:
        # Simplified backbone: points [B, P, >=9] -> constant 50x50 BEV feature map
        B = points.shape[0]
        feats = self.pillar_net(points[:, :, :9] if points.shape[-1] >= 9
                                else torch.zeros(B, 1, 9, device=points.device))
        return (feats.mean(dim=1).unsqueeze(-1).unsqueeze(-1)
                .expand(-1, -1, 50, 50).contiguous())
```
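A quick smoke test of `CameraLiDARFusion` with dummy tensors; every shape below is an assumption for illustration, and the intrinsics/extrinsics are placeholders since the simplified `_lss_project` ignores them:

```python
# Dummy-input smoke test (shapes are illustrative assumptions).
model = CameraLiDARFusion(n_cameras=6).eval()
imgs = torch.randn(1, 6, 3, 256, 448)     # [B, N_cams, C, H, W]
points = torch.randn(1, 2048, 9)          # [B, P, 9] decorated LiDAR points
intr = torch.eye(3).expand(1, 6, 3, 3)    # placeholder intrinsics (unused here)
extr = torch.eye(4).expand(1, 6, 4, 4)    # placeholder extrinsics (unused here)
with torch.no_grad():
    out = model(imgs, points, intr, extr)
print(out['detection_heatmap'].shape)     # torch.Size([1, 10, 200, 200])
```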
RADAR + Camera Fusion for Object Velocity
```python
import numpy as np

class RadarCameraFusion:
    """
    Late fusion: independent RADAR and camera detections -> merged output.
    RADAR provides: range, velocity (Doppler), azimuth.
    Camera provides: object class, precise bbox, visual features.
    """
    def __init__(self, max_association_dist_m: float = 3.0):
        self.max_dist = max_association_dist_m

    def fuse(self, camera_detections: list[dict],
             radar_targets: list[dict]) -> list[dict]:
        """
        camera_detections: [{'bbox', 'class', 'confidence', 'distance_est'}]
        radar_targets: [{'range_m', 'azimuth_deg', 'velocity_mps', 'rcs'}]
        """
        fused = []
        # Convert RADAR polar coordinates -> Cartesian (x right, y forward)
        radar_xy = []
        for rt in radar_targets:
            angle_rad = np.radians(rt['azimuth_deg'])
            rx = rt['range_m'] * np.sin(angle_rad)
            ry = rt['range_m'] * np.cos(angle_rad)
            radar_xy.append((rx, ry, rt))
        matched_radar = set()
        for cam_det in camera_detections:
            best_radar_idx = None
            best_dist = self.max_dist
            cam_dist = cam_det.get('distance_est', float('inf'))
            # Simple greedy association by longitudinal distance (y ~ range)
            for i, (rx, ry, rt) in enumerate(radar_xy):
                if i in matched_radar:
                    continue
                dist_diff = abs(ry - cam_dist)
                if dist_diff < best_dist:
                    best_dist = dist_diff
                    best_radar_idx = i
            fused_det = {**cam_det}
            if best_radar_idx is not None:
                matched_radar.add(best_radar_idx)
                _, _, rt = radar_xy[best_radar_idx]
                fused_det['range_m'] = rt['range_m']
                fused_det['velocity_mps'] = rt['velocity_mps']
                fused_det['azimuth_deg'] = rt['azimuth_deg']
                fused_det['radar_matched'] = True
                # Refinement: trust the RADAR range (more accurate than mono-camera depth)
                fused_det['distance_est'] = rt['range_m']
            else:
                fused_det['velocity_mps'] = None
                fused_det['radar_matched'] = False
            fused.append(fused_det)
        return fused
```
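A usage sketch with hand-made detections; all values are invented for illustration:

```python
fusion = RadarCameraFusion(max_association_dist_m=3.0)
cams = [{'bbox': (410, 220, 520, 330), 'class': 'car',
         'confidence': 0.91, 'distance_est': 24.0}]
radars = [{'range_m': 25.1, 'azimuth_deg': 2.0,
           'velocity_mps': -3.4, 'rcs': 12.5}]
out = fusion.fuse(cams, radars)
# 25.1*cos(2 deg) ~ 25.08, |25.08 - 24.0| < 3.0 m -> the pair is associated
print(out[0]['radar_matched'], out[0]['velocity_mps'])  # True -3.4
print(out[0]['distance_est'])  # 25.1, refined by the RADAR range
```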
Temporal Fusion and Trajectory Prediction
```python
import numpy as np
from filterpy.kalman import KalmanFilter

class TemporalObjectFusion:
    """
    Kalman filter for fusing measurements over time.
    State: [x, y, vx, vy, ax, ay] in metres (constant-acceleration model).
    """
    def __init__(self):
        self.trackers: dict[int, KalmanFilter] = {}

    def _create_tracker(self) -> KalmanFilter:
        kf = KalmanFilter(dim_x=6, dim_z=2)  # state: [x,y,vx,vy,ax,ay], obs: [x,y]
        dt = 0.1  # 10 FPS
        # Constant-acceleration state transition
        kf.F = np.array([[1, 0, dt, 0, 0.5 * dt**2, 0],
                         [0, 1, 0, dt, 0, 0.5 * dt**2],
                         [0, 0, 1, 0, dt, 0],
                         [0, 0, 0, 1, 0, dt],
                         [0, 0, 0, 0, 1, 0],
                         [0, 0, 0, 0, 0, 1]])
        # We observe position only
        kf.H = np.array([[1, 0, 0, 0, 0, 0],
                         [0, 1, 0, 0, 0, 0]])
        kf.R *= 0.5  # measurement noise
        kf.Q *= 0.1  # process noise
        return kf

    def update(self, object_id: int, x_m: float, y_m: float) -> dict:
        if object_id not in self.trackers:
            self.trackers[object_id] = self._create_tracker()
            # Initialise at the first measurement with zero velocity/acceleration
            self.trackers[object_id].x = np.array(
                [[x_m], [y_m], [0], [0], [0], [0]], dtype=float)
        kf = self.trackers[object_id]
        kf.predict()
        kf.update(np.array([[x_m], [y_m]]))
        state = kf.x.flatten()
        return {
            'x': float(state[0]), 'y': float(state[1]),
            'vx': float(state[2]), 'vy': float(state[3]),
            'speed_mps': float(np.sqrt(state[2]**2 + state[3]**2))
        }
```
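A brief check on a synthetic target moving 1 m per frame along y (with dt = 0.1 s, that is 10 m/s); the convergence behaviour stated in the comment is what the model implies, not a measured result:

```python
tracker = TemporalObjectFusion()
for step in range(30):
    est = tracker.update(object_id=7, x_m=5.0, y_m=10.0 + step * 1.0)
print(round(est['vy'], 1), round(est['speed_mps'], 1))  # approaches ~10.0 m/s
```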
Indicative performance by sensor configuration:

| Configuration | mAP (3D Det) | Latency |
|---|---|---|
| Camera only (DepthEst) | 38–45% | 25 ms |
| LiDAR only (PointPillars) | 60–68% | 35 ms |
| Camera + LiDAR (BEVFusion) | 70–75% | 65 ms |
| + RADAR (late fusion) | 72–77% | 70 ms |
| Full late fusion (all 3) | 74–79% | 80 ms |
Typical development timelines:

| Task | Timeline |
|---|---|
| Camera + LiDAR late fusion pipeline | 10–16 weeks |
| BEVFusion architecture with training | 20–30 weeks |
| Production-ready + RADAR + temporal fusion | 32–48 weeks |