Development of an AI system for pipeline leak detection
An oil or gas leak in a pipeline is an environmental disaster and financial losses. A one-hour detection delay at a flow rate of 500 m³/h equals 500 cubic meters of product leaking into the ground. An AI system reduces detection time from hours to minutes using acoustics, pressure, and flow metering.
Leak detection methods
Comparison of methods:
| Method | Detection time | Minimal leakage | Localization |
|---|---|---|---|
| Balance (expense) | 5-30 min | 1-3% of the flow | ±500 m |
| Pressure drop | 1-5 min | 0.5% | ±100 m |
| Acoustic | seconds | 0.1% | ±10 m |
| Negative shock wave | 30-60 s | 0.5% | ±50 m |
| Fiber Optic DAS | seconds | 0.01% | ±1 m |
Balance sheet method with ML
Input-output imbalance detection:
import numpy as np
import pandas as pd
from scipy.stats import chi2
class PipelineBalanceMonitor:
def __init__(self, line_id: str, balance_threshold_pct: float = 1.0):
self.line_id = line_id
self.threshold = balance_threshold_pct / 100
self.ewma_balance = None
self.alpha = 0.1
def update(self, inlet_flow_m3h: float, outlet_flow_m3h: float,
line_pack_change: float = 0.0) -> dict:
"""
Баланс = Вход - Выход - Изменение линейного запаса (compressibility)
Для газа: line pack меняется при изменении давления.
"""
balance = inlet_flow_m3h - outlet_flow_m3h - line_pack_change
balance_pct = balance / (inlet_flow_m3h + 1e-9)
# EWMA фильтр
if self.ewma_balance is None:
self.ewma_balance = balance_pct
else:
self.ewma_balance = self.alpha * balance_pct + (1 - self.alpha) * self.ewma_balance
leak_detected = self.ewma_balance > self.threshold
return {
'line_id': self.line_id,
'instantaneous_balance_pct': round(balance_pct * 100, 3),
'ewma_balance_pct': round(self.ewma_balance * 100, 3),
'leak_detected': leak_detected,
'estimated_leak_rate_m3h': max(0, balance) if leak_detected else 0
}
Negative Pressure Wave Detection
Physics and algorithm:
def detect_negative_pressure_wave(pressure_sensors: dict,
pipeline_params: dict) -> dict:
"""
При разрыве трубы → резкое снижение давления в точке разрыва →
волна пониженного давления распространяется в обе стороны.
Скорость волны: a = sqrt(K/ρ) ≈ 1000-1300 м/с для жидкостей.
Зная время прихода волны к двум датчикам → местоположение разрыва.
"""
wave_speed = pipeline_params.get('pressure_wave_speed_ms', 1100) # м/с
# Определяем время прихода волны к каждому датчику
arrival_times = {}
for sensor_id, pressure_series in pressure_sensors.items():
# Ищем резкое падение: производная < -threshold
dP_dt = np.gradient(pressure_series.values, 1) # 1-секундные данные
sharp_drop_indices = np.where(dP_dt < -5)[0] # > 5 bar/s = ударная волна
if len(sharp_drop_indices) > 0:
arrival_times[sensor_id] = sharp_drop_indices[0]
if len(arrival_times) < 2:
return {'leak_detected': False}
# Локализация по двум датчикам A и B
sensor_ids = list(arrival_times.keys())[:2]
t_A = arrival_times[sensor_ids[0]]
t_B = arrival_times[sensor_ids[1]]
delta_t = abs(t_A - t_B) # секунды
# Расстояние от A до точки разрыва
distance_AB = pipeline_params['sensor_distances'].get(
(sensor_ids[0], sensor_ids[1]), 5000 # м между датчиками
)
dist_from_A = (distance_AB - wave_speed * delta_t) / 2
dist_from_A = max(0, min(dist_from_A, distance_AB))
return {
'leak_detected': True,
'leak_location_m_from_sensor_A': round(dist_from_A, 0),
'confidence': 'high',
'delta_t_seconds': delta_t
}
Acoustic monitoring
Acoustic sensor signal processing:
from scipy import signal
import librosa
def analyze_acoustic_signal(audio_signal: np.ndarray,
sampling_rate: int = 44100) -> dict:
"""
Утечка производит характерный акустический сигнал:
- Широкополосный шум 100-10000 Гц
- Характеристика зависит от давления и размера отверстия
"""
# Фильтрация: полосовой фильтр 500-5000 Гц (диапазон утечек)
b, a = signal.butter(4, [500, 5000], btype='band', fs=sampling_rate)
filtered = signal.filtfilt(b, a, audio_signal)
# Признаки
rms = np.sqrt(np.mean(filtered**2))
kurtosis = np.mean((filtered - filtered.mean())**4) / (np.std(filtered)**4 + 1e-9)
# Спектральные признаки
freqs, psd = signal.welch(filtered, fs=sampling_rate, nperseg=1024)
band_energy = np.trapz(psd[(freqs >= 1000) & (freqs <= 3000)], freqs[(freqs >= 1000) & (freqs <= 3000)])
total_energy = np.trapz(psd, freqs)
leak_band_ratio = band_energy / (total_energy + 1e-9)
# Классификация: сравнение с шаблонами нормального шума трубопровода
leak_score = rms * 0.4 + leak_band_ratio * 0.4 + min(1, kurtosis / 10) * 0.2
return {
'rms_amplitude': float(rms),
'kurtosis': float(kurtosis),
'leak_band_ratio': float(leak_band_ratio),
'leak_score': float(leak_score),
'leak_detected': leak_score > 0.6
}
Fusion detection
Combination of methods:
def fuse_leak_detections(balance_result: dict,
npw_result: dict,
acoustic_result: dict) -> dict:
"""
Каждый метод имеет свои ложные срабатывания.
Голосование с весами по надёжности метода.
"""
votes = 0
total_weight = 0
location_estimates = []
if balance_result.get('leak_detected'):
votes += 0.4
location_estimates.append(balance_result.get('estimated_location'))
total_weight += 0.4
if npw_result.get('leak_detected'):
votes += 0.4
location_estimates.append(npw_result.get('leak_location_m_from_sensor_A'))
total_weight += 0.4
if acoustic_result.get('leak_detected'):
votes += 0.2
location_estimates.append(acoustic_result.get('acoustic_location_m'))
total_weight += 0.2
confidence = votes / total_weight
best_location = min(location_estimates, key=lambda x: 0 if x is None else 1) if location_estimates else None
return {
'leak_confirmed': confidence >= 0.4, # хотя бы один надёжный метод
'confidence': round(confidence, 2),
'estimated_location_m': best_location,
'alert_level': 'critical' if confidence > 0.8 else 'warning'
}
Integration with SCADA and dispatcher: OPC-UA connector for Emerson DeltaV, Honeywell Experion, and ABB 800xA. Upon leak detection, automatic closing of section valves via SCADA (Emergency Shutdown). SMS/push notification to the dispatcher.
Deadlines: Balance method + alerts + SCADA connector — 3-4 weeks. NPW localization, acoustics, fusion detection, automatic valve closure — 3-4 months.







