Development of an AI system for detecting equipment anomalies
Equipment anomaly detection is a task where false negatives (missed failures) are more costly than false positives (unnecessary checking). The system architecture is built around this priority: multiple detectors with different methods, consensus for high specificity, and early warning for critical assets.
Multi-level detection architecture
Anomaly Detection Levels:
| Level | Method | Delay | Type of anomaly |
|---|---|---|---|
| L1: Threshold | Static thresholds according to ISO/GOST | ms | Gross violations |
| L2: Statistical | EWMA, CUSUM, 3σ rules | sec | Slow drift |
| L3: ML Unsupervised | Isolation Forest, Autoencoder | min | Multidimensional patterns |
| L4: Supervised | XGBoost on marked failures | min | Known failure types |
| L5: Physics | Model of normal asset behavior | h | Deviation from the physical model |
Combination of levels: L1/L2 – for immediate alerts, L3/L4 – for early diagnostics, L5 – for long-term trend.
Feature Engineering for Equipment
Temporal and frequency characteristics:
import numpy as np
from scipy import stats, signal
def extract_equipment_features(raw_signal, sampling_rate=1000):
    """
    Multi-domain features from a raw sensor signal (vibration/current/pressure).

    Parameters
    ----------
    raw_signal : array-like of float
        1-D time-domain samples from a single sensor channel.
    sampling_rate : int, optional
        Sampling frequency in Hz (default 1000).

    Returns
    -------
    dict
        Time-domain statistics, relative band powers, and spectral summary
        features. Ratio features fall back to 0.0 for an all-zero signal
        instead of dividing by zero (the original raised/returned nan there).
    """
    raw_signal = np.asarray(raw_signal, dtype=float)

    # Time domain. RMS and mean-abs are hoisted so each is computed once,
    # and the ratio features are guarded against a silent (all-zero) signal.
    rms = np.sqrt(np.mean(raw_signal**2))
    mean_abs = np.mean(np.abs(raw_signal))
    peak = np.max(np.abs(raw_signal))
    features = {
        'rms': rms,
        'peak': peak,
        'crest_factor': peak / rms if rms > 0 else 0.0,
        'kurtosis': stats.kurtosis(raw_signal),
        'skewness': stats.skew(raw_signal),
        'peak_to_peak': np.ptp(raw_signal),
        'shape_factor': rms / mean_abs if mean_abs > 0 else 0.0,
    }

    # Frequency domain: Welch PSD estimate.
    freqs, psd = signal.welch(raw_signal, fs=sampling_rate, nperseg=512)
    total_power = np.trapz(psd, freqs)

    # Relative energy per frequency band (Hz).
    bands = [(0, 100), (100, 500), (500, 2000), (2000, 5000)]
    for low, high in bands:
        mask = (freqs >= low) & (freqs < high)
        band_power = np.trapz(psd[mask], freqs[mask])
        features[f'band_power_{low}_{high}'] = (
            band_power / total_power if total_power > 0 else 0.0
        )

    # Dominant frequency and spectral centroid.
    features['dominant_freq'] = freqs[np.argmax(psd)]
    psd_sum = np.sum(psd)
    features['spectral_centroid'] = (
        np.sum(freqs * psd) / psd_sum if psd_sum > 0 else 0.0
    )
    return features
Delta features (changes over time):
def compute_delta_features(current_features, baseline_features, trend_features_7d):
    """
    Deviation features relative to a per-asset baseline.

    What matters is not the absolute value, but the deviation from the
    norm of the specific asset.

    Parameters
    ----------
    current_features : dict
        Latest feature values.
    baseline_features : dict
        Reference ("normal") values for the same asset.
    trend_features_7d : dict
        7-day trend per feature (slope from linregress).

    Returns
    -------
    dict
        Absolute and percent deltas plus the 7-day trend, keyed by
        '<feature>_delta_abs', '<feature>_delta_pct', '<feature>_trend_7d'.
    """
    deltas = {}
    for name, value in current_features.items():
        if name in baseline_features:
            baseline = baseline_features[name]
            diff = value - baseline
            deltas[f'{name}_delta_abs'] = diff
            # Percent delta is undefined for a zero baseline — skip it.
            if baseline != 0:
                deltas[f'{name}_delta_pct'] = diff / abs(baseline) * 100
        if name in trend_features_7d:
            # Slope over the last 7 days (from linregress).
            deltas[f'{name}_trend_7d'] = trend_features_7d[name]
    return deltas
