AI IoT Sensor Data Analysis System for Manufacturing

We design and deploy artificial intelligence systems: from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering and MLOps to make AI work not in the lab, but in real business.
Showing 1 of 1 servicesAll 1566 services
AI IoT Sensor Data Analysis System for Manufacturing
Medium
~1-2 weeks
FAQ
AI Development Areas
AI Solution Development Stages
Latest works
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1041
  • image_logo-advance_0.png
    B2B Advance company logo design
    561
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    823
  • image_logo-aider_0.jpg
    AIDER company logo development
    762
  • image_crm_chasseurs_493_0.webp
    CRM development for Chasseurs
    848

Developing an AI system for analyzing IoT sensor data in production

The industrial IoT—hundreds and thousands of temperature, pressure, vibration, current, and speed sensors—generates a continuous data stream. An ML system transforms this data stream into production intelligence: real-time product quality, anomaly detection, and process parameter optimization.

Industrial IoT stack

Automation Levels (ISA-95):

  • Level 1: Sensors/Actuators
  • Level 2: Control (PLC, SCADA)
  • Level 3: MES (Manufacturing Execution System)
  • Level 4: ERP

ML analytics works at levels 2-3, using data from level 1.

Protocols:

protocols = {
    'OPC-UA': 'стандарт Industry 4.0, discovery + security + историк',
    'Modbus RTU/TCP': 'legacy оборудование, простые регистры',
    'PROFIBUS/PROFINET': 'Siemens PCS 7, S7 ПЛК',
    'EtherNet/IP': 'Allen-Bradley, Rockwell',
    'MQTT': 'lightweight для IoT gateway → cloud',
    'AMQP': 'корпоративные интеграции'
}

Real-Time Data Pipeline

Stream processing:

from kafka import KafkaConsumer, KafkaProducer
import json

class ManufacturingDataPipeline:
    def __init__(self, kafka_bootstrap='kafka:9092'):
        self.consumer = KafkaConsumer(
            'sensor-raw',
            bootstrap_servers=kafka_bootstrap,
            value_deserializer=lambda m: json.loads(m.decode()),
            group_id='analytics-group'
        )
        self.producer = KafkaProducer(
            bootstrap_servers=kafka_bootstrap,
            value_serializer=lambda v: json.dumps(v).encode()
        )

    def process_stream(self):
        for message in self.consumer:
            sensor_data = message.value

            # 1. Валидация и очистка
            cleaned = self.validate_and_clean(sensor_data)

            # 2. Feature extraction (на 1-минутных окнах)
            if self.should_extract_features(cleaned):
                features = self.extract_features(cleaned)

                # 3. Inference
                anomaly_score = anomaly_model.predict([features])[0]
                quality_prediction = quality_model.predict([features])[0]

                # 4. Публикация результатов
                self.producer.send('analytics-output', {
                    'machine_id': cleaned['machine_id'],
                    'timestamp': cleaned['timestamp'],
                    'anomaly_score': float(anomaly_score),
                    'quality_prediction': float(quality_prediction),
                    'features': features
                })

Multisensory analysis of the production line

Sensor correlation:

def analyze_sensor_correlations(sensor_matrix, window_minutes=30):
    """
    Корреляционная матрица датчиков:
    - Потеря корреляции между коррелированными датчиками = аномалия
    - Внезапная корреляция между некоррелированными = нетипичный режим
    """
    # Ожидаемые корреляции (из нормального режима работы)
    baseline_corr = compute_baseline_correlation(normal_operation_data)

    # Текущая корреляция
    current_corr = sensor_matrix.corr()

    # Отклонение корреляционной структуры
    corr_deviation = np.abs(current_corr - baseline_corr).mean().mean()

    return corr_deviation  # высокое значение = нетипичный режим

Process Variable Interaction:

