Road Traffic Monitoring System via Video Analytics

We design and deploy artificial intelligence systems: from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering and MLOps to make AI work not in the lab, but in real business.
Showing 1 of 1 servicesAll 1566 services
Road Traffic Monitoring System via Video Analytics
Complex
~1-2 weeks
FAQ
AI Development Areas
AI Solution Development Stages
Latest works
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1041
  • image_logo-advance_0.png
    B2B Advance company logo design
    561
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    823
  • image_logo-aider_0.jpg
    AIDER company logo development
    762
  • image_crm_chasseurs_493_0.webp
    CRM development for Chasseurs
    848

System Development мониторинга дорожного трафика через видеоаналитику

Видеоаналитика дорожного трафика — автоматизация задач, которые ранее требовали ручного подсчёта или дорогостоящих петлевых датчиков в асфальте. Типовые метрики: объём трафика по направлениям, скорость потока, плотность, классификация ТС, детекция инцидентов (ДТП, остановившиеся автомобили, нарушения).

Мультикатегорийная классификация транспорта

from ultralytics import YOLO
import numpy as np

VEHICLE_CLASSES = {
    0: 'bicycle',
    1: 'bus',
    2: 'car',
    3: 'motorcycle',
    4: 'truck',
    5: 'pedestrian'
}

class TrafficMonitor:
    def __init__(self, model_path: str, detector_conf: float = 0.5):
        self.detector = YOLO(model_path)  # обученный на COCO + дообученный
        self.conf = detector_conf
        self.tracker = ByteTracker()
        self.speed_estimator = SpeedEstimator()

        # Счётчики по классам и направлениям
        self.counts = {cls: {'north': 0, 'south': 0, 'east': 0, 'west': 0}
                       for cls in VEHICLE_CLASSES.values()}

    def process(self, frame: np.ndarray) -> dict:
        detections = self.detector(frame, conf=self.conf,
                                    classes=list(VEHICLE_CLASSES.keys()))
        tracks = self.tracker.update(detections[0])

        vehicles = []
        for track in tracks:
            vehicle_class = VEHICLE_CLASSES.get(int(track.class_id), 'car')
            bbox = track.bbox.tolist()
            center = (int((bbox[0]+bbox[2])/2), int((bbox[1]+bbox[3])/2))

            # Оценка скорости по изменению позиции трека
            speed = self.speed_estimator.estimate(track.track_id, center)

            vehicles.append({
                'track_id': track.track_id,
                'class': vehicle_class,
                'bbox': bbox,
                'center': center,
                'speed_kmh': speed,
                'direction': self._estimate_direction(track.track_id)
            })

        return {
            'vehicles': vehicles,
            'total': len(vehicles),
            'by_class': self._count_by_class(vehicles),
            'avg_speed': self._average_speed(vehicles)
        }

Оценка скорости

class SpeedEstimator:
    def __init__(self, fps: float = 30.0, pixels_per_meter: float = 50.0):
        self.fps = fps
        self.ppm = pixels_per_meter
        self.track_positions = {}
        self.track_speeds = {}

    def estimate(self, track_id: int, position: tuple) -> float:
        """Скорость в км/ч через изменение позиции"""
        if track_id not in self.track_positions:
            self.track_positions[track_id] = []
        self.track_positions[track_id].append(position)

        history = self.track_positions[track_id]
        if len(history) < 5:
            return 0.0

        # Среднее смещение за последние 5 кадров
        recent = history[-5:]
        total_dist_px = sum(
            np.linalg.norm(np.array(recent[i]) - np.array(recent[i-1]))
            for i in range(1, len(recent))
        )
        avg_dist_px_per_frame = total_dist_px / (len(recent) - 1)
        dist_m_per_frame = avg_dist_px_per_frame / self.ppm
        speed_ms = dist_m_per_frame * self.fps
        speed_kmh = speed_ms * 3.6

        self.track_speeds[track_id] = speed_kmh
        return round(speed_kmh, 1)

Детекция дорожных инцидентов

class IncidentDetector:
    def __init__(self, stopped_threshold_sec: float = 30.0):
        self.stopped_vehicles = {}  # track_id -> (position, first_seen)
        self.stopped_threshold = stopped_threshold_sec
        self.incident_cooldown = {}

    def check_incidents(self, vehicles: list[dict],
                         timestamp: float) -> list[dict]:
        incidents = []

        for vehicle in vehicles:
            tid = vehicle['track_id']
            speed = vehicle.get('speed_kmh', 0)
            pos = vehicle['center']

            # Детекция остановившегося ТС
            if speed < 2:  # фактически стоит
                if tid not in self.stopped_vehicles:
                    self.stopped_vehicles[tid] = (pos, timestamp)
                else:
                    stopped_pos, first_seen = self.stopped_vehicles[tid]
                    duration = timestamp - first_seen

                    if duration > self.stopped_threshold:
                        if tid not in self.incident_cooldown or \
                           timestamp - self.incident_cooldown[tid] > 120:
                            incidents.append({
                                'type': 'stopped_vehicle',
                                'vehicle_id': tid,
                                'class': vehicle['class'],
                                'position': pos,
                                'duration_sec': duration
                            })
                            self.incident_cooldown[tid] = timestamp
            else:
                # Убираем из словаря остановившихся
                self.stopped_vehicles.pop(tid, None)

        return incidents

Классификация транспортных потоков

Выходные метрики для транспортного ведомства:

  • PCE (Passenger Car Equivalent): грузовик = 2.0 PCE, автобус = 1.5 PCE, мотоцикл = 0.5 PCE
  • LOS (Level of Service): V/C ratio → уровень A–F
  • Speed distribution: гистограмма скоростей
  • Headway: интервал между автомобилями
Метрика системы Значение
Точность классификации ТС 92–96%
Точность подсчёта (recall) 95–98%
Точность скорости ±5–10 км/ч
Latency обнаружения инцидента < 5 секунд
Масштаб Срок
1–4 перекрёстка, базовый мониторинг 5–7 недель
10–30 точек, интеграция с АСУДД 10–16 недель
Городская система, 100+ камер 18–28 недель