Разработка AI для мониторинга безопасности в горнодобывающей промышленности
Горнодобывающие предприятия — второй по смертности сектор после строительства. AI-мониторинг шахт и карьеров решает: контроль СИЗ в подземных условиях (плохое освещение, запылённость), мониторинг опасных зон, трекинг персонала (underground personnel tracking), детекция аварийных ситуаций. Особенности: работа при минимальном освещении, термальные камеры для теплового контроля горных выработок, интеграция с SCADA системами.
Мониторинг безопасности в шахте
import numpy as np
import cv2
from ultralytics import YOLO
from dataclasses import dataclass
import time
from typing import Optional
@dataclass
class MineWorkerStatus:
worker_id: int
bbox: list
has_helmet: bool
has_lamp: bool # налобный фонарь (обязателен в шахте)
has_reflective_vest: bool
has_mask: bool # противопылевая маска
in_danger_zone: bool
proximity_to_machinery: bool
compliance: bool
class MineMonitoringSystem:
"""
Система мониторинга безопасности в горнодобывающей отрасли.
Учёт специфики: низкое освещение, дым/пыль, ИК-камеры.
Mine Safety Dataset (custom):
- mining_helmet, lamp_headlamp, reflective_jacket
- mining_equipment, person, danger_zone_marker
"""
MINE_PPE_CLASSES = {
0: 'person',
1: 'mining_helmet', # шахтная каска с фонарём
2: 'headlamp', # фонарь отдельно
3: 'reflective_jacket', # световозвращающий жилет
4: 'dust_mask',
5: 'safety_boots',
6: 'no_helmet', # нарушение
7: 'no_headlamp', # нарушение
8: 'mining_equipment', # комбайн, погрузчик
}
CRITICAL_ZONES = ['roof_instability', 'gas_presence', 'machinery_working']
def __init__(self, model_path: str,
thermal_model_path: Optional[str] = None,
danger_zones: Optional[dict] = None,
device: str = 'cuda'):
self.model = YOLO(model_path)
self.thermal_model = YOLO(thermal_model_path) if thermal_model_path else None
self.danger_zones = danger_zones or {}
self.device = device
self._worker_tracks: dict[int, dict] = {}
def process_frame(self, frame: np.ndarray,
camera_id: str,
timestamp: float = None) -> dict:
if timestamp is None:
timestamp = time.time()
# Улучшение для тёмных/пыльных условий
enhanced = self._enhance_low_light(frame)
results = self.model.track(
enhanced, persist=True, conf=0.40,
verbose=False, tracker='bytetrack.yaml'
)
workers_status = []
violations = []
if results[0].boxes is None:
return self._empty_result(camera_id, timestamp)
persons = {}
ppe_items = {}
for box in results[0].boxes:
cls_id = int(box.cls.item())
cls_name = self.MINE_PPE_CLASSES.get(cls_id, 'unknown')
x1, y1, x2, y2 = map(int, box.xyxy[0])
cx, cy = (x1+x2)//2, (y1+y2)//2
tid = int(box.id.item()) if box.id is not None else -1
if cls_name == 'person':
persons[tid] = {
'bbox': [x1,y1,x2,y2], 'center': (cx,cy),
'has_helmet': False, 'has_lamp': False,
'has_vest': False, 'has_mask': False,
'violations': []
}
elif cls_name in ('no_helmet', 'no_headlamp'):
ppe_items[tid] = {
'type': cls_name, 'center': (cx, cy),
'violation': True
}
elif cls_name in ('mining_helmet', 'headlamp',
'reflective_jacket', 'dust_mask'):
ppe_items[tid] = {
'type': cls_name, 'center': (cx, cy),
'violation': False
}
# Ассоциация PPE с работниками
for item_tid, item_data in ppe_items.items():
nearest = self._find_nearest(item_data['center'], persons)
if nearest is None:
continue
p = persons[nearest]
if item_data['violation']:
p['violations'].append(item_data['type'])
else:
t = item_data['type']
if t == 'mining_helmet':
p['has_helmet'] = True
elif t == 'headlamp':
p['has_lamp'] = True
elif t == 'reflective_jacket':
p['has_vest'] = True
elif t == 'dust_mask':
p['has_mask'] = True
# Оценка каждого работника
for pid, pdata in persons.items():
in_danger = self._check_danger_zone(pdata['center'])
compliance = (pdata['has_helmet'] and
not pdata['violations'] and
not in_danger)
ws = MineWorkerStatus(
worker_id=pid, bbox=pdata['bbox'],
has_helmet=pdata['has_helmet'],
has_lamp=pdata['has_lamp'],
has_reflective_vest=pdata['has_vest'],
has_mask=pdata['has_mask'],
in_danger_zone=in_danger,
proximity_to_machinery=False,
compliance=compliance
)
workers_status.append(ws)
if not compliance or pdata['violations']:
violations.append({
'worker_id': pid,
'bbox': pdata['bbox'],
'issues': pdata['violations'] + (['danger_zone'] if in_danger else []),
'severity': 'critical' if in_danger else 'warning'
})
total = len(workers_status)
compliant = sum(1 for w in workers_status if w.compliance)
return {
'camera_id': camera_id,
'timestamp': timestamp,
'workers': [w.__dict__ for w in workers_status],
'violations': violations,
'compliance_rate_pct': round(compliant/max(total,1)*100, 1),
'critical_alert': any(v['severity']=='critical' for v in violations)
}
def _enhance_low_light(self, frame: np.ndarray) -> np.ndarray:
"""Улучшение видимости в тёмных условиях шахты"""
lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
clahe = cv2.createCLAHE(clipLimit=4.0, tileGridSize=(8,8))
lab[:,:,0] = clahe.apply(lab[:,:,0])
enhanced = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)
# Небольшое повышение яркости
enhanced = cv2.convertScaleAbs(enhanced, alpha=1.2, beta=20)
return enhanced
def _check_danger_zone(self, center: tuple) -> bool:
for zone_id, polygon in self.danger_zones.items():
poly = np.array(polygon, dtype=np.int32)
if cv2.pointPolygonTest(poly,
(float(center[0]), float(center[1])), False) >= 0:
return True
return False
def _find_nearest(self, center: tuple, persons: dict) -> Optional[int]:
min_dist = 150
nearest = None
cx, cy = center
for pid, p in persons.items():
px, py = p['center']
dist = np.sqrt((cx-px)**2 + (cy-py)**2)
if dist < min_dist:
min_dist = dist
nearest = pid
return nearest
def _empty_result(self, camera_id: str, timestamp: float) -> dict:
return {
'camera_id': camera_id, 'timestamp': timestamp,
'workers': [], 'violations': [],
'compliance_rate_pct': 100.0, 'critical_alert': False
}
| Условие |
Detection Rate |
False Positive |
| Нормальное освещение (открытый карьер) |
93–97% |
2–4% |
| Тёмная шахта (CLAHE enhanced) |
82–89% |
6–12% |
| Пыль/дым (коэффициент снижения 15–30%) |
72–82% |
10–18% |
| Тепловизор (LWIR, любые условия) |
88–94% |
3–7% |
| Задача |
Срок |
| PPE мониторинг для карьера (хорошее освещение) |
4–6 недель |
| Подземная шахта + ИК + SCADA интеграция |
10–16 недель |
| Enterprise mine safety платформа |
18–28 недель |