AI-прогнозирование временных рядов IoT-данных
IoT-временные ряды обладают специфическими характеристиками: высокая частота дискретизации (секунды-минуты), шум и артефакты датчиков, неполные данные при сбоях связи, дрейф датчиков. ML-прогнозирование в IoT-контексте требует надёжной предобработки и лёгких моделей, пригодных для edge-развёртывания.
Особенности IoT временных рядов
Проблемы качества данных:
def assess_data_quality(ts_df):
issues = {}
# Пропуски
issues['missing_rate'] = ts_df.isna().mean()
# Выбросы (датчиковые артефакты)
z_scores = (ts_df - ts_df.mean()) / ts_df.std()
issues['outlier_rate'] = (np.abs(z_scores) > 5).mean() # >5σ = артефакт датчика
# Stuck sensor: значение не меняется длительное время
issues['stuck_periods'] = detect_constant_windows(ts_df, min_duration=10)
# Drift: постепенное смещение без физической причины
long_trend = np.polyfit(range(len(ts_df)), ts_df.fillna(method='ffill'), 1)[0]
issues['drift_per_day'] = long_trend * 86400 / ts_df.index.freq.nanos * 1e9
return issues
Многомерные зависимости: IoT-устройства работают в связанных системах. Температура в цехе зависит от внешней температуры, загрузки оборудования, открытых ворот. Univariate прогноз игнорирует эти зависимости.
Предобработка
Обработка пропусков:
def handle_missing_iot(ts, method='interpolate'):
"""
Краткие пропуски (< 5 минут): линейная интерполяция
Длинные пропуски (> 5 минут): forward fill + flag столбец
"""
short_gaps = ts.copy()
long_gap_mask = ts.isna()
# Интерполяция коротких пропусков
short_gaps = short_gaps.interpolate(method='time', limit=5) # max 5 точек
# Для длинных: forward fill + маркер
short_gaps = short_gaps.fillna(method='ffill')
# Флаг для модели: была ли точка реальной или заполненной
is_imputed = long_gap_mask.astype(int)
return short_gaps, is_imputed
Нормализация с учётом дрейфа:
class AdaptiveNormalizer:
"""
Скользящая нормализация: mean и std пересчитываются на rolling window
Важно для производственных датчиков с сезонным дрейфом
"""
def __init__(self, window_size=24*60): # 24 часа в минутах
self.window = window_size
def transform(self, ts):
rolling_mean = ts.rolling(self.window).mean()
rolling_std = ts.rolling(self.window).std()
normalized = (ts - rolling_mean) / (rolling_std + 1e-8)
return normalized, rolling_mean, rolling_std
Модели для IoT прогнозирования
SARIMA для регулярных рядов: Подходит для данных с выраженной суточной/недельной сезонностью и стабильным паттерном (потребление электроэнергии, температура).
LightGBM с lag-фичами:
def create_lag_features(ts, lags=[1, 5, 15, 60, 1440]):
"""
Lag-фичи: значения в предыдущие моменты времени
Для минутных данных: 1, 5, 15, 60 минут назад + сутки назад
"""
df = pd.DataFrame({'value': ts})
for lag in lags:
df[f'lag_{lag}'] = ts.shift(lag)
# Rolling features
for window in [5, 15, 60]:
df[f'rolling_mean_{window}'] = ts.rolling(window).mean()
df[f'rolling_std_{window}'] = ts.rolling(window).std()
# Временные фичи
df['hour'] = df.index.hour
df['minute'] = df.index.minute
df['dayofweek'] = df.index.dayofweek
return df.dropna()
LSTM для сложных паттернов:
import torch.nn as nn
class IoTLSTMForecaster(nn.Module):
def __init__(self, n_features, hidden_size=64, forecast_horizon=12):
super().__init__()
self.lstm = nn.LSTM(n_features, hidden_size, batch_first=True)
self.dropout = nn.Dropout(0.2)
self.fc = nn.Linear(hidden_size, forecast_horizon)
def forward(self, x):
# x: (batch, seq_len, n_features)
lstm_out, (h_n, _) = self.lstm(x)
dropped = self.dropout(h_n[-1])
return self.fc(dropped)
Chronos (Amazon) / TimesFM (Google): Предобученные foundation models для временных рядов. Zero-shot прогноз без обучения на доменных данных → полезно для rare sensors или при ограниченной истории.
Edge Forecasting
Ограниченные ресурсы on-device:
- RAM: 256 KB - 4 MB
- CPU: 32-bit MCU или ARM Cortex-A
- Без GPU, без PyTorch
Легковесные модели:
# TinyML подход: квантизация + прунинг
# ONNX Runtime для edge
import onnxruntime as ort
# Экспорт обученной модели
torch.onnx.export(model, dummy_input, 'forecaster_edge.onnx',
opset_version=11)
# Оптимизация: квантизация INT8
from onnxruntime.quantization import quantize_dynamic
quantize_dynamic('forecaster_edge.onnx', 'forecaster_edge_quant.onnx')
# На edge-устройстве
session = ort.InferenceSession('forecaster_edge_quant.onnx')
forecast = session.run(None, {'input': recent_data})[0]
Экономия трафика: Edge-прогноз: отправка в облако только при аномалии vs. baseline прогноза. Экономия трафика 80-95%.
Anomaly Detection в IoT потоке
Online anomaly detection:
class StreamingAnomalyDetector:
def __init__(self, window=200, threshold=3.5):
self.history = deque(maxlen=window)
self.threshold = threshold
def update(self, value):
if len(self.history) > 20:
mean = np.mean(self.history)
std = np.std(self.history)
z_score = abs(value - mean) / (std + 1e-8)
is_anomaly = z_score > self.threshold
else:
is_anomaly = False
self.history.append(value)
return is_anomaly
Контекстуальные аномалии: Значение нормально по абсолютному уровню, но аномально для данного времени суток/дня недели. Residual-based detection: значение − прогноз > порога.
Масштабирование: 10,000+ датчиков
Hierarchical forecasting: Группировка датчиков по типу, локации, промышленной системе → агрегированные прогнозы сверху вниз (top-down reconciliation) более устойчивы.
AutoML для множества сенсоров:
# statsforecast: параллельное прогнозирование тысяч временных рядов
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA, AutoETS, CrostonOptimized
sf = StatsForecast(
models=[AutoARIMA(), AutoETS()],
freq='5T', # 5-минутные данные
n_jobs=-1 # параллельно на всех CPU
)
forecasts = sf.forecast(h=12, df=all_sensors_df)
Сроки: ingestion pipeline (MQTT/Kafka) + предобработка + baseline LightGBM прогнозы + Grafana дашборд — 4-5 недель. LSTM/TFT модели, edge ONNX inference, streaming anomaly detection, AutoML для fleet — 3-4 месяца.







