AI IoT Time Series Forecasting System

We design and deploy artificial intelligence systems: from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering and MLOps to make AI work not in the lab, but in real business.
Showing 1 of 1 servicesAll 1566 services
AI IoT Time Series Forecasting System
Medium
~2-4 weeks
FAQ
AI Development Areas
AI Solution Development Stages
Latest works
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1041
  • image_logo-advance_0.png
    B2B Advance company logo design
    561
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    823
  • image_logo-aider_0.jpg
    AIDER company logo development
    762
  • image_crm_chasseurs_493_0.webp
    CRM development for Chasseurs
    848

AI-прогнозирование временных рядов IoT-данных

IoT-временные ряды обладают специфическими характеристиками: высокая частота дискретизации (секунды-минуты), шум и артефакты датчиков, неполные данные при сбоях связи, дрейф датчиков. ML-прогнозирование в IoT-контексте требует надёжной предобработки и лёгких моделей, пригодных для edge-развёртывания.

Особенности IoT временных рядов

Проблемы качества данных:

def assess_data_quality(ts_df):
    issues = {}

    # Пропуски
    issues['missing_rate'] = ts_df.isna().mean()

    # Выбросы (датчиковые артефакты)
    z_scores = (ts_df - ts_df.mean()) / ts_df.std()
    issues['outlier_rate'] = (np.abs(z_scores) > 5).mean()  # >5σ = артефакт датчика

    # Stuck sensor: значение не меняется длительное время
    issues['stuck_periods'] = detect_constant_windows(ts_df, min_duration=10)

    # Drift: постепенное смещение без физической причины
    long_trend = np.polyfit(range(len(ts_df)), ts_df.fillna(method='ffill'), 1)[0]
    issues['drift_per_day'] = long_trend * 86400 / ts_df.index.freq.nanos * 1e9

    return issues

Многомерные зависимости: IoT-устройства работают в связанных системах. Температура в цехе зависит от внешней температуры, загрузки оборудования, открытых ворот. Univariate прогноз игнорирует эти зависимости.

Предобработка

Обработка пропусков:

def handle_missing_iot(ts, method='interpolate'):
    """
    Краткие пропуски (< 5 минут): линейная интерполяция
    Длинные пропуски (> 5 минут): forward fill + flag столбец
    """
    short_gaps = ts.copy()
    long_gap_mask = ts.isna()

    # Интерполяция коротких пропусков
    short_gaps = short_gaps.interpolate(method='time', limit=5)  # max 5 точек

    # Для длинных: forward fill + маркер
    short_gaps = short_gaps.fillna(method='ffill')

    # Флаг для модели: была ли точка реальной или заполненной
    is_imputed = long_gap_mask.astype(int)

    return short_gaps, is_imputed

Нормализация с учётом дрейфа:

class AdaptiveNormalizer:
    """
    Скользящая нормализация: mean и std пересчитываются на rolling window
    Важно для производственных датчиков с сезонным дрейфом
    """
    def __init__(self, window_size=24*60):  # 24 часа в минутах
        self.window = window_size

    def transform(self, ts):
        rolling_mean = ts.rolling(self.window).mean()
        rolling_std = ts.rolling(self.window).std()
        normalized = (ts - rolling_mean) / (rolling_std + 1e-8)
        return normalized, rolling_mean, rolling_std

Модели для IoT прогнозирования

SARIMA для регулярных рядов: Подходит для данных с выраженной суточной/недельной сезонностью и стабильным паттерном (потребление электроэнергии, температура).

LightGBM с lag-фичами:

def create_lag_features(ts, lags=[1, 5, 15, 60, 1440]):
    """
    Lag-фичи: значения в предыдущие моменты времени
    Для минутных данных: 1, 5, 15, 60 минут назад + сутки назад
    """
    df = pd.DataFrame({'value': ts})
    for lag in lags:
        df[f'lag_{lag}'] = ts.shift(lag)

    # Rolling features
    for window in [5, 15, 60]:
        df[f'rolling_mean_{window}'] = ts.rolling(window).mean()
        df[f'rolling_std_{window}'] = ts.rolling(window).std()

    # Временные фичи
    df['hour'] = df.index.hour
    df['minute'] = df.index.minute
    df['dayofweek'] = df.index.dayofweek

    return df.dropna()

LSTM для сложных паттернов:

import torch.nn as nn

class IoTLSTMForecaster(nn.Module):
    def __init__(self, n_features, hidden_size=64, forecast_horizon=12):
        super().__init__()
        self.lstm = nn.LSTM(n_features, hidden_size, batch_first=True)
        self.dropout = nn.Dropout(0.2)
        self.fc = nn.Linear(hidden_size, forecast_horizon)

    def forward(self, x):
        # x: (batch, seq_len, n_features)
        lstm_out, (h_n, _) = self.lstm(x)
        dropped = self.dropout(h_n[-1])
        return self.fc(dropped)

Chronos (Amazon) / TimesFM (Google): Предобученные foundation models для временных рядов. Zero-shot прогноз без обучения на доменных данных → полезно для rare sensors или при ограниченной истории.

Edge Forecasting

Ограниченные ресурсы on-device:

  • RAM: 256 KB - 4 MB
  • CPU: 32-bit MCU или ARM Cortex-A
  • Без GPU, без PyTorch

Легковесные модели:

# TinyML подход: квантизация + прунинг
# ONNX Runtime для edge
import onnxruntime as ort

# Экспорт обученной модели
torch.onnx.export(model, dummy_input, 'forecaster_edge.onnx',
                  opset_version=11)

# Оптимизация: квантизация INT8
from onnxruntime.quantization import quantize_dynamic
quantize_dynamic('forecaster_edge.onnx', 'forecaster_edge_quant.onnx')

# На edge-устройстве
session = ort.InferenceSession('forecaster_edge_quant.onnx')
forecast = session.run(None, {'input': recent_data})[0]

Экономия трафика: Edge-прогноз: отправка в облако только при аномалии vs. baseline прогноза. Экономия трафика 80-95%.

Anomaly Detection в IoT потоке

Online anomaly detection:

class StreamingAnomalyDetector:
    def __init__(self, window=200, threshold=3.5):
        self.history = deque(maxlen=window)
        self.threshold = threshold

    def update(self, value):
        if len(self.history) > 20:
            mean = np.mean(self.history)
            std = np.std(self.history)
            z_score = abs(value - mean) / (std + 1e-8)
            is_anomaly = z_score > self.threshold
        else:
            is_anomaly = False

        self.history.append(value)
        return is_anomaly

Контекстуальные аномалии: Значение нормально по абсолютному уровню, но аномально для данного времени суток/дня недели. Residual-based detection: значение − прогноз > порога.

Масштабирование: 10,000+ датчиков

Hierarchical forecasting: Группировка датчиков по типу, локации, промышленной системе → агрегированные прогнозы сверху вниз (top-down reconciliation) более устойчивы.

AutoML для множества сенсоров:

# statsforecast: параллельное прогнозирование тысяч временных рядов
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA, AutoETS, CrostonOptimized

sf = StatsForecast(
    models=[AutoARIMA(), AutoETS()],
    freq='5T',  # 5-минутные данные
    n_jobs=-1   # параллельно на всех CPU
)
forecasts = sf.forecast(h=12, df=all_sensors_df)

Сроки: ingestion pipeline (MQTT/Kafka) + предобработка + baseline LightGBM прогнозы + Grafana дашборд — 4-5 недель. LSTM/TFT модели, edge ONNX inference, streaming anomaly detection, AutoML для fleet — 3-4 месяца.