AI-система контроля СИЗ и охраны труда
Детекция средств индивидуальной защиты (каски, жилеты, перчатки, маски) — одна из стандартных industrial CV-задач. Сложность в деталях: каска «не надета» и каска «надета на затылок» визуально различаются на 5–15 пикселей при типичном разрешении камеры. Плюс дисбаланс: нарушений на корректно работающем предприятии 2–5% от кадров.
Детектор СИЗ: конфигурация и датасет
from ultralytics import YOLO
import cv2
import numpy as np
from pathlib import Path
# Классы для контроля СИЗ (расширяемый список)
PPE_CLASSES = {
0: 'person',
1: 'helmet_on', # каска надета правильно
2: 'helmet_off', # каска отсутствует / снята
3: 'helmet_incorrect', # каска надета неправильно
4: 'vest_on',
5: 'vest_off',
6: 'gloves_on',
7: 'gloves_off',
8: 'mask_on',
9: 'mask_off',
10: 'glasses_on',
11: 'glasses_off',
}
# Критические нарушения (требуют немедленного алерта)
CRITICAL_VIOLATIONS = {2, 3, 5, 7, 9}
def train_ppe_detector(data_yaml: str) -> YOLO:
model = YOLO('yolov8m.pt')
model.train(
data=data_yaml,
imgsz=640,
batch=16,
epochs=200,
device='0',
# СИЗ — мелкие объекты относительно размера кадра
# Увеличиваем вес маленьких bbox в loss
box=7.5,
cls=0.5,
# Аугментации: имитируем разное освещение на производстве
hsv_h=0.015, hsv_s=0.7, hsv_v=0.4,
degrees=5,
translate=0.1,
scale=0.3,
# Без mosaic — СИЗ нельзя уменьшать бесконтрольно
mosaic=0.3,
copy_paste=0.2,
erasing=0.3, # случайное стирание — имитация частичной видимости
)
return model
Привязка СИЗ к персоне
Детектор видит людей и СИЗ отдельно. Нужно определить, какие СИЗ принадлежат какому человеку — это задача ассоциации.
from scipy.optimize import linear_sum_assignment
import numpy as np
def associate_ppe_to_persons(
person_boxes: list, # [[x1,y1,x2,y2], ...]
ppe_boxes: list, # [[x1,y1,x2,y2, class_id], ...]
iou_threshold: float = 0.3
) -> dict:
"""
Для каждой персоны определяем набор её СИЗ.
Используем вертикальную близость: каска должна быть над head bbox.
"""
person_ppe = {i: [] for i in range(len(person_boxes))}
for ppe in ppe_boxes:
px1, py1, px2, py2, cls_id = ppe
ppe_cx = (px1 + px2) / 2
ppe_cy = (py1 + py2) / 2
best_person = -1
best_overlap = 0
for pi, person in enumerate(person_boxes):
bx1, by1, bx2, by2 = person
# Расширяем bbox персоны для headgear — сверху на 30%
person_h = by2 - by1
ext_by1 = by1 - person_h * 0.3
# Проверяем, попадает ли центр СИЗ в расширенный bbox
if bx1 < ppe_cx < bx2 and ext_by1 < ppe_cy < by2:
# Вертикальное перекрытие как метрика близости
y_overlap = min(py2, by2) - max(py1, ext_by1)
if y_overlap > best_overlap:
best_overlap = y_overlap
best_person = pi
if best_person >= 0:
person_ppe[best_person].append({
'bbox': [px1, py1, px2, py2],
'class_id': int(cls_id),
'ppe_type': PPE_CLASSES.get(int(cls_id), 'unknown')
})
return person_ppe
def check_ppe_compliance(
person_ppe: dict,
required_ppe: set = {1, 4}, # требуемые классы (helmet_on, vest_on)
person_boxes: list = None
) -> list[dict]:
"""Формируем список нарушений по персонам"""
violations = []
for person_id, ppe_list in person_ppe.items():
worn_classes = {item['class_id'] for item in ppe_list}
missing = []
# Проверяем требуемые СИЗ
for req_cls in required_ppe:
# helmet_on=1, helmet_off=2, helmet_incorrect=3
# Если есть off/incorrect — нарушение
if req_cls == 1: # каска
if 2 in worn_classes or 3 in worn_classes:
missing.append({'type': 'helmet', 'severity': 'critical'})
elif 1 not in worn_classes:
missing.append({'type': 'helmet', 'severity': 'critical'})
elif req_cls == 4: # жилет
if 5 in worn_classes:
missing.append({'type': 'vest', 'severity': 'high'})
elif 4 not in worn_classes:
missing.append({'type': 'vest', 'severity': 'high'})
if missing:
violation = {
'person_id': person_id,
'person_bbox': person_boxes[person_id] if person_boxes else None,
'violations': missing,
'max_severity': 'critical' if any(
v['severity'] == 'critical' for v in missing
) else 'high'
}
violations.append(violation)
return violations
Алерты и интеграция
import asyncio
import aiohttp
from datetime import datetime
class PPEAlertSystem:
def __init__(
self,
telegram_bot_token: str,
alert_chat_id: str,
cooldown_seconds: int = 300 # алерт не чаще раза в 5 минут на зону
):
self.bot_token = telegram_bot_token
self.chat_id = alert_chat_id
self.cooldown = cooldown_seconds
self.last_alerts: dict[str, datetime] = {}
async def send_violation_alert(
self,
zone_id: str,
violations: list,
frame_image: bytes
) -> None:
"""Алерт с фото нарушения"""
now = datetime.now()
last = self.last_alerts.get(zone_id)
if last and (now - last).total_seconds() < self.cooldown:
return # в cooldown
self.last_alerts[zone_id] = now
violation_text = '\n'.join([
f"• {v['type'].upper()} — {v['severity']}"
for violation in violations
for v in violation['violations']
])
caption = (
f"🚨 Нарушение охраны труда\n"
f"Зона: {zone_id}\n"
f"Время: {now.strftime('%H:%M:%S')}\n"
f"Нарушений: {len(violations)}\n\n"
f"{violation_text}"
)
async with aiohttp.ClientSession() as session:
url = f'https://api.telegram.org/bot{self.bot_token}/sendPhoto'
data = aiohttp.FormData()
data.add_field('chat_id', self.chat_id)
data.add_field('caption', caption)
data.add_field('photo', frame_image,
filename='violation.jpg',
content_type='image/jpeg')
await session.post(url, data=data)
Metrics на реальных датасетах
| Dataset | Модель | [email protected] | FPS (RTX3060) |
|---|---|---|---|
| Safety Helmet (Roboflow) | YOLOv8s | 0.921 | 120fps |
| PPE-COCO (шлем+жилет+перчатки) | YOLOv8m | 0.874 | 80fps |
| Кастомный (производство, 8 классов СИЗ) | YOLOv8l | 0.841 | 55fps |
Сроки
| Задача | Срок |
|---|---|
| Детектор каска+жилет (public dataset fine-tuning) | 2–3 недели |
| Кастомный набор СИЗ + камерная интеграция | 4–7 недель |
| Полная система с алертами, дашбордом, статистикой | 8–14 недель |







