AI Mine Safety Monitoring System

We design and deploy artificial intelligence systems: from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering and MLOps to make AI work not in the lab, but in real business.
Showing 1 of 1 servicesAll 1566 services
AI Mine Safety Monitoring System
Medium
~2-4 weeks
FAQ
AI Development Areas
AI Solution Development Stages
Latest works
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1041
  • image_logo-advance_0.png
    B2B Advance company logo design
    561
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    823
  • image_logo-aider_0.jpg
    AIDER company logo development
    762
  • image_crm_chasseurs_493_0.webp
    CRM development for Chasseurs
    848

Разработка AI для мониторинга безопасности в горнодобывающей промышленности

Горнодобывающие предприятия — второй по смертности сектор после строительства. AI-мониторинг шахт и карьеров решает: контроль СИЗ в подземных условиях (плохое освещение, запылённость), мониторинг опасных зон, трекинг персонала (underground personnel tracking), детекция аварийных ситуаций. Особенности: работа при минимальном освещении, термальные камеры для теплового контроля горных выработок, интеграция с SCADA системами.

Мониторинг безопасности в шахте

import numpy as np
import cv2
from ultralytics import YOLO
from dataclasses import dataclass
import time
from typing import Optional

@dataclass
class MineWorkerStatus:
    worker_id: int
    bbox: list
    has_helmet: bool
    has_lamp: bool              # налобный фонарь (обязателен в шахте)
    has_reflective_vest: bool
    has_mask: bool              # противопылевая маска
    in_danger_zone: bool
    proximity_to_machinery: bool
    compliance: bool