Unsupervised Anomaly Detection
Isolation Forest with seasonal adaptation:
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import pandas as pd
class EquipmentAnomalyDetector:
    """
    Unsupervised per-asset anomaly detector based on Isolation Forest.

    Fit on a baseline of historically normal operation of one asset;
    at inference, an observation is flagged when its score falls below
    the 5th-percentile score observed on the normal training data.
    """

    def __init__(self, contamination=0.02):
        # contamination: expected share of anomalies in the training data.
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            contamination=contamination,
            n_estimators=200,
            random_state=42,
        )
        self.baseline_built = False
        # Training-time column order, fixed in fit_baseline().
        self.feature_names = None

    def fit_baseline(self, normal_operation_features: pd.DataFrame):
        """
        Fit on historically normal data of a specific asset.

        At least 30 days of normal operation is recommended.
        """
        # Remember the column order so inference vectors are assembled
        # identically regardless of the caller's dict ordering.
        self.feature_names = list(normal_operation_features.columns)
        X = self.scaler.fit_transform(normal_operation_features.to_numpy())
        self.model.fit(X)
        self.baseline_built = True
        # Threshold: 5th percentile of anomaly scores on normal data.
        scores = self.model.score_samples(X)
        self.threshold = np.percentile(scores, 5)

    def detect(self, current_features: dict) -> dict:
        """
        Score one observation; raises KeyError if a training-time
        feature is missing from current_features.
        """
        if not self.baseline_built:
            return {'status': 'no_baseline', 'anomaly': False}
        # Bug fix: the original used list(current_features.values()), which
        # silently scrambles features when the caller's dict order differs
        # from the training columns. Assemble in the stored column order.
        row = [[current_features[name] for name in self.feature_names]]
        X = self.scaler.transform(row)
        score = self.model.score_samples(X)[0]
        is_anomaly = score < self.threshold
        return {
            'anomaly_score': float(-score),  # invert: higher = more anomalous
            'anomaly': bool(is_anomaly),
            'severity': self._classify_severity(-score),
        }

    def _classify_severity(self, anomaly_score):
        # Fixed bands over the inverted IsolationForest score.
        if anomaly_score > 0.8:
            return 'critical'
        if anomaly_score > 0.6:
            return 'high'
        if anomaly_score > 0.4:
            return 'medium'
        return 'low'
Autoencoder for multisensory data
LSTM autoencoder — reconstruction error as the anomaly signal:
import torch
import torch.nn as nn
class SensorAutoencoder(nn.Module):
    """
    LSTM autoencoder for multichannel sensor windows.

    Encodes a window into a small latent vector and reconstructs it;
    the reconstruction error serves as the anomaly signal.
    """

    def __init__(self, input_dim, hidden_dim=32, latent_dim=8, seq_len=60):
        # seq_len is kept for interface compatibility; the forward pass
        # infers the window length from the input tensor itself.
        super().__init__()
        self.encoder = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.bottleneck = nn.Linear(hidden_dim, latent_dim)
        self.decoder = nn.LSTM(latent_dim, hidden_dim, batch_first=True)
        self.output_layer = nn.Linear(hidden_dim, input_dim)

    def forward(self, x):
        """x: (batch, seq_len, input_dim) -> reconstruction of same shape."""
        encoded, _ = self.encoder(x)
        # Compress the final encoder hidden state into the latent code.
        latent = self.bottleneck(encoded[:, -1, :])
        # Feed the same latent vector to the decoder at every time step.
        steps = x.shape[1]
        decoded, _ = self.decoder(latent.unsqueeze(1).repeat(1, steps, 1))
        return self.output_layer(decoded)
