Development of an AI system for automatic incident diagnostics (Root Cause Analysis)
Root Cause Analysis (RCA) is the process of determining the root cause of an incident. Manual RCA in a complex microservice environment takes hours. Automated ML-RCA reduces this process to minutes by analyzing thousands of metrics, logs, and traces simultaneously.
Data sources for RCA
Three Pillars of Observability:
# The "three pillars" of observability that feed the automated RCA pipeline.
# For each signal type: typical backend systems, example signals, and the
# collection granularity the downstream analysis can rely on.
observability_data = {
    'metrics': {
        'source': 'Prometheus, InfluxDB, Datadog',
        'examples': 'CPU, memory, request_rate, error_rate, latency p50/p95/p99',
        'granularity': '15s-1min scraping'
    },
    'logs': {
        'source': 'Elasticsearch, Loki',
        'examples': 'application errors, stack traces, audit events',
        'granularity': 'per-event, structured JSON'
    },
    'traces': {
        'source': 'Jaeger, Zipkin, Tempo',
        'examples': 'distributed request flow, span timing, dependencies',
        'granularity': 'per-request sampling 1-100%'
    }
}
Events and Changes:
- Kubernetes Events: pod restarts, OOMKill, scheduling failures
- Deployment history: CI/CD pipeline events (deployment time and version)
- Infrastructure changes: Terraform applies, auto-scaling events
- External: cloud provider status page, CDN incidents
Correlation-based RCA
Metric Correlation Analysis:
import pandas as pd
import numpy as np
from scipy.stats import pearsonr
def find_correlated_metrics(incident_time, all_metrics, window_minutes=30,
                            threshold=0.7, target='error_rate_service_X'):
    """Find metrics that correlate with the incident indicator series.

    Looks at a time window around the incident and computes lagged Pearson
    correlations between every metric column and the target (error-rate)
    series. Only non-positive lags are scanned, because a cause must precede
    (or coincide with) its effect.

    Args:
        incident_time: pd.Timestamp of the incident.
        all_metrics: DataFrame indexed by timestamp, one column per metric.
        window_minutes: minutes of history before the incident to analyze.
        threshold: minimum |correlation| for a metric to be reported.
        target: name of the incident-indicator column (default preserves the
            original hard-coded metric).

    Returns:
        List of (metric_name, {'correlation': r, 'lag_minutes': lag}) pairs,
        sorted by |correlation| descending.

    NOTE(review): the reported lag is in *samples*; labelling it
    'lag_minutes' assumes the frame is resampled to 1-minute resolution —
    confirm against the ingestion pipeline.
    """
    window = all_metrics[
        incident_time - pd.Timedelta(minutes=window_minutes):
        incident_time + pd.Timedelta(minutes=5)
    ]
    target_series = window[target]
    correlations = {}
    for metric_name in window.columns:
        if metric_name == target:
            continue
        series = window[metric_name].dropna()
        if len(series) < 10:
            continue
        max_corr = 0.0
        best_lag = 0
        # Lag from -10 to 0 samples: candidate cause precedes the effect.
        for lag in range(-10, 1):
            shifted = target_series.shift(-lag)
            common_idx = series.index.intersection(shifted.dropna().index)
            if len(common_idx) < 5:
                continue
            x = series[common_idx]
            y = shifted[common_idx]
            # pearsonr is undefined for constant input (ConstantInputWarning
            # / nan in scipy) — skip degenerate pairs instead of warning.
            if x.nunique() < 2 or y.nunique() < 2:
                continue
            corr, pval = pearsonr(x, y)
            # Keep the strongest statistically significant correlation.
            if abs(corr) > abs(max_corr) and pval < 0.05:
                max_corr = corr
                best_lag = lag
        if abs(max_corr) > threshold:
            correlations[metric_name] = {'correlation': max_corr,
                                         'lag_minutes': best_lag}
    return sorted(correlations.items(),
                  key=lambda item: abs(item[1]['correlation']), reverse=True)
