AI-based IoT device fault detection system
An IoT device itself can be a source of problems: incorrect readings due to sensor degradation, battery drain, firmware failure, or an unstable communication channel. Diagnosing a device malfunction is different from detecting a data anomaly—it's important to determine whether the problem is in the physical world or the device itself.
Diagnostic signs of device malfunction
Device-level issue signatures:
device_fault_patterns = {
'sensor_stuck': {
'signature': 'одно значение повторяется N раз подряд',
'detection': 'std(last_N) ≈ 0',
'examples': ['температура 25.0°C последние 24 часа', 'давление 1013.25 hPa без изменений']
},
'sensor_drift': {
'signature': 'медленный монотонный тренд без физической причины',
'detection': 'slope значительный, но остальные датчики зоны стабильны',
'examples': ['CO2 датчик дрейфует на 50ppm/сутки без людей в помещении']
},
'bit_flip': {
'signature': 'одиночные аномальные значения, потом возврат к норме',
'detection': 'spike duration = 1 reading, isolation forest score',
'examples': ['температура 127°C (0x7F) на 1 секунду']
},
'connectivity_issue': {
'signature': 'пропуски в данных, регулярные или случайные',
'detection': 'gap analysis в timestamp последовательности',
'examples': ['пакеты теряются каждые N минут = CRON-джоб конфликт']
},
'power_degradation': {
'signature': 'нарастающие пропуски + увеличение времени ответа',
'detection': 'temporal pattern of gaps + RSSI снижение',
'examples': ['батарейный датчик теряет пакеты при заряде < 20%']
}
}
Detection of stuck-value and noise-floor
Algorithm for detecting a "frozen" sensor:
import numpy as np
import pandas as pd
def detect_stuck_sensor(readings: pd.Series,
window: int = 20,
variance_threshold: float = 1e-6) -> dict:
"""
Датчик завис: стандартное отклонение за последние N значений близко к 0.
Учитываем допустимый шум (quantization noise): для 12-bit ADC ≈ 0.01% диапазона.
"""
if len(readings) < window:
return {'status': 'insufficient_data'}
recent = readings.tail(window)
variance = recent.var()
unique_values = recent.nunique()
stuck_by_variance = variance < variance_threshold
stuck_by_unique = unique_values == 1
# Мягкий критерий: < 3 уникальных значений за 20 измерений (квантование)
low_variation = unique_values <= 2 and window >= 20
return {
'stuck_detected': stuck_by_variance or stuck_by_unique,
'low_variation_warning': low_variation,
'variance': float(variance),
'unique_values_in_window': int(unique_values),
'action': 'sensor_inspection' if stuck_by_variance else None
}
def detect_noise_floor_anomaly(readings: pd.Series,
expected_noise_std: float) -> dict:
"""
Слишком тихий датчик: шум ниже физического минимума = застывший или сглаженный.
Слишком шумный: std резко выросла = деградация датчика или помехи.
"""
recent_std = readings.tail(60).std()
noise_ratio = recent_std / (expected_noise_std + 1e-9)
if noise_ratio < 0.1:
return {'anomaly': 'too_quiet', 'noise_ratio': noise_ratio,
'action': 'check_if_sensor_stuck_or_filtered'}
elif noise_ratio > 10:
return {'anomaly': 'too_noisy', 'noise_ratio': noise_ratio,
'action': 'check_grounding_and_power_supply'}
return {'anomaly': None, 'noise_ratio': round(noise_ratio, 2)}
Data gap analysis
Classification of missing data patterns:
from scipy.stats import chi2_contingency
def analyze_data_gaps(timestamps: pd.DatetimeIndex,
expected_interval_seconds: int = 60) -> dict:
"""
Пропуски могут быть:
- Случайные: проблемы радиоканала
- Регулярные: конкретное время = ОС обновляется, перегрев в полдень
- Нарастающие: батарея садится
"""
actual_intervals = timestamps.to_series().diff().dt.total_seconds().dropna()
# Пропуски = интервал > 1.5 × expected
gaps = actual_intervals[actual_intervals > expected_interval_seconds * 1.5]
gap_ratio = len(gaps) / len(actual_intervals)
# Тест на регулярность пропусков (по часам суток)
gap_timestamps = timestamps[actual_intervals[actual_intervals > expected_interval_seconds * 1.5].index]
hours_of_gaps = gap_timestamps.hour
# Chi-square: равномерны ли пропуски по часам?
hour_counts = pd.Series(hours_of_gaps).value_counts().reindex(range(24), fill_value=0)
_, p_value = chi2_contingency([hour_counts.values, np.full(24, len(gap_timestamps)/24)])[:2]
periodic_gaps = p_value < 0.05 # пропуски сконцентрированы в определённое время
# Тренд: нарастают ли пропуски со временем?
if len(gaps) > 5:
x = np.arange(len(gaps))
trend_slope = np.polyfit(x, gaps.values, 1)[0]
else:
trend_slope = 0.0
return {
'gap_ratio': round(gap_ratio, 3),
'total_gaps': len(gaps),
'periodic_gaps': periodic_gaps,
'peak_gap_hours': hours_of_gaps.value_counts().head(3).index.tolist() if len(hours_of_gaps) > 0 else [],
'increasing_trend': trend_slope > 5, # пропуски нарастают
'fault_type': (
'battery_degradation' if trend_slope > 10 else
'periodic_interference' if periodic_gaps else
'random_connectivity' if gap_ratio > 0.05 else
'normal'
)
}
Multi-device diagnostics
Cluster analysis of device behavior:
from sklearn.ensemble import IsolationForest
def fleet_device_health_check(device_metrics: pd.DataFrame) -> pd.DataFrame:
"""
Проверяем все устройства в группе — находим аутсайдеров.
Признаки: gap_ratio, stuck_events_count, snr_avg, battery_level_trend.
Устройства с аномальным поведением относительно группы = кандидаты на замену.
"""
feature_cols = [
'gap_ratio_7d',
'stuck_events_7d',
'rssi_avg',
'battery_decline_rate', # % в день
'error_frame_rate',
'reading_frequency_deviation' # отклонение частоты от expected
]
X = device_metrics[feature_cols].fillna(0)
model = IsolationForest(contamination=0.1, random_state=42)
device_metrics['anomaly_score'] = -model.fit_predict(X)
device_metrics['health_label'] = np.where(
device_metrics['anomaly_score'] == -1, 'faulty_candidate', 'healthy'
)
return device_metrics.sort_values('anomaly_score', ascending=False)
Remote Diagnostics and OTA Update: When diagnosing a software fault, an automatic OTA reboot or firmware update (via AWS IoT Jobs, Azure Device Twins) is performed. A hardware fault is reported to field service management. Integration with ServiceNow and Salesforce Field Service.
Timeframe: Stuck-value + gap analysis + basic device health dashboard — 2-3 weeks. Fleet anomaly detection, periodic fault classification, OTA trigger, service management integration — 6-8 weeks.







