System Development мониторинга дорожного трафика через видеоаналитику
Видеоаналитика дорожного трафика — автоматизация задач, которые ранее требовали ручного подсчёта или дорогостоящих петлевых датчиков в асфальте. Типовые метрики: объём трафика по направлениям, скорость потока, плотность, классификация ТС, детекция инцидентов (ДТП, остановившиеся автомобили, нарушения).
Мультикатегорийная классификация транспорта
from ultralytics import YOLO
import numpy as np
VEHICLE_CLASSES = {
0: 'bicycle',
1: 'bus',
2: 'car',
3: 'motorcycle',
4: 'truck',
5: 'pedestrian'
}
class TrafficMonitor:
def __init__(self, model_path: str, detector_conf: float = 0.5):
self.detector = YOLO(model_path) # обученный на COCO + дообученный
self.conf = detector_conf
self.tracker = ByteTracker()
self.speed_estimator = SpeedEstimator()
# Счётчики по классам и направлениям
self.counts = {cls: {'north': 0, 'south': 0, 'east': 0, 'west': 0}
for cls in VEHICLE_CLASSES.values()}
def process(self, frame: np.ndarray) -> dict:
detections = self.detector(frame, conf=self.conf,
classes=list(VEHICLE_CLASSES.keys()))
tracks = self.tracker.update(detections[0])
vehicles = []
for track in tracks:
vehicle_class = VEHICLE_CLASSES.get(int(track.class_id), 'car')
bbox = track.bbox.tolist()
center = (int((bbox[0]+bbox[2])/2), int((bbox[1]+bbox[3])/2))
# Оценка скорости по изменению позиции трека
speed = self.speed_estimator.estimate(track.track_id, center)
vehicles.append({
'track_id': track.track_id,
'class': vehicle_class,
'bbox': bbox,
'center': center,
'speed_kmh': speed,
'direction': self._estimate_direction(track.track_id)
})
return {
'vehicles': vehicles,
'total': len(vehicles),
'by_class': self._count_by_class(vehicles),
'avg_speed': self._average_speed(vehicles)
}
Оценка скорости
class SpeedEstimator:
def __init__(self, fps: float = 30.0, pixels_per_meter: float = 50.0):
self.fps = fps
self.ppm = pixels_per_meter
self.track_positions = {}
self.track_speeds = {}
def estimate(self, track_id: int, position: tuple) -> float:
"""Скорость в км/ч через изменение позиции"""
if track_id not in self.track_positions:
self.track_positions[track_id] = []
self.track_positions[track_id].append(position)
history = self.track_positions[track_id]
if len(history) < 5:
return 0.0
# Среднее смещение за последние 5 кадров
recent = history[-5:]
total_dist_px = sum(
np.linalg.norm(np.array(recent[i]) - np.array(recent[i-1]))
for i in range(1, len(recent))
)
avg_dist_px_per_frame = total_dist_px / (len(recent) - 1)
dist_m_per_frame = avg_dist_px_per_frame / self.ppm
speed_ms = dist_m_per_frame * self.fps
speed_kmh = speed_ms * 3.6
self.track_speeds[track_id] = speed_kmh
return round(speed_kmh, 1)
Детекция дорожных инцидентов
class IncidentDetector:
def __init__(self, stopped_threshold_sec: float = 30.0):
self.stopped_vehicles = {} # track_id -> (position, first_seen)
self.stopped_threshold = stopped_threshold_sec
self.incident_cooldown = {}
def check_incidents(self, vehicles: list[dict],
timestamp: float) -> list[dict]:
incidents = []
for vehicle in vehicles:
tid = vehicle['track_id']
speed = vehicle.get('speed_kmh', 0)
pos = vehicle['center']
# Детекция остановившегося ТС
if speed < 2: # фактически стоит
if tid not in self.stopped_vehicles:
self.stopped_vehicles[tid] = (pos, timestamp)
else:
stopped_pos, first_seen = self.stopped_vehicles[tid]
duration = timestamp - first_seen
if duration > self.stopped_threshold:
if tid not in self.incident_cooldown or \
timestamp - self.incident_cooldown[tid] > 120:
incidents.append({
'type': 'stopped_vehicle',
'vehicle_id': tid,
'class': vehicle['class'],
'position': pos,
'duration_sec': duration
})
self.incident_cooldown[tid] = timestamp
else:
# Убираем из словаря остановившихся
self.stopped_vehicles.pop(tid, None)
return incidents
Классификация транспортных потоков
Выходные метрики для транспортного ведомства:
- PCE (Passenger Car Equivalent): грузовик = 2.0 PCE, автобус = 1.5 PCE, мотоцикл = 0.5 PCE
- LOS (Level of Service): V/C ratio → уровень A–F
- Speed distribution: гистограмма скоростей
- Headway: интервал между автомобилями
| Метрика системы | Значение |
|---|---|
| Точность классификации ТС | 92–96% |
| Точность подсчёта (recall) | 95–98% |
| Точность скорости | ±5–10 км/ч |
| Latency обнаружения инцидента | < 5 секунд |
| Масштаб | Срок |
|---|---|
| 1–4 перекрёстка, базовый мониторинг | 5–7 недель |
| 10–30 точек, интеграция с АСУДД | 10–16 недель |
| Городская система, 100+ камер | 18–28 недель |







