An AI-based system for monitoring emissions and environmental safety in chemical production
Chemical production is one of the largest sources of industrial emissions. Continuous Emission Monitoring System (CEMS) with an AI layer addresses three objectives: compliance with regulations (maximum permissible emissions/maximum permissible emissions), early detection of emergency emissions, and optimization of process conditions to reduce emissions.
CEMS architecture with AI
Data sources:
cems_architecture = {
'analyzers': {
'NOx': 'хемилюминесцентный, диапазон 0-1000 ppm',
'SO2': 'ультрафиолетовый флуоресцентный, 0-2000 ppm',
'CO': 'недисперсионный инфракрасный (NDIR)',
'CO2': 'NDIR, 0-20% объёмных',
'HCl': 'NDIR для хлорорганических производств',
'PM2.5/PM10': 'оптический рассеиватель + бета-абсорбция',
'VOC': 'ПИД (Photoionization Detector)'
},
'flow_meter': {
'type': 'ультразвуковой или термический',
'use': 'расчёт массового потока выбросов (г/с, т/год)'
},
'scada_process': {
'parameters': 'температура, давление, расход сырья, режимы реактора',
'use': 'корреляция выбросов с технологическими параметрами'
}
}
Monitoring compliance with regulations
Calculation of standard indicators:
import pandas as pd
import numpy as np
def check_regulatory_compliance(emissions_data: pd.DataFrame,
permits: dict,
regulation: str = 'RU_ND') -> dict:
"""
РФ: нормативы ПДВ (Предельно Допустимые Выбросы) по каждому источнику.
ЕС: EU IED (Industrial Emissions Directive) — BAT Associated Emission Levels.
"""
violations = []
for pollutant, permit_value in permits.items():
if pollutant not in emissions_data.columns:
continue
# Мгновенное превышение
instantaneous = emissions_data[pollutant].iloc[-1]
if instantaneous > permit_value:
violations.append({
'pollutant': pollutant,
'type': 'instantaneous_exceedance',
'current_value': round(instantaneous, 3),
'permit': permit_value,
'exceedance_factor': round(instantaneous / permit_value, 2)
})
# Среднесуточное (НОРМАТИВНОЕ ТРЕБОВАНИЕ: не превышать СДВ более 3 дней в году)
daily_avg = emissions_data[pollutant].resample('D').mean()
if len(daily_avg) > 0:
days_exceeded = (daily_avg > permit_value).sum()
if days_exceeded > 0:
violations.append({
'pollutant': pollutant,
'type': 'daily_average_exceeded',
'days_exceeded': int(days_exceeded),
'avg_exceedance': round(daily_avg[daily_avg > permit_value].mean(), 3)
})
# Годовой суммарный выброс
if regulation == 'RU_ND' and 'annual_limit_tonnes' in permits:
annual_actual = emissions_data[pollutant].sum() * 3600 * 1e-6 # г/с → т/год (упрощение)
annual_limit = permits['annual_limit_tonnes'].get(pollutant, float('inf'))
if annual_actual > annual_limit * 0.9:
violations.append({
'pollutant': pollutant,
'type': 'annual_limit_approaching',
'current_tonnes': round(annual_actual, 2),
'annual_limit': annual_limit,
'utilization_pct': round(annual_actual / annual_limit * 100, 1)
})
return {
'compliance': len(violations) == 0,
'violations': violations,
'regulatory_status': 'compliant' if not violations else 'violation'
}
Detection of emergency emissions
Abnormal Emission - Rate of Increase:
class EmissionSpikeDetector:
def __init__(self, pollutants: list, ewma_alpha: float = 0.1):
self.baselines = {p: {'mean': None, 'std': None} for p in pollutants}
self.alpha = ewma_alpha
self.history = {p: [] for p in pollutants}
def update_and_detect(self, timestamp, readings: dict) -> dict:
alerts = []
for pollutant, value in readings.items():
if pollutant not in self.baselines:
continue
self.history[pollutant].append(value)
if len(self.history[pollutant]) < 30:
# Накапливаем baseline
if len(self.history[pollutant]) == 30:
self.baselines[pollutant]['mean'] = np.mean(self.history[pollutant])
self.baselines[pollutant]['std'] = np.std(self.history[pollutant])
continue
mean = self.baselines[pollutant]['mean']
std = self.baselines[pollutant]['std']
# Z-score
z = (value - mean) / (std + 1e-9)
# EWMA обновление (медленно, чтобы не подстроиться под аварию)
if abs(z) < 2: # обновляем baseline только в нормальном режиме
self.baselines[pollutant]['mean'] = (
self.alpha * value + (1 - self.alpha) * mean
)
# Скорость нарастания (ROC)
if len(self.history[pollutant]) >= 5:
rate_of_change = (value - self.history[pollutant][-5]) / 4 # за 4 интервала
if abs(z) > 4 or rate_of_change > std * 3:
alerts.append({
'pollutant': pollutant,
'value': value,
'z_score': round(z, 1),
'rate_of_change': round(rate_of_change, 3),
'severity': 'emergency' if abs(z) > 6 else 'alert',
'action': 'emergency_shutdown_check' if abs(z) > 6 else 'investigate_source'
})
return {'timestamp': str(timestamp), 'alerts': alerts, 'healthy': len(alerts) == 0}
Predictive emission model
Soft sensor for unattended sources:
from sklearn.ensemble import GradientBoostingRegressor
def train_emission_prediction_model(process_data: pd.DataFrame,
emission_data: pd.DataFrame,
pollutant: str) -> GradientBoostingRegressor:
"""
Предсказываем выброс по технологическим параметрам.
Использование: 1) мониторинг при отказе анализатора, 2) оптимизация режима.
"""
process_features = [
'reactor_temperature', 'feed_flow_rate', 'pressure',
'oxygen_content', 'fuel_type_encoded',
'load_pct', 'catalyst_activity'
]
combined = process_data.merge(emission_data[['timestamp', pollutant]],
on='timestamp', how='inner')
combined = combined.dropna(subset=process_features + [pollutant])
model = GradientBoostingRegressor(
n_estimators=200,
max_depth=5,
learning_rate=0.05
)
model.fit(combined[process_features], combined[pollutant])
return model # использовать: model.predict([[T, F, P, O2, fuel, load, cat]])
Integration: Roshydromet GIS (data publication), Rosprirodnadzor electronic reporting, SAP EHS (Environment Health Safety) for incidents. CEMS data, according to Rosprirodnadzor Order 17-P, must be stored for 5 years.
Timeframe: CEMS connection + compliance checks + spike detector + dashboard — 3-4 weeks. Predictive emissions model, emission reduction optimization, SAP EHS integration, and regulatory reporting — 2-3 months.







