Developing an AI system for analyzing IoT sensor data in production
The industrial IoT—hundreds and thousands of temperature, pressure, vibration, current, and speed sensors—generates a continuous data stream. An ML system transforms this data stream into production intelligence: real-time product quality, anomaly detection, and process parameter optimization.
Industrial IoT stack
Automation Levels (ISA-95):
- Level 1: Sensors/Actuators
- Level 2: Control (PLC, SCADA)
- Level 3: MES (Manufacturing Execution System)
- Level 4: ERP
ML analytics works at levels 2-3, using data from level 1.
Protocols:
protocols = {
'OPC-UA': 'стандарт Industry 4.0, discovery + security + историк',
'Modbus RTU/TCP': 'legacy оборудование, простые регистры',
'PROFIBUS/PROFINET': 'Siemens PCS 7, S7 ПЛК',
'EtherNet/IP': 'Allen-Bradley, Rockwell',
'MQTT': 'lightweight для IoT gateway → cloud',
'AMQP': 'корпоративные интеграции'
}
Real-Time Data Pipeline
Stream processing:
from kafka import KafkaConsumer, KafkaProducer
import json
class ManufacturingDataPipeline:
def __init__(self, kafka_bootstrap='kafka:9092'):
self.consumer = KafkaConsumer(
'sensor-raw',
bootstrap_servers=kafka_bootstrap,
value_deserializer=lambda m: json.loads(m.decode()),
group_id='analytics-group'
)
self.producer = KafkaProducer(
bootstrap_servers=kafka_bootstrap,
value_serializer=lambda v: json.dumps(v).encode()
)
def process_stream(self):
for message in self.consumer:
sensor_data = message.value
# 1. Валидация и очистка
cleaned = self.validate_and_clean(sensor_data)
# 2. Feature extraction (на 1-минутных окнах)
if self.should_extract_features(cleaned):
features = self.extract_features(cleaned)
# 3. Inference
anomaly_score = anomaly_model.predict([features])[0]
quality_prediction = quality_model.predict([features])[0]
# 4. Публикация результатов
self.producer.send('analytics-output', {
'machine_id': cleaned['machine_id'],
'timestamp': cleaned['timestamp'],
'anomaly_score': float(anomaly_score),
'quality_prediction': float(quality_prediction),
'features': features
})
Multisensory analysis of the production line
Sensor correlation:
def analyze_sensor_correlations(sensor_matrix, window_minutes=30):
"""
Корреляционная матрица датчиков:
- Потеря корреляции между коррелированными датчиками = аномалия
- Внезапная корреляция между некоррелированными = нетипичный режим
"""
# Ожидаемые корреляции (из нормального режима работы)
baseline_corr = compute_baseline_correlation(normal_operation_data)
# Текущая корреляция
current_corr = sensor_matrix.corr()
# Отклонение корреляционной структуры
corr_deviation = np.abs(current_corr - baseline_corr).mean().mean()
return corr_deviation # высокое значение = нетипичный режим
Process Variable Interaction:
def detect_process_regime_change(current_state, baseline_pca):
"""
PCA на нормализованных переменных процесса
Выход за пределы "нормального операционного пространства" = аномалия
"""
# Проецируем текущее состояние в пространство PCA
current_pca = baseline_pca.transform([current_state])
# SPE (Squared Prediction Error): ошибка реконструкции
reconstructed = baseline_pca.inverse_transform(current_pca)
spe = np.sum((current_state - reconstructed[0])**2)
# T²: расстояние в PCA пространстве от центра
t2 = np.sum(current_pca**2 / baseline_pca.explained_variance_)
return {'spe': spe, 't2': t2,
'anomaly': spe > spe_ucl or t2 > t2_ucl}
Quality Prediction - Soft Sensor
Online quality forecast:
def train_quality_soft_sensor(process_params, lab_results, n_lags=5):
"""
Входные: технологические параметры (онлайн, каждую минуту)
Выходные: качество продукта (лабораторный анализ, каждый час)
Temporal alignment: лаг между параметрами и качеством = время в процессе
"""
# Временные лаги для компенсации времени реакции процесса
lagged_features = create_lagged_features(process_params, n_lags)
# Объединение с лабораторными данными (alignment по времени)
aligned = align_timeseries(lagged_features, lab_results, tolerance='15min')
model = GradientBoostingRegressor(n_estimators=200)
model.fit(aligned[feature_cols], aligned['quality_measure'])
return model
# Использование: каждую минуту → мгновенный прогноз качества
# Не ждать лабораторного анализа через час
Adaptation to process drift:
class AdaptiveQualityModel:
"""
Производственные процессы дрейфуют:
- Износ инструмента
- Изменение партии сырья
- Сезонные температурные изменения
Онлайн-обновление модели при получении новых лабораторных данных
"""
def __init__(self, base_model, update_rate=0.1):
self.model = base_model
self.update_rate = update_rate
self.recent_samples = deque(maxlen=200)
def predict(self, features):
return self.model.predict([features])[0]
def update(self, features, true_quality):
self.recent_samples.append((features, true_quality))
if len(self.recent_samples) % 10 == 0: # перобучение каждые 10 новых точек
X_recent = [s[0] for s in self.recent_samples]
y_recent = [s[1] for s in self.recent_samples]
# Incremental fit с высоким весом новых данных
self.model.fit(X_recent, y_recent)
Anomaly Prioritization
Multi-level alerts:
def prioritize_manufacturing_alerts(anomalies, asset_criticality, production_impact):
"""
Не все аномалии одинаково важны
Приоритет = аномальность × критичность актива × текущая загрузка
"""
scored_alerts = []
for anomaly in anomalies:
priority_score = (
anomaly['severity'] *
asset_criticality[anomaly['machine_id']] *
production_impact.get(anomaly['machine_id'], 1.0)
)
scored_alerts.append({**anomaly, 'priority': priority_score})
return sorted(scored_alerts, key=lambda x: x['priority'], reverse=True)
Suppression of correlated alerts: One pump failure → hundreds of pressure, temperature, and flow alerts throughout the pipeline. Root cause suppression: identify the primary alert and group the others.
Integration with SCADA and MES
Historian Integration (OSIsoft PI):
from osisoft.pidevclub.piwebapi.api import DataApi
def read_pi_data(pi_server_url, tag_names, start_time, end_time):
"""
PI System: стандарт в нефтегазе, энергетике, хим. промышленности
PIWebAPI → REST доступ к историческим и real-time данным
"""
# Чтение данных тегов из PI historian
pass
# Также: OSIsoft AF (Asset Framework) для контекстной информации
MES Reporting: The ML system publishes aggregated KPIs to MES:
- OEE (Overall Equipment Effectiveness) for each line
- Quality rate: the proportion of products that are normal vs. rejected
- Performance: actual vs. planned performance
Deadlines: OPC-UA/Modbus collector, Kafka pipeline, multisensor anomaly, dashboard — 4-5 weeks. Soft quality sensor, adaptive model, process regime detection, MES integration, PI historian — 3-4 months.







