AI Pipeline Leak Detection System Development

We design and deploy artificial intelligence systems: from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering and MLOps to make AI work not in the lab, but in real business.
Showing 1 of 1 servicesAll 1566 services
AI Pipeline Leak Detection System Development
Medium
~1-2 weeks
FAQ
AI Development Areas
AI Solution Development Stages
Latest works
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1041
  • image_logo-advance_0.png
    B2B Advance company logo design
    561
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    823
  • image_logo-aider_0.jpg
    AIDER company logo development
    762
  • image_crm_chasseurs_493_0.webp
    CRM development for Chasseurs
    848

Development of an AI system for pipeline leak detection

An oil or gas leak in a pipeline is an environmental disaster and financial losses. A one-hour detection delay at a flow rate of 500 m³/h equals 500 cubic meters of product leaking into the ground. An AI system reduces detection time from hours to minutes using acoustics, pressure, and flow metering.

Leak detection methods

Comparison of methods:

Method Detection time Minimal leakage Localization
Balance (expense) 5-30 min 1-3% of the flow ±500 m
Pressure drop 1-5 min 0.5% ±100 m
Acoustic seconds 0.1% ±10 m
Negative shock wave 30-60 s 0.5% ±50 m
Fiber Optic DAS seconds 0.01% ±1 m

Balance sheet method with ML

Input-output imbalance detection:

import numpy as np
import pandas as pd
from scipy.stats import chi2

class PipelineBalanceMonitor:
    def __init__(self, line_id: str, balance_threshold_pct: float = 1.0):
        self.line_id = line_id
        self.threshold = balance_threshold_pct / 100
        self.ewma_balance = None
        self.alpha = 0.1

    def update(self, inlet_flow_m3h: float, outlet_flow_m3h: float,
               line_pack_change: float = 0.0) -> dict:
        """
        Баланс = Вход - Выход - Изменение линейного запаса (compressibility)
        Для газа: line pack меняется при изменении давления.
        """
        balance = inlet_flow_m3h - outlet_flow_m3h - line_pack_change
        balance_pct = balance / (inlet_flow_m3h + 1e-9)

        # EWMA фильтр
        if self.ewma_balance is None:
            self.ewma_balance = balance_pct
        else:
            self.ewma_balance = self.alpha * balance_pct + (1 - self.alpha) * self.ewma_balance

        leak_detected = self.ewma_balance > self.threshold

        return {
            'line_id': self.line_id,
            'instantaneous_balance_pct': round(balance_pct * 100, 3),
            'ewma_balance_pct': round(self.ewma_balance * 100, 3),
            'leak_detected': leak_detected,
            'estimated_leak_rate_m3h': max(0, balance) if leak_detected else 0
        }

Negative Pressure Wave Detection

Physics and algorithm:

def detect_negative_pressure_wave(pressure_sensors: dict,
                                   pipeline_params: dict) -> dict:
    """
    При разрыве трубы → резкое снижение давления в точке разрыва →
    волна пониженного давления распространяется в обе стороны.
    Скорость волны: a = sqrt(K/ρ) ≈ 1000-1300 м/с для жидкостей.

    Зная время прихода волны к двум датчикам → местоположение разрыва.
    """
    wave_speed = pipeline_params.get('pressure_wave_speed_ms', 1100)  # м/с

    # Определяем время прихода волны к каждому датчику
    arrival_times = {}
    for sensor_id, pressure_series in pressure_sensors.items():
        # Ищем резкое падение: производная < -threshold
        dP_dt = np.gradient(pressure_series.values, 1)  # 1-секундные данные
        sharp_drop_indices = np.where(dP_dt < -5)[0]  # > 5 bar/s = ударная волна

        if len(sharp_drop_indices) > 0:
            arrival_times[sensor_id] = sharp_drop_indices[0]

    if len(arrival_times) < 2:
        return {'leak_detected': False}

    # Локализация по двум датчикам A и B
    sensor_ids = list(arrival_times.keys())[:2]
    t_A = arrival_times[sensor_ids[0]]
    t_B = arrival_times[sensor_ids[1]]
    delta_t = abs(t_A - t_B)  # секунды

    # Расстояние от A до точки разрыва
    distance_AB = pipeline_params['sensor_distances'].get(
        (sensor_ids[0], sensor_ids[1]), 5000  # м между датчиками
    )
    dist_from_A = (distance_AB - wave_speed * delta_t) / 2
    dist_from_A = max(0, min(dist_from_A, distance_AB))

    return {
        'leak_detected': True,
        'leak_location_m_from_sensor_A': round(dist_from_A, 0),
        'confidence': 'high',
        'delta_t_seconds': delta_t
    }