def detect_process_regime_change(current_state, baseline_pca):
    """
    PCA на нормализованных переменных процесса
    Выход за пределы "нормального операционного пространства" = аномалия
    """
    # Проецируем текущее состояние в пространство PCA
    current_pca = baseline_pca.transform([current_state])

    # SPE (Squared Prediction Error): ошибка реконструкции
    reconstructed = baseline_pca.inverse_transform(current_pca)
    spe = np.sum((current_state - reconstructed[0])**2)

    # T²: расстояние в PCA пространстве от центра
    t2 = np.sum(current_pca**2 / baseline_pca.explained_variance_)

    return {'spe': spe, 't2': t2,
            'anomaly': spe > spe_ucl or t2 > t2_ucl}

Quality Prediction - Soft Sensor

Online quality forecast:

def train_quality_soft_sensor(process_params, lab_results, n_lags=5):
    """
    Входные: технологические параметры (онлайн, каждую минуту)
    Выходные: качество продукта (лабораторный анализ, каждый час)

    Temporal alignment: лаг между параметрами и качеством = время в процессе
    """
    # Временные лаги для компенсации времени реакции процесса
    lagged_features = create_lagged_features(process_params, n_lags)

    # Объединение с лабораторными данными (alignment по времени)
    aligned = align_timeseries(lagged_features, lab_results, tolerance='15min')

    model = GradientBoostingRegressor(n_estimators=200)
    model.fit(aligned[feature_cols], aligned['quality_measure'])

    return model

# Использование: каждую минуту → мгновенный прогноз качества
# Не ждать лабораторного анализа через час

Adaptation to process drift:

class AdaptiveQualityModel:
    """
    Производственные процессы дрейфуют:
    - Износ инструмента
    - Изменение партии сырья
    - Сезонные температурные изменения

    Онлайн-обновление модели при получении новых лабораторных данных
    """
    def __init__(self, base_model, update_rate=0.1):
        self.model = base_model
        self.update_rate = update_rate
        self.recent_samples = deque(maxlen=200)

    def predict(self, features):
        return self.model.predict([features])[0]

    def update(self, features, true_quality):
        self.recent_samples.append((features, true_quality))

        if len(self.recent_samples) % 10 == 0:  # перобучение каждые 10 новых точек
            X_recent = [s[0] for s in self.recent_samples]
            y_recent = [s[1] for s in self.recent_samples]
            # Incremental fit с высоким весом новых данных
            self.model.fit(X_recent, y_recent)

Anomaly Prioritization

Multi-level alerts:

def prioritize_manufacturing_alerts(anomalies, asset_criticality, production_impact):
    """
    Не все аномалии одинаково важны
    Приоритет = аномальность × критичность актива × текущая загрузка
    """
    scored_alerts = []
    for anomaly in anomalies:
        priority_score = (
            anomaly['severity'] *
            asset_criticality[anomaly['machine_id']] *
            production_impact.get(anomaly['machine_id'], 1.0)
        )
        scored_alerts.append({**anomaly, 'priority': priority_score})

    return sorted(scored_alerts, key=lambda x: x['priority'], reverse=True)

Suppression of correlated alerts: One pump failure → hundreds of pressure, temperature, and flow alerts throughout the pipeline. Root cause suppression: identify the primary alert and group the others.

Integration with SCADA and MES

Historian Integration (OSIsoft PI):

from osisoft.pidevclub.piwebapi.api import DataApi

def read_pi_data(pi_server_url, tag_names, start_time, end_time):
    """
    PI System: стандарт в нефтегазе, энергетике, хим. промышленности
    PIWebAPI → REST доступ к историческим и real-time данным
    """
    # Чтение данных тегов из PI historian
    pass

# Также: OSIsoft AF (Asset Framework) для контекстной информации

MES Reporting: The ML system publishes aggregated KPIs to MES:

  • OEE (Overall Equipment Effectiveness) for each line
  • Quality rate: the proportion of products that are normal vs. rejected
  • Performance: actual vs. planned performance

Deadlines: OPC-UA/Modbus collector, Kafka pipeline, multisensor anomaly, dashboard — 4-5 weeks. Soft quality sensor, adaptive model, process regime detection, MES integration, PI historian — 3-4 months.