AI Equipment Anomaly Detection System Development

We design and deploy artificial intelligence systems, from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering, and MLOps to make AI work in real business, not just in the lab.

Development of an AI system for detecting equipment anomalies

Equipment anomaly detection is a task where false negatives (missed failures) cost more than false positives (unnecessary inspections). The system architecture is built around this priority: multiple detectors using different methods, consensus voting for high specificity, and early warning for critical assets.

Multi-level detection architecture

Anomaly detection levels:

| Level | Method | Latency | Anomaly type |
|-------|--------|---------|--------------|
| L1: Threshold | Static thresholds per ISO/GOST standards | ms | Gross violations |
| L2: Statistical | EWMA, CUSUM, 3σ rules | s | Slow drift |
| L3: ML unsupervised | Isolation Forest, autoencoder | min | Multidimensional patterns |
| L4: ML supervised | XGBoost on labeled failures | min | Known failure types |
| L5: Physics-based | Model of normal asset behavior | h | Deviation from the physical model |

Combination of levels: L1/L2 – for immediate alerts, L3/L4 – for early diagnostics, L5 – for long-term trend.
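A minimal sketch of the L2 statistical layer, assuming an EWMA control chart whose baseline statistics come from the first known-normal points (the function name and parameters are illustrative, not part of the system described above):

```python
import numpy as np

def ewma_alert(values, lam=0.2, k=3.0, baseline_n=30):
    """
    EWMA control chart (an L2-style statistical detector, sketched):
    alert when the smoothed signal leaves mu +/- k * sigma_ewma.
    Baseline mu/sigma come from the first `baseline_n` known-normal points.
    """
    values = np.asarray(values, dtype=float)
    mu = values[:baseline_n].mean()
    sigma = values[:baseline_n].std()
    # Asymptotic std of the EWMA statistic for i.i.d. input
    sigma_ewma = sigma * np.sqrt(lam / (2.0 - lam))
    z = mu
    alerts = []
    for x in values:
        z = lam * x + (1.0 - lam) * z
        alerts.append(abs(z - mu) > k * sigma_ewma)
    return alerts
```

Because the EWMA statistic has much lower variance than the raw signal, it catches slow drift that a static L1 threshold would miss.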

Feature Engineering for Equipment

Temporal and frequency characteristics:

import numpy as np
from scipy import stats, signal

def extract_equipment_features(raw_signal, sampling_rate=1000):
    """
    Многодоменные признаки из сырого сигнала датчика (вибрация/ток/давление)
    """
    # Временная область
    features = {
        'rms': np.sqrt(np.mean(raw_signal**2)),
        'peak': np.max(np.abs(raw_signal)),
        'crest_factor': np.max(np.abs(raw_signal)) / np.sqrt(np.mean(raw_signal**2)),
        'kurtosis': stats.kurtosis(raw_signal),
        'skewness': stats.skew(raw_signal),
        'peak_to_peak': np.ptp(raw_signal),
        'shape_factor': np.sqrt(np.mean(raw_signal**2)) / np.mean(np.abs(raw_signal))
    }

    # Frequency domain
    freqs, psd = signal.welch(raw_signal, fs=sampling_rate, nperseg=512)
    total_power = np.trapz(psd, freqs)

    # Energy in frequency bands (Hz)
    bands = [(0, 100), (100, 500), (500, 2000), (2000, 5000)]
    for low, high in bands:
        mask = (freqs >= low) & (freqs < high)
        band_power = np.trapz(psd[mask], freqs[mask])
        features[f'band_power_{low}_{high}'] = band_power / total_power

    # Dominant frequency
    features['dominant_freq'] = freqs[np.argmax(psd)]
    features['spectral_centroid'] = np.sum(freqs * psd) / np.sum(psd)

    return features

Delta features (changes over time):

def compute_delta_features(current_features, baseline_features, trend_features_7d):
    """
    Важно не абсолютное значение, а отклонение от нормы конкретного актива
    """
    deltas = {}
    for key in current_features:
        if key in baseline_features:
            deltas[f'{key}_delta_abs'] = current_features[key] - baseline_features[key]
            if baseline_features[key] != 0:
                deltas[f'{key}_delta_pct'] = (
                    (current_features[key] - baseline_features[key]) /
                    abs(baseline_features[key]) * 100
                )

        # 7-day trend
        if key in trend_features_7d:
            deltas[f'{key}_trend_7d'] = trend_features_7d[key]  # slope from linregress

    return deltas
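The 7-day slope referenced in the comment can be computed with scipy's linregress; `compute_trend_features` below is a hypothetical helper assuming a dict of daily value histories per feature:

```python
from scipy.stats import linregress

def compute_trend_features(daily_feature_history):
    """
    daily_feature_history: dict mapping feature name -> list of daily
    values (e.g. the last 7 days). Returns the slope per day from a
    linear fit; a positive slope means the metric is growing.
    """
    trends = {}
    for key, values in daily_feature_history.items():
        if len(values) >= 3:  # need a few points for a meaningful fit
            result = linregress(range(len(values)), values)
            trends[key] = result.slope
    return trends
```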

Unsupervised Anomaly Detection

Isolation Forest with seasonal adaptation:

import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

class EquipmentAnomalyDetector:
    def __init__(self, contamination=0.02):
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            contamination=contamination,
            n_estimators=200,
            random_state=42
        )
        self.baseline_built = False

    def fit_baseline(self, normal_operation_features: pd.DataFrame):
        """
        Обучение на исторически нормальных данных конкретного актива
        Минимум 30 дней нормальной работы
        """
        X = self.scaler.fit_transform(normal_operation_features)
        self.model.fit(X)
        self.baseline_built = True

        # Threshold: 5th percentile of anomaly scores on normal data
        scores = self.model.score_samples(X)
        self.threshold = np.percentile(scores, 5)

    def detect(self, current_features: dict) -> dict:
        if not self.baseline_built:
            return {'status': 'no_baseline', 'anomaly': False}

        # Feature order must match the columns used in fit_baseline
        X = self.scaler.transform([list(current_features.values())])
        score = self.model.score_samples(X)[0]
        is_anomaly = score < self.threshold

        return {
            'anomaly_score': float(-score),  # inverted: higher = more anomalous
            'anomaly': bool(is_anomaly),
            'severity': self._classify_severity(-score)
        }

    def _classify_severity(self, anomaly_score):
        if anomaly_score > 0.8: return 'critical'
        if anomaly_score > 0.6: return 'high'
        if anomaly_score > 0.4: return 'medium'
        return 'low'

Autoencoder for multisensor data

LSTM autoencoder, with reconstruction error as the anomaly score:

import torch
import torch.nn as nn

class SensorAutoencoder(nn.Module):
    def __init__(self, input_dim, hidden_dim=32, latent_dim=8, seq_len=60):
        super().__init__()
        self.encoder = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.bottleneck = nn.Linear(hidden_dim, latent_dim)
        self.decoder = nn.LSTM(latent_dim, hidden_dim, batch_first=True)
        self.output_layer = nn.Linear(hidden_dim, input_dim)

    def forward(self, x):
        # x: (batch, seq_len, input_dim)
        enc_out, _ = self.encoder(x)
        z = self.bottleneck(enc_out[:, -1, :])  # last hidden state

        # Expand the latent vector across the sequence for the decoder
        z_expanded = z.unsqueeze(1).repeat(1, x.shape[1], 1)
        dec_out, _ = self.decoder(z_expanded)
        reconstruction = self.output_layer(dec_out)
        return reconstruction

def detect_anomaly_autoencoder(model, sensor_window):
    """
    Returns the reconstruction error for a single window (batch size 1).
    The anomaly threshold is set from the errors on a normal-data
    validation set (e.g. the 95th percentile).
    """
    with torch.no_grad():
        reconstruction = model(sensor_window)
        error = torch.mean((sensor_window - reconstruction)**2, dim=[1, 2])
    return error.item()
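The threshold calibration itself can be sketched as follows, assuming per-window reconstruction errors on known-normal validation data have already been collected (the helper names are illustrative):

```python
import numpy as np

def calibrate_threshold(validation_errors, percentile=95):
    """
    Anomaly threshold = a high percentile of reconstruction errors
    measured on known-normal validation windows.
    """
    return float(np.percentile(validation_errors, percentile))

def is_anomalous(error, threshold):
    """A window is flagged when its reconstruction error exceeds the threshold."""
    return error > threshold
```

A higher percentile trades fewer false positives for slower detection of subtle degradation.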

Alerts and noise suppression

Multi-level consensus:

def consensus_anomaly_decision(l2_statistical, l3_isolation_forest,
                                l4_supervised, asset_criticality):
    """
    Разные детекторы ловят разные типы аномалий.
    Консенсус снижает false positive rate.
    """
    votes = sum([
        1 if l2_statistical['anomaly'] else 0,
        1 if l3_isolation_forest['anomaly'] else 0,
        1 if l4_supervised.get('anomaly', False) else 0
    ])

    severity = max(
        l2_statistical.get('severity', 'none'),
        l3_isolation_forest.get('severity', 'none'),
        key=lambda s: {'none': 0, 'low': 1, 'medium': 2, 'high': 3, 'critical': 4}[s]
    )

    # Critical equipment: a single detector is enough
    # Standard equipment: at least two detectors must agree
    threshold = 1 if asset_criticality >= 4 else 2

    return {
        'alert': votes >= threshold,
        'confidence': votes / 3,
        'severity': severity,
        'detectors_triggered': votes
    }

Deduplication of correlated alerts: a single pump failure produces simultaneous pressure, temperature, and flow anomalies. Grouping by time (±5 min) and topology (adjacent assets) merges them into a single incident with a set of signals.
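One way to sketch the time-window grouping, assuming alerts arrive as dicts with a timestamp and a signal name and have already been filtered to topologically adjacent assets (the function name is illustrative):

```python
from datetime import datetime, timedelta

def group_alerts_into_incidents(alerts, window=timedelta(minutes=5)):
    """
    Merge consecutive alerts that fall within `window` of each other
    into a single incident carrying the full set of triggered signals.
    Assumes alerts are pre-filtered to one group of adjacent assets.
    """
    incidents = []
    for alert in sorted(alerts, key=lambda a: a['timestamp']):
        if incidents and alert['timestamp'] - incidents[-1]['last_seen'] <= window:
            incidents[-1]['signals'].append(alert['signal'])
            incidents[-1]['last_seen'] = alert['timestamp']
        else:
            incidents.append({
                'started': alert['timestamp'],
                'last_seen': alert['timestamp'],
                'signals': [alert['signal']],
            })
    return incidents
```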

Model drift and overfitting

Detection of concept drift:

from scipy.stats import ks_2samp

def detect_model_drift(recent_features, baseline_features, p_threshold=0.01):
    """
    KS-тест: если распределение признаков сдвинулось — модель устарела
    Триггер для переобучения на новых данных
    """
    drift_features = []
    for col in recent_features.columns:
        stat, p_value = ks_2samp(baseline_features[col], recent_features[col])
        if p_value < p_threshold:
            drift_features.append(col)

    drift_ratio = len(drift_features) / len(recent_features.columns)
    return {
        'drift_detected': drift_ratio > 0.3,
        'drifted_features': drift_features,
        'drift_ratio': drift_ratio
    }

Automatic pipeline: weekly drift check → when drift_ratio > 0.3, retraining is triggered on the last 60 days of data (provided the operational log marks them as normal).
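The retraining trigger might be sketched as follows (a hypothetical helper; `normal_ratio` stands for the share of the candidate training window that the operational log marks as normal):

```python
def should_retrain(drift_report, normal_ratio, min_normal_ratio=0.9):
    """
    Fire the retraining job only when drift is confirmed AND the
    operational log marks the candidate training window (e.g. the
    last 60 days) as mostly normal operation.
    """
    return bool(drift_report.get('drift_detected')) and normal_ratio >= min_normal_ratio
```

Gating on the operational log prevents the model from learning a degraded state as the new "normal".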

Timeframe: Isolation Forest + basic features + EWMA alerting + dashboard — 3-4 weeks. LSTM autoencoder, multisensor consensus, drift detection, and automatic retraining — 2-3 months.