AI Equipment Failure Prediction System Development

We design and deploy artificial intelligence systems, from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering, and MLOps to make AI work in real businesses, not just in the lab.

Development of an AI-based system for predicting equipment failures

Failure prediction differs from anomaly detection in its goal: not "something is wrong right now," but "a failure will occur in N days." This requires a degradation model, RUL (Remaining Useful Life) estimation, and handling imbalanced data: failure examples are always far rarer than normal operation.

Failure Prediction System Architecture

Difference from traditional monitoring:

Monitoring: current state → normal/anomaly
Failure Prediction: degradation trend → probability of failure within horizon T

The system includes three interconnected blocks:

  • Degradation model: tracks the progressive deterioration of equipment condition
  • RUL estimator: estimates the remaining useful life in days or cycles
  • Failure classifier: binary classification of failure within the next 7/14/30 days
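The three blocks can be tied together through a shared per-asset prediction record. A minimal sketch of such a schema (the field names are illustrative, not a fixed API):

```python
from dataclasses import dataclass

@dataclass
class AssetPrediction:
    """Combined output of the three blocks for one asset (illustrative schema)."""
    asset_id: str
    degradation_stage: int   # 0 normal ... 3 critical
    rul_days: float          # remaining-useful-life estimate
    failure_prob_7d: float   # P(failure within 7 days)
    failure_prob_14d: float
    failure_prob_30d: float

# Example record for a single asset
p = AssetPrediction('pump-01', 2, 11.5, 0.35, 0.60, 0.80)
```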

Data and labeling

Building a dataset from service history:

import pandas as pd
import numpy as np
from datetime import timedelta

def build_failure_prediction_dataset(sensor_data: pd.DataFrame,
                                      maintenance_log: pd.DataFrame,
                                      prediction_window_days=14):
    """
    Ключевая задача: разметить каждую точку наблюдения.
    - За prediction_window_days до отказа → label=1
    - В остальное время → label=0
    """
    dataset_rows = []

    for asset_id in sensor_data['asset_id'].unique():
        asset_sensors = sensor_data[sensor_data['asset_id'] == asset_id].sort_values('timestamp')
        asset_failures = maintenance_log[
            (maintenance_log['asset_id'] == asset_id) &
            (maintenance_log['event_type'] == 'failure')
        ]['timestamp'].tolist()

        for _, row in asset_sensors.iterrows():
            ts = row['timestamp']
            label = 0
            days_to_failure = None

            # Nearest future failure
            future_failures = [f for f in asset_failures if f > ts]
            if future_failures:
                next_failure = min(future_failures)
                days_to_failure = (next_failure - ts).days
                if days_to_failure <= prediction_window_days:
                    label = 1

            dataset_rows.append({
                **row.to_dict(),
                'label': label,
                'days_to_failure': days_to_failure
            })

    return pd.DataFrame(dataset_rows)

The imbalance problem: a typical ratio is one failure per 50–200 days of normal operation. Common approaches:

  • scale_pos_weight in XGBoost/LightGBM
  • SMOTE-Tomek (synthetic oversampling of the minority class combined with Tomek-link cleaning)
  • Cost-sensitive learning with an explicit error-cost matrix
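As a quick illustration of the first approach: scale_pos_weight is typically set to the ratio of negative to positive training labels (a common heuristic, not the only choice):

```python
import numpy as np

def compute_scale_pos_weight(labels: np.ndarray) -> float:
    """Negative/positive ratio, passed as scale_pos_weight to
    XGBoost/LightGBM to up-weight the rare failure class."""
    n_pos = int(labels.sum())
    n_neg = len(labels) - n_pos
    return n_neg / max(n_pos, 1)

# Example: ~1 failure window per 100 normal observations
labels = np.array([0] * 990 + [1] * 10)
weight = compute_scale_pos_weight(labels)  # 99.0
```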

RUL (Remaining Useful Life) models

1. Regression on days_to_failure:

from xgboost import XGBRegressor
from sklearn.model_selection import TimeSeriesSplit, cross_val_score

def train_rul_model(features_df, target_col='days_to_failure'):
    """
    Train only on time windows shortly before failure: a label of
    "365 days to failure" carries too much uncertainty to be learnable.
    """
    # Keep only observations within 90 days of a failure
    train_data = features_df[features_df[target_col] <= 90].dropna(subset=[target_col])

    X = train_data.drop(columns=[target_col, 'label', 'timestamp', 'asset_id'])
    y = np.log1p(train_data[target_col])  # log transform stabilizes training

    model = XGBRegressor(
        n_estimators=300,
        learning_rate=0.05,
        max_depth=6,
        subsample=0.8
    )

    # Time-aware cross-validation: validation folds always follow the training folds
    tscv = TimeSeriesSplit(n_splits=5)
    cv_mae = -cross_val_score(model, X, y, cv=tscv,
                              scoring='neg_mean_absolute_error').mean()
    print(f"CV MAE (log-days): {cv_mae:.3f}")

    model.fit(X, y)
    return model  # invert the transform at inference: np.expm1(model.predict(X_new))

2. Survival Analysis:

from lifelines import WeibullAFTFitter

def train_survival_model(asset_history):
    """
    Weibull AFT models time-to-failure as a parametric distribution.
    Advantage: it correctly handles censored data (assets still running).
    """
    # T = time to event (failure or end of observation)
    # E = 1 if the event occurred, 0 if censored
    survival_data = prepare_survival_data(asset_history)

    model = WeibullAFTFitter()
    model.fit(
        survival_data,
        duration_col='lifetime_days',
        event_col='failure_occurred',
        ancillary=True  # model both scale and shape
    )
    return model

def predict_failure_probability(model, asset_features, horizon_days=30):
    """
    P(failure within horizon_days)
    """
    survival_at_horizon = model.predict_survival_function(
        asset_features, times=[horizon_days]
    )
    return 1 - survival_at_horizon.values[0][0]
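The prepare_survival_data helper is referenced but not shown above. A minimal sketch of what it might do, assuming an event log with one row per observation and asset_id, timestamp, and event_type columns (the column names are assumptions):

```python
import pandas as pd

def prepare_survival_data(asset_history: pd.DataFrame) -> pd.DataFrame:
    """One row per asset: observed lifetime plus an event indicator.
    failure_occurred=0 means the observation is censored (asset still runs)."""
    rows = []
    for asset_id, group in asset_history.groupby('asset_id'):
        group = group.sort_values('timestamp')
        failed = (group['event_type'] == 'failure').any()
        lifetime = (group['timestamp'].max() - group['timestamp'].min()).days
        rows.append({
            'asset_id': asset_id,
            'lifetime_days': max(lifetime, 1),     # avoid zero-length durations
            'failure_occurred': int(failed),
        })
    return pd.DataFrame(rows)
```

In a real pipeline, each repair event would usually reset the clock and start a new observation interval; this sketch treats each asset's whole history as one interval for brevity.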

LSTM with multi-task learning

Joint prediction of RUL and degradation mode:

import torch
import torch.nn as nn

class FailurePredictionLSTM(nn.Module):
    """
    Многозадачная модель:
    - Выход 1: RUL (регрессия)
    - Выход 2: вероятность отказа в 7/14/30 дней (классификация)
    - Выход 3: стадия деградации (0-нормально, 1-начало, 2-прогрессивная, 3-критично)
    """
    def __init__(self, input_dim, hidden_dim=128, num_layers=2):
        super().__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers,
                             batch_first=True, dropout=0.2)
        self.attention = nn.MultiheadAttention(hidden_dim, num_heads=4, batch_first=True)

        # Три головы
        self.rul_head = nn.Sequential(
            nn.Linear(hidden_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )
        self.failure_head = nn.Sequential(
            nn.Linear(hidden_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 3),   # 7/14/30 дней
            nn.Sigmoid()
        )
        self.stage_head = nn.Linear(hidden_dim, 4)  # 4 стадии

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        attn_out, _ = self.attention(lstm_out, lstm_out, lstm_out)
        pooled = attn_out.mean(dim=1)

        return {
            'rul': self.rul_head(pooled),
            'failure_prob': self.failure_head(pooled),
            'stage': self.stage_head(pooled)
        }
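Training such a model needs a combined objective across the three heads. A sketch of one way to weight them; the loss weights are assumptions to be tuned on validation data, with the failure head typically weighted highest because missed failures are the costliest error:

```python
import torch
import torch.nn as nn

def multitask_loss(outputs, targets, w_rul=1.0, w_fail=2.0, w_stage=0.5):
    """Weighted sum of per-head losses for the multi-task model.
    outputs: dict with 'rul' (B,1), 'failure_prob' (B,3), 'stage' (B,4).
    targets: dict with 'rul' (B,), 'failure' (B,3), 'stage' (B,) long."""
    rul_loss = nn.functional.mse_loss(outputs['rul'].squeeze(-1), targets['rul'])
    # The failure head already applies Sigmoid, so use BCE on probabilities
    fail_loss = nn.functional.binary_cross_entropy(
        outputs['failure_prob'], targets['failure'])
    # The stage head emits raw logits, so use cross-entropy
    stage_loss = nn.functional.cross_entropy(outputs['stage'], targets['stage'])
    return w_rul * rul_loss + w_fail * fail_loss + w_stage * stage_loss
```

Uncertainty-based weighting (learning the weights as task noise parameters) is a common alternative to fixed coefficients.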

Calibration and decision threshold

Probability Calibration:

from sklearn.isotonic import IsotonicRegression

def calibrate_failure_probabilities(raw_probs, true_labels):
    """
    If the model says P(failure)=0.7, the observed failure frequency
    should be ≈70%. Isotonic regression calibrates this mapping.
    """
    calibrator = IsotonicRegression(out_of_bounds='clip')
    calibrator.fit(raw_probs, true_labels)
    return calibrator

Cost matrix for the optimal threshold:

def find_optimal_threshold(probabilities, true_labels,
                            cost_false_negative=100,   # a missed failure is expensive
                            cost_false_positive=5):    # an extra inspection is cheap
    """
    For critical equipment, shift the threshold down (aggressive detection).
    """
    thresholds = np.arange(0.05, 0.95, 0.01)
    min_cost = float('inf')
    best_threshold = 0.5

    for t in thresholds:
        predictions = (probabilities >= t).astype(int)
        fn = np.sum((predictions == 0) & (true_labels == 1))
        fp = np.sum((predictions == 1) & (true_labels == 0))
        total_cost = fn * cost_false_negative + fp * cost_false_positive

        if total_cost < min_cost:
            min_cost = total_cost
            best_threshold = t

    return best_threshold

Integration with CMMS and maintenance planning

Automatic generation of forecast schedule:

def generate_maintenance_schedule(assets_predictions, maintenance_capacity):
    """
    Combine predictions across all assets into a maintenance schedule
    that respects resource limits: never pile all the work onto one day.
    """
    # Sort by P(failure) × criticality
    prioritized = sorted(
        assets_predictions,
        key=lambda x: x['failure_prob_14d'] * x['criticality'],
        reverse=True
    )

    maintenance_queue = []
    for asset in prioritized:
        if asset['failure_prob_14d'] > 0.4:
            maintenance_queue.append({
                'asset_id': asset['asset_id'],
                'date': asset['optimal_maintenance_date'],
                'priority': 'urgent' if asset['failure_prob_7d'] > 0.5 else 'planned',
                'estimated_hours': asset['estimated_labor_hours'],
                'parts_required': asset['spare_parts']
            })

    # Level the load across days (level_load is defined elsewhere in the system)
    return level_load(maintenance_queue, maintenance_capacity)

System Performance Metrics:

  • Lead time: the average number of days of warning before the actual failure
  • Precision@7days: the share of alerts issued 7 days before failure that were correct
  • Coverage: the share of failures predicted at least 3 days in advance
  • False alarm rate: unnecessary trips to the asset per month
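Two of these metrics are straightforward to compute once per-failure lead times are known. A sketch, assuming failures that were never flagged get a lead time of 0:

```python
import numpy as np

def mean_lead_time(lead_times):
    """Average warning in days between the first alert and the actual
    failure, over all failures (unpredicted failures count as 0)."""
    return float(np.mean(lead_times))

def coverage(lead_times, min_days=3):
    """Share of failures flagged at least min_days in advance."""
    arr = np.asarray(lead_times)
    return float((arr >= min_days).mean())

# Example: four failures, one of them never flagged
lead_times = [5, 2, 10, 0]
mean_lead_time(lead_times)  # 4.25
coverage(lead_times)        # 0.5
```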

Timelines: an XGBoost failure classifier with basic RUL, alerting, and a CMMS webhook takes 4-5 weeks. The full scope (LSTM multi-task model, survival analysis, threshold optimization, automatic maintenance scheduling, drift monitoring) takes 3-4 months.