AI system for autonomous marine vessels
Autonomous vessel control is a task at the intersection of CV, planning, and maritime law. COLREG (the International Collision Prevention Regulations) requires the system not only to detect vessels but also to correctly interpret their lights and execute yield maneuvers according to the regulations. This level of complexity is significantly higher than that of autonomous vehicles.
Perception: Multisensory Fusion at Sea
The marine environment is specific: waves, spray on the lens, fog, blinding sun on the water, night lights.
import numpy as np
import cv2
from ultralytics import YOLO
class MarinePerceptionSystem:
def __init__(self, config: dict):
# Детектор судов и препятствий: YOLOv8l, дообученный на морских данных
self.vessel_detector = YOLO(config['vessel_model'])
# Радар-данные (ARPA/AIS)
self.radar_parser = RadarARPAParser(config['radar_port'])
self.ais_receiver = AISReceiver(config['ais_port'])
# LiDAR (Ouster OS1) для ближней зоны < 100м
self.lidar_processor = MarineLiDAR(config['lidar_config'])
self.camera_matrix = np.array(config['cam_intrinsics'])
def fuse_detections(self, frame: np.ndarray,
radar_tracks: list,
ais_contacts: list,
lidar_points: np.ndarray) -> list[dict]:
# Камера: обнаружение судов, буёв, плавника, людей за бортом
cam_dets = self.vessel_detector(frame, conf=0.4)
fused_contacts = []
for det in cam_dets[0].boxes:
cls = self.vessel_detector.model.names[int(det.cls)]
bbox = list(map(int, det.xyxy[0]))
bearing = self._bearing_from_bbox(bbox, frame.shape)
# Дистанция из LiDAR (если есть точки в секторе)
distance = self._lidar_distance_in_sector(lidar_points, bearing)
# Сопоставление с AIS (если есть MMSI в том направлении)
ais_match = self._match_ais(bearing, distance, ais_contacts)
contact = {
'class': cls,
'bearing_deg': bearing,
'distance_m': distance,
'confidence': float(det.conf),
'bbox': bbox,
'ais_data': ais_match,
'source': 'camera+lidar'
}
# Дополняем радарным треком
radar_match = self._match_radar(bearing, distance, radar_tracks)
if radar_match:
contact['cog'] = radar_match.get('cog') # курс
contact['sog'] = radar_match.get('sog') # скорость
contact['tcpa'] = radar_match.get('tcpa') # время до CPA
contact['cpa'] = radar_match.get('cpa') # минимальное расстояние
fused_contacts.append(contact)
return fused_contacts
def _bearing_from_bbox(self, bbox: list, frame_shape: tuple) -> float:
cx = (bbox[0] + bbox[2]) / 2
fov_h = 60 # градусов горизонтального обзора
return (cx / frame_shape[1] - 0.5) * fov_h # относительно курса
COLREG-compliant maneuver planning
class COLREGPlanner:
"""
Правила МППСС-72 (COLREG): определяем тип ситуации
и обязательный манёвр.
"""
def assess_situation(self, own_vessel: dict,
target: dict) -> dict:
bearing_to_target = target['bearing_deg']
tcpa = target.get('tcpa', float('inf'))
cpa = target.get('cpa', float('inf'))
situation = 'safe'
action = 'none'
# COLREG Rule 13: обгон
if -22.5 <= bearing_to_target <= 22.5 and tcpa < 12 * 60:
situation = 'overtaking'
# Мы обгоняем: уступаем дорогу
action = 'alter_course_starboard'
# COLREG Rule 14: курсы на встречу
elif abs(bearing_to_target) < 5:
situation = 'head_on'
action = 'alter_course_starboard'
# COLREG Rule 15: пересечение курсов
elif 0 < bearing_to_target < 112.5:
situation = 'crossing_give_way'
action = 'alter_course_starboard_or_reduce_speed'
elif -112.5 < bearing_to_target < 0:
situation = 'crossing_stand_on'
action = 'maintain_course_and_speed'
return {
'situation': situation,
'action': action,
'tcpa_minutes': tcpa / 60,
'cpa_meters': cpa,
'urgency': 'HIGH' if cpa < 500 and tcpa < 5 * 60 else
'MEDIUM' if cpa < 1000 else 'LOW'
}
Man Overboard Detection
MOB detection is a critical function. A person in the water is a small object (30x40 pixels at 50 meters) that can be hidden by waves.
class MOBDetector:
def __init__(self):
self.detector = YOLO('yolov8m_mob.pt') # дообученный на морских людях
self.thermal_model = ThermalPersonDetector() # для ночи
def detect(self, frame: np.ndarray,
thermal_frame: np.ndarray = None) -> list:
# RGB детекция
rgb_dets = self.detector(frame, conf=0.35, classes=[0]) # person
dets = list(rgb_dets[0].boxes)
# Ночью или при плохой видимости — тепловизор
if thermal_frame is not None:
thermal_dets = self.thermal_model.detect(thermal_frame)
dets.extend(thermal_dets)
# Фильтр: человек в воде имеет bbox близко к горизонту
horizon_y = frame.shape[0] * 0.4 # примерно
mob_candidates = []
for det in dets:
bbox = list(map(int, det.xyxy[0]))
if bbox[1] > horizon_y: # ниже горизонта = в воде
mob_candidates.append({
'bbox': bbox,
'confidence': float(det.conf)
})
return mob_candidates
System characteristics
| Parameter | Meaning |
|---|---|
| Vessel detection range (camera) | Up to 3 km (clear weather) |
| LiDAR Range (Ouster OS1-64) | Up to 120m |
| Latency of the full cycle of perception | 150–250 ms |
| Supported object types | Vessels, buoys, fins, MOB |
| Integration | NMEA 2000, CAN bus, ARPA radar |
| Project type | Term |
|---|---|
| Perception system (perception only) | 3-5 months |
| Perception + COLREG planning | 6–10 months |
| Completely self-contained system with certification | 18–36 months |