Change Correlation:
def correlate_with_changes(incident_time, change_log, window_hours=24):
    """Rank recent changes as candidate culprits for an incident.

    Selects changes in the `window_hours` before (and up to) the incident;
    the change closest in time to the incident is the most likely culprit.

    Args:
        incident_time: pd.Timestamp of the incident.
        change_log: DataFrame with at least a 'timestamp' column
            (deployments, Terraform applies, scaling events, ...).
        window_hours: how far back to look.

    Returns:
        The matching rows with an added 'minutes_before_incident' column,
        sorted ascending (most recent change first).
    """
    # Single combined mask instead of two sequential filters.
    in_window = (
        (change_log['timestamp'] > incident_time - pd.Timedelta(hours=window_hours))
        & (change_log['timestamp'] <= incident_time)
    )
    # .copy() so adding a column below does not hit pandas'
    # SettingWithCopyWarning on a filtered view.
    recent_changes = change_log[in_window].copy()
    recent_changes['minutes_before_incident'] = (
        incident_time - recent_changes['timestamp']
    ).dt.total_seconds() / 60
    return recent_changes.sort_values('minutes_before_incident')
Dependency Graph and Causal Discovery
Automated Dependency Mapping:
import networkx as nx
from collections import defaultdict
class DependencyGraphBuilder:
    """Builds a service dependency graph from distributed traces and
    propagates failure risk upstream along the call edges."""

    def __init__(self):
        # caller service -> callee service -> number of observed calls
        self.call_counts = defaultdict(lambda: defaultdict(int))

    def process_trace(self, trace):
        """Accumulate caller->callee edge counts from one OpenTelemetry trace.

        Spans without both a parent service and an own service (e.g. root
        spans) are ignored.
        """
        for span in trace.spans:
            if span.parent_service and span.service:
                self.call_counts[span.parent_service][span.service] += 1

    def build_graph(self, min_calls=100):
        """Return a DiGraph containing only edges observed >= min_calls times.

        The threshold filters out one-off/sampled noise; edge weight is the
        observed call count.
        """
        G = nx.DiGraph()
        for caller, callees in self.call_counts.items():
            for callee, count in callees.items():
                if count >= min_calls:
                    G.add_edge(caller, callee, weight=count)
        return G

    def propagate_failure(self, G, failed_service, anomaly_scores):
        """Estimate contagion risk for services upstream of a failing one.

        If service A calls B and B degrades, A may suffer too. Risk decays
        by a factor of 0.7 per hop away from the failed service.

        Args:
            G: dependency DiGraph (edges point caller -> callee).
            failed_service: node identified as degraded.
            anomaly_scores: mapping service -> anomaly score; must contain
                failed_service.

        Returns:
            Dict service -> propagated risk, including the failed service
            itself (at full score).
        """
        base_score = anomaly_scores[failed_service]
        propagated_risk = {}
        # Every node from nx.ancestors necessarily has a path to
        # failed_service, so no extra has_path check is needed.
        for service in nx.ancestors(G, failed_service) | {failed_service}:
            hops = nx.shortest_path_length(G, service, failed_service)
            propagated_risk[service] = base_score * (0.7 ** hops)
        return propagated_risk
PC Algorithm for Causal Discovery:
# Fixed import: the Causal Discovery Toolbox is published as the `cdt`
# package; the PC estimator lives in cdt.causality.graph
# (`causal_discovery_toolbox.structure_learning` does not exist).
from cdt.causality.graph import PC

def discover_causal_structure(metrics_df, alpha=0.05):
    """Recover a causal DAG over the metric columns with the PC algorithm.

    Unlike plain correlation analysis, PC yields a *directed* graph of
    cause-effect relations. It assumes (roughly) stationary processes and
    not-too-nonlinear dependencies.

    Args:
        metrics_df: DataFrame, one column per metric, rows are observations.
        alpha: intended significance level for the conditional-independence
            tests. NOTE(review): currently unused — kept for interface
            compatibility; wire it into the PC estimator's configuration if
            the installed cdt version supports it.

    Returns:
        A networkx DiGraph of discovered causal edges.
    """
    pc = PC()
    dag = pc.predict(metrics_df)  # returns a networkx DiGraph
    return dag