def detect_anomaly_autoencoder(model, sensor_window, threshold_percentile=95):
    """
    Per-window reconstruction error from a trained autoencoder.

    The alerting threshold itself is calibrated elsewhere, from the error
    distribution on a normal validation set (e.g. at `threshold_percentile`);
    this function only returns the raw error.

    Parameters
    ----------
    model : nn.Module
        Trained autoencoder mapping a window to its reconstruction.
    sensor_window : torch.Tensor
        (batch, seq_len, n_sensors) input window(s).
    threshold_percentile : int, optional
        Kept for interface compatibility; not used here (see above).

    Returns
    -------
    float or list[float]
        Mean squared reconstruction error per window: a single float for
        batch size 1 (original behavior); a list of floats for larger
        batches, where the original crashed on `.item()`.
    """
    model.eval()  # disable dropout/batch-norm training behavior while scoring
    with torch.no_grad():
        reconstruction = model(sensor_window)
        error = torch.mean((sensor_window - reconstruction) ** 2, dim=[1, 2])
    # Bug fix: .item() raises for batch > 1; return per-sample errors then.
    return error.item() if error.numel() == 1 else error.tolist()
Alerts and noise suppression
Multi-level consensus:
def consensus_anomaly_decision(l2_statistical, l3_isolation_forest,
                               l4_supervised, asset_criticality):
    """
    Combine detector verdicts into a single alert decision.

    Different detectors catch different anomaly types; requiring consensus
    lowers the false positive rate.

    Parameters
    ----------
    l2_statistical, l3_isolation_forest, l4_supervised : dict
        Detector outputs with a boolean 'anomaly' and optional 'severity'
        ('none'/'low'/'medium'/'high'/'critical').
    asset_criticality : int
        Criticality class of the asset; >= 4 means critical equipment.

    Returns
    -------
    dict
        'alert', 'confidence' (votes / 3), 'severity' (worst reported),
        'detectors_triggered'.
    """
    rank = {'none': 0, 'low': 1, 'medium': 2, 'high': 3, 'critical': 4}
    votes = sum([
        bool(l2_statistical['anomaly']),
        bool(l3_isolation_forest['anomaly']),
        bool(l4_supervised.get('anomaly', False)),
    ])
    # Bug fix: the original took the max severity over L2 and L3 only,
    # silently dropping the supervised detector's verdict.
    severity = max(
        l2_statistical.get('severity', 'none'),
        l3_isolation_forest.get('severity', 'none'),
        l4_supervised.get('severity', 'none'),
        key=lambda s: rank[s],
    )
    # Critical equipment: a single detector is enough.
    # Standard equipment: at least two detectors must agree.
    threshold = 1 if asset_criticality >= 4 else 2
    return {
        'alert': votes >= threshold,
        'confidence': votes / 3,
        'severity': severity,
        'detectors_triggered': votes,
    }
Deduplication of correlated alerts: A single pump failure → a pressure + temperature + flow anomaly. Grouping by time (±5 min) and topology (adjacent assets) forms a single incident with a set of signals.
Model drift and overfitting
Detection of concept drift:
from scipy.stats import ks_2samp
def detect_model_drift(recent_features, baseline_features, p_threshold=0.01):
    """
    Per-feature Kolmogorov-Smirnov drift check.

    KS test per column: if a feature's distribution has shifted (p-value
    below `p_threshold`), the column counts as drifted. When more than 30%
    of the columns drift, the model is considered stale — a trigger for
    retraining on fresh data.

    Parameters
    ----------
    recent_features, baseline_features : pd.DataFrame
        Same columns; the recent window vs. the historical baseline.
    p_threshold : float, optional
        Significance level for the two-sample KS test.

    Returns
    -------
    dict
        'drift_detected', 'drifted_features', 'drift_ratio'.
    """
    drifted = [
        column
        for column in recent_features.columns
        if ks_2samp(baseline_features[column], recent_features[column]).pvalue
        < p_threshold
    ]
    ratio = len(drifted) / len(recent_features.columns)
    return {
        'drift_detected': ratio > 0.3,
        'drifted_features': drifted,
        'drift_ratio': ratio,
    }
Automatic pipeline: weekly drift check → when drift_ratio > 0.3, retraining is triggered on the last 60 days of data (if they are marked as normal according to the operational log).
Timeframe: Isolation Forest + base features + EWMA alerting + dashboard — 3-4 weeks. Autoencoder with LSTM, multisensor consensus, drift detection, automatic retraining — 2-3 months.







