System Development мониторинга конвейерной линии через видеоаналитику
Мониторинг конвейера через видеоаналитику решает задачи, которые трудно автоматизировать другими сенсорами: обнаружение застрявших предметов, контроль скорости движения ленты, детекция нештатных ситуаций (завал, разлив, выпадение продукта), подсчёт производительности. Преимущество перед точечными датчиками — один поток видео даёт информацию о всей зоне наблюдения.
Задачи видеоаналитики конвейера
Детекция остановки ленты: оптический поток кадров. Если motion vector близок к нулю в зоне ленты — остановка.
import cv2
import numpy as np
class ConveyorMonitor:
def __init__(self, config: dict):
self.belt_roi = config['belt_roi'] # Region of Interest: зона ленты
self.normal_speed_range = config['belt_speed_range'] # пикс/сек
self.alarm_callbacks = []
# Для оптического потока
self.prev_frame = None
self.lk_params = dict(
winSize=(21, 21),
maxLevel=3,
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
)
def process_frame(self, frame: np.ndarray) -> dict:
roi_frame = self._crop_roi(frame, self.belt_roi)
gray = cv2.cvtColor(roi_frame, cv2.COLOR_BGR2GRAY)
analysis = {'timestamp': get_timestamp()}
if self.prev_frame is not None:
# Optical flow для измерения скорости ленты
flow = cv2.calcOpticalFlowFarneback(
self.prev_frame, gray,
None, 0.5, 3, 15, 3, 5, 1.2, 0
)
# Средний вектор движения в направлении ленты
mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
mean_speed = float(np.mean(mag))
analysis['belt_speed'] = mean_speed
analysis['belt_status'] = self._check_speed(mean_speed)
# Обнаружение аномальных объектов (застревание)
analysis['obstruction'] = self._detect_obstruction(gray, flow)
self.prev_frame = gray.copy()
return analysis
def _check_speed(self, speed: float) -> str:
min_speed, max_speed = self.normal_speed_range
if speed < min_speed * 0.1:
return 'STOPPED'
elif speed < min_speed * 0.5:
return 'SLOW'
elif speed > max_speed * 1.5:
return 'FAST'
return 'NORMAL'
Подсчёт продукции на конвейере
class ProductCounter:
def __init__(self, count_line_y: int, model_path: str):
self.detector = YOLO(model_path)
self.count_line_y = count_line_y
self.tracker = ByteTracker()
self.counted_ids = set()
self.count = 0
def process(self, frame: np.ndarray) -> int:
detections = self.detector(frame, conf=0.5)
tracks = self.tracker.update(detections[0])
for track in tracks:
cx = int((track.bbox[0] + track.bbox[2]) / 2)
cy = int((track.bbox[1] + track.bbox[3]) / 2)
# Если объект пересёк счётную линию
if (track.track_id not in self.counted_ids and
abs(cy - self.count_line_y) < 10):
self.count += 1
self.counted_ids.add(track.track_id)
return self.count
Детекция нештатных ситуаций
class AnomalyDetector:
def __init__(self):
# PatchCore для обнаружения аномалий на ленте
from anomalib.models import PatchCore
self.model = PatchCore.load('conveyor_anomaly.pt')
# Детектор разлива (жидкости, сыпучих материалов)
self.spill_detector = YOLO('spill_detector.pt')
def check_belt_surface(self, roi: np.ndarray) -> dict:
anomaly_result = self.model.predict(roi)
issues = []
if anomaly_result.pred_score > 0.7:
issues.append({
'type': 'surface_anomaly',
'score': float(anomaly_result.pred_score),
'region': self._get_anomaly_region(anomaly_result.anomaly_map)
})
# Детекция разлива
spill_results = self.spill_detector(roi, conf=0.4)
for box in spill_results[0].boxes:
issues.append({
'type': 'spill',
'bbox': box.xyxy[0].tolist(),
'confidence': float(box.conf)
})
return {
'has_issues': len(issues) > 0,
'issues': issues,
'severity': 'CRITICAL' if issues else 'OK'
}
Тепловая карта продуктивности
Визуализация производительности за смену: количество единиц в час, динамика скорости конвейера, инциденты и их длительность. Dashboard на Grafana с метриками из InfluxDB (time-series база данных).
Integration с MES/SCADA
Результаты аналитики передаются в Manufacturing Execution System через OPC-UA или REST API. Автоматическая остановка линии при критических инцидентах через PLC.
| Метрика | Значение |
|---|---|
| Точность детекции остановки | 99.5%+ |
| Точность подсчёта продукции | 98–99.5% |
| Latency детекции инцидента | < 500 ms |
| Ложные срабатывания | < 0.5% |
| Масштаб системы | Срок |
|---|---|
| 1 линия, базовый мониторинг | 4–6 недель |
| 3–5 линий, детекция аномалий | 8–12 недель |
| Цех целиком, MES интеграция | 14–20 недель |