Log Analysis for RCA
Log Parsing and Anomaly Detection:
from drain3 import TemplateMiner
def parse_and_analyze_logs(log_lines, incident_time, window_minutes=10):
    """Mine log templates with Drain3 and flag templates that spike during
    the incident window.

    Drain3 clusters raw log lines into templates online; an anomaly is a
    sharp increase in the frequency of a particular (error) template.

    Args:
        log_lines: iterable of objects with .message and .timestamp.
        incident_time: pd.Timestamp of the incident.
        window_minutes: minutes before the incident included in the window.

    Returns:
        List of {'template', 'spike_ratio', 'count'} dicts, sorted by
        spike_ratio descending. Templates first seen during the incident get
        spike_ratio = inf (previously they were silently dropped, although a
        never-before-seen error template is the strongest RCA signal).

    NOTE(review): spike_ratio compares raw counts, not rates — the baseline
    period may be much longer than the incident window, which understates
    spikes; consider normalizing by window length.
    """
    miner = TemplateMiner()
    template_counts = defaultdict(list)
    for line in log_lines:
        result = miner.add_log_message(line.message)
        template_counts[result['cluster_id']].append(line.timestamp)

    incident_start = incident_time - pd.Timedelta(minutes=window_minutes)
    incident_end = incident_time + pd.Timedelta(minutes=5)
    anomalous_templates = []
    for template_id, timestamps in template_counts.items():
        ts = pd.Series(timestamps)
        baseline_count = ts[ts < incident_start].count()
        incident_count = ts[(ts >= incident_start) & (ts <= incident_end)].count()
        if incident_count == 0:
            continue
        if baseline_count == 0:
            # Brand-new template during the incident: infinite spike.
            spike_ratio = float('inf')
        elif incident_count / baseline_count > 5:
            spike_ratio = incident_count / baseline_count
        else:
            continue
        anomalous_templates.append({
            'template': miner.drain.id_to_cluster[template_id].get_template(),
            'spike_ratio': spike_ratio,
            'count': incident_count
        })
    return sorted(anomalous_templates,
                  key=lambda x: x['spike_ratio'], reverse=True)
RCA Report Generation
Structured RCA Output:
def generate_rca_report(incident_id, correlation_results, dependency_analysis,
                        log_analysis, change_log, llm):
    """Assemble the structured RCA result and produce an LLM narrative.

    Args:
        incident_id: identifier of the incident being analyzed.
        correlation_results: output of the metric-correlation stage.
        dependency_analysis: output of the dependency-graph stage.
        log_analysis: output of the log-template anomaly stage.
        change_log: recent-changes DataFrame/records.
        llm: chat model exposing .invoke(prompt) -> str.

    Returns:
        RCAReport built from the structured fields plus the narrative.
    """
    # Bug fix: bind the ranking to a local first — the original referenced
    # the undefined name `probable_root_cause` inside the dict literal
    # (it was only a dict key), raising NameError at runtime.
    probable_root_cause = rank_root_causes(correlation_results, dependency_analysis)
    rca_data = {
        'incident_id': incident_id,
        'probable_root_cause': probable_root_cause,
        'contributing_factors': extract_contributing_factors(log_analysis),
        'timeline': build_incident_timeline(correlation_results, change_log),
        'affected_services': extract_affected_services(dependency_analysis),
        'recommended_actions': generate_remediation(probable_root_cause)
    }
    # The LLM turns the structured data into a human-readable narrative.
    narrative = llm.invoke(f"Generate an RCA report: {rca_data}")
    return RCAReport(**rca_data, narrative=narrative)
Accumulation of knowledge: Each incident → precedent database. For the next similar incident: search for similar features by vector embedding → recommendation: "In June, a similar incident was resolved by restarting redis-cluster."
Timeframe: Metrics + log parsing + basic correlation analysis — 4-5 weeks. Causal graph, Drain3 log templates, LLM narrative, incident knowledge base — 3-4 months.