Acoustic monitoring

Acoustic sensor signal processing:

from scipy import signal
import librosa

def analyze_acoustic_signal(audio_signal: np.ndarray,
                              sampling_rate: int = 44100) -> dict:
    """
    Утечка производит характерный акустический сигнал:
    - Широкополосный шум 100-10000 Гц
    - Характеристика зависит от давления и размера отверстия
    """
    # Фильтрация: полосовой фильтр 500-5000 Гц (диапазон утечек)
    b, a = signal.butter(4, [500, 5000], btype='band', fs=sampling_rate)
    filtered = signal.filtfilt(b, a, audio_signal)

    # Признаки
    rms = np.sqrt(np.mean(filtered**2))
    kurtosis = np.mean((filtered - filtered.mean())**4) / (np.std(filtered)**4 + 1e-9)

    # Спектральные признаки
    freqs, psd = signal.welch(filtered, fs=sampling_rate, nperseg=1024)
    band_energy = np.trapz(psd[(freqs >= 1000) & (freqs <= 3000)], freqs[(freqs >= 1000) & (freqs <= 3000)])
    total_energy = np.trapz(psd, freqs)
    leak_band_ratio = band_energy / (total_energy + 1e-9)

    # Классификация: сравнение с шаблонами нормального шума трубопровода
    leak_score = rms * 0.4 + leak_band_ratio * 0.4 + min(1, kurtosis / 10) * 0.2

    return {
        'rms_amplitude': float(rms),
        'kurtosis': float(kurtosis),
        'leak_band_ratio': float(leak_band_ratio),
        'leak_score': float(leak_score),
        'leak_detected': leak_score > 0.6
    }

Fusion detection

Combination of methods:

def fuse_leak_detections(balance_result: dict,
                          npw_result: dict,
                          acoustic_result: dict) -> dict:
    """
    Каждый метод имеет свои ложные срабатывания.
    Голосование с весами по надёжности метода.
    """
    votes = 0
    total_weight = 0
    location_estimates = []

    if balance_result.get('leak_detected'):
        votes += 0.4
        location_estimates.append(balance_result.get('estimated_location'))
    total_weight += 0.4

    if npw_result.get('leak_detected'):
        votes += 0.4
        location_estimates.append(npw_result.get('leak_location_m_from_sensor_A'))
    total_weight += 0.4

    if acoustic_result.get('leak_detected'):
        votes += 0.2
        location_estimates.append(acoustic_result.get('acoustic_location_m'))
    total_weight += 0.2

    confidence = votes / total_weight
    best_location = min(location_estimates, key=lambda x: 0 if x is None else 1) if location_estimates else None

    return {
        'leak_confirmed': confidence >= 0.4,  # хотя бы один надёжный метод
        'confidence': round(confidence, 2),
        'estimated_location_m': best_location,
        'alert_level': 'critical' if confidence > 0.8 else 'warning'
    }

Integration with SCADA and dispatcher: OPC-UA connector for Emerson DeltaV, Honeywell Experion, and ABB 800xA. Upon leak detection, automatic closing of section valves via SCADA (Emergency Shutdown). SMS/push notification to the dispatcher.

Deadlines: Balance method + alerts + SCADA connector — 3-4 weeks. NPW localization, acoustics, fusion detection, automatic valve closure — 3-4 months.