class MineMonitoringSystem:
    """
    Система мониторинга безопасности в горнодобывающей отрасли.
    Учёт специфики: низкое освещение, дым/пыль, ИК-камеры.
    Mine Safety Dataset (custom):
    - mining_helmet, lamp_headlamp, reflective_jacket
    - mining_equipment, person, danger_zone_marker
    """
    MINE_PPE_CLASSES = {
        0: 'person',
        1: 'mining_helmet',     # шахтная каска с фонарём
        2: 'headlamp',          # фонарь отдельно
        3: 'reflective_jacket', # световозвращающий жилет
        4: 'dust_mask',
        5: 'safety_boots',
        6: 'no_helmet',         # нарушение
        7: 'no_headlamp',       # нарушение
        8: 'mining_equipment',  # комбайн, погрузчик
    }

    CRITICAL_ZONES = ['roof_instability', 'gas_presence', 'machinery_working']

    def __init__(self, model_path: str,
                  thermal_model_path: Optional[str] = None,
                  danger_zones: Optional[dict] = None,
                  device: str = 'cuda'):
        self.model = YOLO(model_path)
        self.thermal_model = YOLO(thermal_model_path) if thermal_model_path else None
        self.danger_zones = danger_zones or {}
        self.device = device
        self._worker_tracks: dict[int, dict] = {}

    def process_frame(self, frame: np.ndarray,
                       camera_id: str,
                       timestamp: float = None) -> dict:
        if timestamp is None:
            timestamp = time.time()

        # Улучшение для тёмных/пыльных условий
        enhanced = self._enhance_low_light(frame)

        results = self.model.track(
            enhanced, persist=True, conf=0.40,
            verbose=False, tracker='bytetrack.yaml'
        )

        workers_status = []
        violations = []

        if results[0].boxes is None:
            return self._empty_result(camera_id, timestamp)

        persons = {}
        ppe_items = {}

        for box in results[0].boxes:
            cls_id = int(box.cls.item())
            cls_name = self.MINE_PPE_CLASSES.get(cls_id, 'unknown')
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            cx, cy = (x1+x2)//2, (y1+y2)//2
            tid = int(box.id.item()) if box.id is not None else -1

            if cls_name == 'person':
                persons[tid] = {
                    'bbox': [x1,y1,x2,y2], 'center': (cx,cy),
                    'has_helmet': False, 'has_lamp': False,
                    'has_vest': False, 'has_mask': False,
                    'violations': []
                }
            elif cls_name in ('no_helmet', 'no_headlamp'):
                ppe_items[tid] = {
                    'type': cls_name, 'center': (cx, cy),
                    'violation': True
                }
            elif cls_name in ('mining_helmet', 'headlamp',
                               'reflective_jacket', 'dust_mask'):
                ppe_items[tid] = {
                    'type': cls_name, 'center': (cx, cy),
                    'violation': False
                }

        # Ассоциация PPE с работниками
        for item_tid, item_data in ppe_items.items():
            nearest = self._find_nearest(item_data['center'], persons)
            if nearest is None:
                continue
            p = persons[nearest]
            if item_data['violation']:
                p['violations'].append(item_data['type'])
            else:
                t = item_data['type']
                if t == 'mining_helmet':
                    p['has_helmet'] = True
                elif t == 'headlamp':
                    p['has_lamp'] = True
                elif t == 'reflective_jacket':
                    p['has_vest'] = True
                elif t == 'dust_mask':
                    p['has_mask'] = True

        # Оценка каждого работника
        for pid, pdata in persons.items():
            in_danger = self._check_danger_zone(pdata['center'])
            compliance = (pdata['has_helmet'] and
                          not pdata['violations'] and
                          not in_danger)

            ws = MineWorkerStatus(
                worker_id=pid, bbox=pdata['bbox'],
                has_helmet=pdata['has_helmet'],
                has_lamp=pdata['has_lamp'],
                has_reflective_vest=pdata['has_vest'],
                has_mask=pdata['has_mask'],
                in_danger_zone=in_danger,
                proximity_to_machinery=False,
                compliance=compliance
            )
            workers_status.append(ws)

            if not compliance or pdata['violations']:
                violations.append({
                    'worker_id': pid,
                    'bbox': pdata['bbox'],
                    'issues': pdata['violations'] + (['danger_zone'] if in_danger else []),
                    'severity': 'critical' if in_danger else 'warning'
                })

        total = len(workers_status)
        compliant = sum(1 for w in workers_status if w.compliance)

        return {
            'camera_id': camera_id,
            'timestamp': timestamp,
            'workers': [w.__dict__ for w in workers_status],
            'violations': violations,
            'compliance_rate_pct': round(compliant/max(total,1)*100, 1),
            'critical_alert': any(v['severity']=='critical' for v in violations)
        }

    def _enhance_low_light(self, frame: np.ndarray) -> np.ndarray:
        """Улучшение видимости в тёмных условиях шахты"""
        lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
        clahe = cv2.createCLAHE(clipLimit=4.0, tileGridSize=(8,8))
        lab[:,:,0] = clahe.apply(lab[:,:,0])
        enhanced = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)
        # Небольшое повышение яркости
        enhanced = cv2.convertScaleAbs(enhanced, alpha=1.2, beta=20)
        return enhanced

    def _check_danger_zone(self, center: tuple) -> bool:
        for zone_id, polygon in self.danger_zones.items():
            poly = np.array(polygon, dtype=np.int32)
            if cv2.pointPolygonTest(poly,
                                     (float(center[0]), float(center[1])), False) >= 0:
                return True
        return False

    def _find_nearest(self, center: tuple, persons: dict) -> Optional[int]:
        min_dist = 150
        nearest = None
        cx, cy = center
        for pid, p in persons.items():
            px, py = p['center']
            dist = np.sqrt((cx-px)**2 + (cy-py)**2)
            if dist < min_dist:
                min_dist = dist
                nearest = pid
        return nearest

    def _empty_result(self, camera_id: str, timestamp: float) -> dict:
        return {
            'camera_id': camera_id, 'timestamp': timestamp,
            'workers': [], 'violations': [],
            'compliance_rate_pct': 100.0, 'critical_alert': False
        }
Условие Detection Rate False Positive
Нормальное освещение (открытый карьер) 93–97% 2–4%
Тёмная шахта (CLAHE enhanced) 82–89% 6–12%
Пыль/дым (коэффициент снижения 15–30%) 72–82% 10–18%
Тепловизор (LWIR, любые условия) 88–94% 3–7%
Задача Срок
PPE мониторинг для карьера (хорошее освещение) 4–6 недель
Подземная шахта + ИК + SCADA интеграция 10–16 недель
Enterprise mine safety платформа 18–28 недель