AI-детекция людей в опасных зонах
Зона ограниченного доступа — любая область, нахождение в которой несёт риск: зона работы робота-манипулятора, опасная зона штамповочного пресса, периметр высокого напряжения, охраняемая территория. Задача: детектировать проникновение человека в зону в реальном времени с latency < 100ms и минимальным количеством ложных тревог.
Постановка задачи: не детекция человека, а геопространственная проверка
Стандартная ошибка: детектируем человека и проверяем его bbox на пересечение с polygon зоны. Проблема: человек стоит у границы зоны, bbox пересекается с зоной на 10% — тревога? Правильно: отслеживаем конкретные точки тела (ноги) в зоне, а не весь bbox.
import cv2
import numpy as np
from shapely.geometry import Point, Polygon
from ultralytics import YOLO
from collections import defaultdict
class DangerZoneMonitor:
"""
Мониторинг опасных зон через:
1. Pose estimation → ключевые точки (ноги, руки)
2. Проверка точек в зонах через Shapely
3. Трекинг персон для снижения false positives
"""
def __init__(
self,
model_path: str = 'yolov8m-pose.pt',
danger_zones: list[dict] = None,
# Список зон: [{'id': 'zone_1', 'polygon': [(x1,y1), ...], 'severity': 'critical'}]
min_frames_before_alert: int = 3 # 3 кадра подряд = настоящее нарушение
):
self.model = YOLO(model_path)
self.zones = [
{
'id': z['id'],
'polygon': Polygon(z['polygon']),
'severity': z.get('severity', 'high'),
'check_points': z.get('check_points', 'feet') # 'feet' | 'any' | 'center'
}
for z in (danger_zones or [])
]
self.min_frames = min_frames_before_alert
self.violation_counters: dict = defaultdict(int) # person_track_id → consecutive frames
def process_frame(
self, frame: np.ndarray
) -> dict:
"""
Возвращает {'violations': [...], 'annotated_frame': np.ndarray}
"""
# YOLOv8-pose: детекция + позиция + трекинг
results = self.model.track(
frame,
persist=True, # ByteTrack трекинг
conf=0.4,
verbose=False
)[0]
active_track_ids = set()
violations = []
if results.keypoints is not None and results.boxes.id is not None:
keypoints = results.keypoints.xy.cpu().numpy() # (N, 17, 2)
confidences = results.keypoints.conf.cpu().numpy() # (N, 17)
track_ids = results.boxes.id.cpu().numpy().astype(int)
for i, (kpts, confs, track_id) in enumerate(
zip(keypoints, confidences, track_ids)
):
active_track_ids.add(track_id)
check_points = self._get_check_points(kpts, confs)
for zone in self.zones:
in_zone = any(
confs[idx] > 0.5 and
zone['polygon'].contains(Point(kpts[idx]))
for idx in check_points
)
if in_zone:
self.violation_counters[track_id] += 1
if self.violation_counters[track_id] >= self.min_frames:
violations.append({
'track_id': int(track_id),
'zone_id': zone['id'],
'severity': zone['severity'],
'consecutive_frames': self.violation_counters[track_id],
'person_bbox': results.boxes.xyxy[i].cpu().numpy().tolist()
})
else:
self.violation_counters[track_id] = 0
# Сбрасываем счётчики для исчезнувших персон
for track_id in list(self.violation_counters.keys()):
if track_id not in active_track_ids:
del self.violation_counters[track_id]
annotated = self._annotate_frame(frame, violations, results)
return {'violations': violations, 'annotated_frame': annotated}
def _get_check_points(
self, keypoints: np.ndarray, confidences: np.ndarray
) -> list[int]:
"""
COCO keypoints: 0=нос, 1-4=глаза/уши, 5-6=плечи, 7-8=локти,
9-10=запястья, 11-12=бёдра, 13-14=колени, 15-16=щиколотки
Ноги = индексы 15, 16 (щиколотки)
"""
feet_indices = [15, 16]
# Если ноги не детектированы → используем бёдра как fallback
feet_detected = any(confidences[i] > 0.5 for i in feet_indices)
if feet_detected:
return feet_indices
return [11, 12] # бёдра как запасные точки
def _annotate_frame(
self, frame: np.ndarray, violations: list, results
) -> np.ndarray:
annotated = frame.copy()
# Рисуем зоны
for zone in self.zones:
pts = np.array(list(zone['polygon'].exterior.coords), dtype=np.int32)
color = (0, 0, 255) if any(
v['zone_id'] == zone['id'] for v in violations
) else (0, 255, 0)
cv2.polylines(annotated, [pts], True, color, 2)
cv2.fillPoly(
annotated,
[pts],
tuple(int(c * 0.2) for c in color)
)
# Рисуем нарушителей
for v in violations:
bbox = list(map(int, v['person_bbox']))
cv2.rectangle(
annotated,
(bbox[0], bbox[1]), (bbox[2], bbox[3]),
(0, 0, 255), 3
)
cv2.putText(
annotated,
f"VIOLATION: {v['zone_id']}",
(bbox[0], bbox[1] - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.8,
(0, 0, 255), 2
)
return annotated
Калибровка камеры для точных координат
import cv2
import numpy as np
class ZoneCameraCalibrator:
"""
Перспективная трансформация: пиксельные координаты → реальные (метры).
Необходима для зон, заданных в метрах (требования ОТ).
"""
def __init__(
self,
reference_points_px: list, # 4 точки в пикселях (на изображении)
reference_points_world: list # 4 точки в метрах (реальные координаты)
):
src = np.float32(reference_points_px)
dst = np.float32(reference_points_world)
self.H = cv2.getPerspectiveTransform(src, dst)
def pixel_to_world(self, px: tuple) -> tuple:
"""(x_px, y_px) → (x_meters, y_meters)"""
pt = np.float32([[[px[0], px[1]]]])
world = cv2.perspectiveTransform(pt, self.H)
return float(world[0][0][0]), float(world[0][0][1])
def world_to_pixel(self, world: tuple) -> tuple:
"""(x_meters, y_meters) → (x_px, y_px)"""
H_inv = np.linalg.inv(self.H)
pt = np.float32([[[world[0], world[1]]]])
px = cv2.perspectiveTransform(pt, H_inv)
return int(px[0][0][0]), int(px[0][0][1])
Latency и параметры системы
| Параметр | Рекомендация | Почему |
|---|---|---|
| Разрешение инференса | 640×640 | Баланс скорость/точность для людей |
| FPS камеры | 15–25 fps | Движение человека: 1м/с → 4–7 см/кадр |
| min_frames_before_alert | 3–5 кадров | Снижение ложных тревог |
| Latency требование | < 100ms | Время реакции системы безопасности |
| GPU | RTX 3060 12GB | 8–12 камер одновременно при 640px |
Сроки
| Задача | Срок |
|---|---|
| Детектор проникновения в зону (1–3 камеры) | 3–5 недель |
| Масштабируемая система (10+ камер, дашборд) | 7–12 недель |
| Сертификация как система безопасности (SIL) | 20+ недель |







