Development of an AI-based infrastructure monitoring and automatic alerting system (AIOps)
AIOps (Artificial Intelligence for IT Operations) combines machine learning with monitoring data (metrics, logs, and traces) to reduce MTTR, cut alert noise, and enable predictive diagnostics. For a team of 5-10 engineers responsible for hundreds of services, automated correlation and anomaly detection are often the only practical way to keep up.
Problems of classical monitoring
Alert storm: A single incident generates hundreds of alerts from interconnected systems; engineers drown in notifications and miss the root cause.
Static thresholds: A fixed CPU > 80% rule fires falsely at night during a batch job, yet misses a daytime problem where utilization sits at a normal-looking 75% but keeps trending upward.
Manual correlation: Engineers correlate metrics, logs, and traces from different systems by hand. AIOps automates this step.
Data infrastructure
Unified Observability Pipeline:
Application Metrics → Prometheus
Infrastructure Metrics → Node Exporter, cloud_exporter
Logs → Fluent Bit → Elasticsearch / Loki
Traces → OpenTelemetry → Jaeger / Tempo
Events → Kubernetes Events
↓
Kafka (shared event bus)
↓
AIOps Platform (ML inference + correlation)
↓
Alert Manager → PagerDuty / OpsGenie / Slack
Data volume: an enterprise deployment easily produces billions of metric points per day. The ML pipeline therefore needs two paths: batch processing for model training and streaming inference with end-to-end latency under 30 seconds.
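The streaming leg of the pipeline boils down to a consumer loop that scores each metric event as it arrives. A minimal sketch, with the message schema, topic name, and detector function all illustrative assumptions:

```python
import json

def process_metric_events(messages, is_anomalous):
    """Score each metric event pulled off the bus; keep only anomalies."""
    anomalies = []
    for raw in messages:
        event = json.loads(raw)
        if is_anomalous(event['value']):
            anomalies.append(event)
    return anomalies

# Stand-in for a Kafka consumer; in production `messages` would come from
# e.g. KafkaConsumer('metrics', bootstrap_servers=...) and the anomalies
# would be published back onto an alerts topic rather than returned.
stream = [json.dumps({'service': 'api', 'metric': 'cpu', 'value': v})
          for v in (55.0, 96.5, 70.0)]
flagged = process_metric_events(stream, lambda v: v > 90)
```

Decoupling the scoring function from the transport keeps the detector testable without a running broker.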
Dynamic thresholds
Prophet/SARIMA for seasonal thresholds:
from prophet import Prophet
import pandas as pd

def train_dynamic_threshold(metric_series, confidence_level=0.99):
    """
    Fit Prophet on the metric's normal behaviour.
    Returns an upper/lower band for each point in time.
    """
    df = pd.DataFrame({
        'ds': metric_series.index,
        'y': metric_series.values
    })
    model = Prophet(
        seasonality_mode='multiplicative',
        weekly_seasonality=True,
        daily_seasonality=True,
        interval_width=confidence_level
    )
    model.fit(df)
    future = model.make_future_dataframe(periods=60, freq='5min')
    forecast = model.predict(future)
    # Anomaly: a value outside the yhat_lower / yhat_upper band
    return forecast[['ds', 'yhat_lower', 'yhat_upper']]

def detect_anomaly(current_value, forecast_row):
    return (current_value < forecast_row['yhat_lower'] or
            current_value > forecast_row['yhat_upper'])
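At serving time the forecast table has to be matched against live samples by timestamp. A small sketch of that glue (the `lookup_band` helper and the sample values are hypothetical, not part of Prophet):

```python
import pandas as pd

def lookup_band(forecast, ts):
    """Return the forecast row whose timestamp is nearest the sample's."""
    idx = (forecast['ds'] - ts).abs().idxmin()
    return forecast.loc[idx]

# Hand-made forecast table standing in for Prophet output
forecast = pd.DataFrame({
    'ds': pd.date_range('2024-01-01', periods=3, freq='5min'),
    'yhat_lower': [40.0, 42.0, 41.0],
    'yhat_upper': [85.0, 88.0, 86.0],
})
band = lookup_band(forecast, pd.Timestamp('2024-01-01 00:06:00'))
# 00:06 is closest to the 00:05 forecast row; 92.0 exceeds its upper band
is_anomaly = 92.0 < band['yhat_lower'] or 92.0 > band['yhat_upper']
```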
EWMA for real-time adaptation:
import numpy as np

class EWMAThreshold:
    def __init__(self, alpha=0.1, k=3.0):
        self.alpha = alpha  # adaptation speed
        self.k = k          # number of sigmas
        self.ewma = None
        self.ewmv = None    # EWMA variance

    def update(self, value):
        if self.ewma is None:
            self.ewma = value
            self.ewmv = 0.0
            return False
        deviation = value - self.ewma
        self.ewma = self.alpha * value + (1 - self.alpha) * self.ewma
        self.ewmv = self.alpha * deviation**2 + (1 - self.alpha) * self.ewmv
        threshold = self.k * np.sqrt(self.ewmv)
        is_anomaly = abs(deviation) > threshold
        return is_anomaly
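A quick sanity check of the detector on a synthetic stream. The class is repeated here so the snippet runs standalone, with one addition: a warm-up guard (the `warmup` parameter is not in the version above) that suppresses alarms until the variance estimate has seen enough samples, since early on `ewmv` is near zero and almost any change would trip the threshold:

```python
import numpy as np

class EWMAThreshold:
    """EWMA detector as above, plus a warm-up guard (an added assumption)."""
    def __init__(self, alpha=0.1, k=3.0, warmup=10):
        self.alpha, self.k, self.warmup = alpha, k, warmup
        self.ewma = None
        self.ewmv = None
        self.count = 0

    def update(self, value):
        self.count += 1
        if self.ewma is None:
            self.ewma, self.ewmv = value, 0.0
            return False
        deviation = value - self.ewma
        self.ewma = self.alpha * value + (1 - self.alpha) * self.ewma
        self.ewmv = self.alpha * deviation**2 + (1 - self.alpha) * self.ewmv
        if self.count <= self.warmup:  # variance not settled yet
            return False
        return abs(deviation) > self.k * np.sqrt(self.ewmv)

detector = EWMAThreshold(alpha=0.1, k=3.0, warmup=10)
flags = [detector.update(v) for v in [50.0] * 20 + [120.0]]
# the stable stream never trips the detector; the 120.0 spike does
```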
Alert correlation and noise reduction
Alert Clustering:
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
import numpy as np

def cluster_alerts(alerts_df, eps=0.5, min_samples=2):
    """
    Cluster alerts by temporal proximity + semantic similarity.
    """
    # Features: timestamp + service embedding + severity
    features = np.column_stack([
        alerts_df['timestamp'].astype(int) / 1e9,   # Unix timestamp, seconds
        np.vstack(alerts_df['service_embedding']),  # Word2Vec/FastText vectors
        alerts_df['severity_numeric']
    ])
    # Normalization
    features_scaled = StandardScaler().fit_transform(features)
    clusters = DBSCAN(eps=eps, min_samples=min_samples).fit_predict(features_scaled)
    alerts_df['incident_cluster'] = clusters
    return alerts_df.groupby('incident_cluster').agg({
        'alert_id': 'count',
        'service': lambda x: x.mode()[0],
        'severity': 'max',
        'timestamp': 'min',
        'message': list
    })
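A toy run of the same clustering idea on six synthetic alerts: two bursts ten minutes apart. The 2-d embeddings and the eps value are purely illustrative (real service embeddings would be higher-dimensional):

```python
import pandas as pd
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler

# Two alert bursts ten minutes apart; emb_x/emb_y stand in for embeddings
alerts = pd.DataFrame({
    'alert_id': range(6),
    'ts_seconds': [0, 5, 10, 600, 605, 610],
    'emb_x': [0.1, 0.2, 0.1, 0.9, 0.8, 0.9],
    'emb_y': [0.9, 0.8, 0.9, 0.1, 0.2, 0.1],
    'severity_numeric': [2, 2, 3, 1, 1, 2],
})
scaled = StandardScaler().fit_transform(
    alerts[['ts_seconds', 'emb_x', 'emb_y', 'severity_numeric']])
alerts['incident_cluster'] = DBSCAN(eps=1.6, min_samples=2).fit_predict(scaled)
# six raw alerts collapse into two incident clusters, one per burst
```

An on-call engineer now sees two candidate incidents instead of six pages.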
Causal Graph for RCA:
import networkx as nx

class ServiceDependencyGraph:
    def __init__(self):
        self.graph = nx.DiGraph()

    def build_from_traces(self, traces):
        for trace in traces:
            for span in trace.spans:
                if span.parent:
                    self.graph.add_edge(span.parent_service, span.service,
                                        latency=span.latency)

    def find_root_cause(self, incident_services, anomaly_time):
        """
        Given the list of anomalous services, find the upstream root.
        """
        anomaly_set = set(incident_services)
        root_candidates = []
        for service in anomaly_set:
            # If an ancestor is also anomalous, that ancestor is closer to
            # the root; a service with no anomalous ancestors is a candidate
            ancestors = nx.ancestors(self.graph, service)
            if not ancestors.intersection(anomaly_set):
                root_candidates.append(service)
        return root_candidates
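A minimal walk-through of that root-cause rule on a hypothetical three-hop call chain (service names invented for illustration):

```python
import networkx as nx

# Hypothetical call chain: frontend -> api -> {db, cache}
g = nx.DiGraph()
g.add_edge('frontend', 'api')
g.add_edge('api', 'db')
g.add_edge('api', 'cache')

anomalous = {'api', 'db'}
# Same rule as find_root_cause: keep services with no anomalous ancestor
roots = [s for s in anomalous if not nx.ancestors(g, s) & anomalous]
# 'db' has the anomalous 'api' upstream, so only 'api' remains a candidate
```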
Predictive diagnostics
Pre-incident trends:
from datetime import timedelta
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression

def detect_precursor_patterns(metrics_history, incident_times, normal_times, window=120):
    """
    Training question: how many minutes before an incident do the first
    signs appear? Positive samples are the windows preceding incidents;
    negatives come from known-normal periods.
    """
    def window_features(end_time):
        # The 2-hour window leading up to end_time
        pre = metrics_history[end_time - timedelta(minutes=window):end_time]
        x = np.arange(len(pre))
        return {
            'error_rate_trend': np.polyfit(x, pre['error_rate'], 1)[0],
            'p99_latency_trend': np.polyfit(x, pre['p99'], 1)[0],
            'cpu_trend': np.polyfit(x, pre['cpu'], 1)[0],
            'memory_leak_indicator': detect_memory_leak_pattern(pre['memory'])
        }
    features = pd.DataFrame(
        [window_features(t) for t in incident_times] +
        [window_features(t) for t in normal_times])
    labels = [1] * len(incident_times) + [0] * len(normal_times)
    # Model: flag an approaching incident ~30 minutes ahead
    incident_predictor = LogisticRegression()
    incident_predictor.fit(features, labels)
    return incident_predictor
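The `detect_memory_leak_pattern` helper is referenced above but never defined. A minimal sketch (the name, return convention, and thresholds are assumptions): flag a leak-like series when it has a positive trend and its steps are almost always increasing:

```python
import numpy as np

def detect_memory_leak_pattern(memory_series, min_slope=0.0, min_monotonic_frac=0.9):
    """Return 1.0 for a leak-like pattern: positive trend + mostly-rising steps."""
    values = np.asarray(memory_series, dtype=float)
    if len(values) < 3:
        return 0.0
    slope = np.polyfit(np.arange(len(values)), values, 1)[0]
    monotonic_frac = np.mean(np.diff(values) > 0)  # share of increasing steps
    return float(slope > min_slope and monotonic_frac >= min_monotonic_frac)

leak = detect_memory_leak_pattern(np.linspace(100, 200, 50))   # steady growth
flat = detect_memory_leak_pattern(np.sin(np.linspace(0, 10, 50)))  # oscillation
```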
LLM-assisted incident analysis
Contextual analysis of the incident:
from anthropic import Anthropic

def generate_incident_summary(incident_data, llm_client):
    """
    The LLM turns raw technical data into a readable incident summary.
    llm_client is an Anthropic() instance.
    """
    context = f"""
    Incident at {incident_data['timestamp']}:
    - Affected services: {incident_data['services']}
    - Anomalous metrics: {incident_data['metrics']}
    - Recent changes: {incident_data['recent_deployments']}
    - Related logs (sample): {incident_data['log_samples'][:5]}
    - Similar past incidents: {incident_data['similar_incidents']}
    """
    response = llm_client.messages.create(
        model='claude-opus-4',
        max_tokens=1024,
        messages=[{
            'role': 'user',
            'content': f'Provide a concise incident analysis and recommended next steps:\n{context}'
        }]
    )
    return response.content[0].text
Integration
Grafana AIOps Plugin: Unified dashboard: anomalies, alert clusters, RCA graph, precursor warnings.
PagerDuty Event Intelligence: Commercial AIOps add-on. Custom ML can be integrated via the PagerDuty Events API.
ServiceNow AIOps: Automatic creation of incidents with ML classification of priority and assignment group.
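Pushing custom ML output into PagerDuty amounts to posting a well-formed event to the Events API v2 `enqueue` endpoint. A sketch that only builds the payload (the routing key, alert-cluster fields, and dedup-key scheme are hypothetical):

```python
def build_pagerduty_event(routing_key, alert_cluster, action='trigger'):
    """Build a PagerDuty Events API v2 payload from an ML alert cluster."""
    return {
        'routing_key': routing_key,
        'event_action': action,  # trigger | acknowledge | resolve
        # one dedup_key per cluster collapses repeated alerts into one incident
        'dedup_key': f"aiops-{alert_cluster['incident_cluster']}",
        'payload': {
            'summary': alert_cluster['summary'],
            'source': alert_cluster['service'],
            'severity': alert_cluster['severity'],  # critical | error | warning | info
            'custom_details': {'alert_count': alert_cluster['alert_count']},
        },
    }

event = build_pagerduty_event('R0UT1NGKEY', {
    'incident_cluster': 42,
    'summary': 'Latency anomaly across payment services',
    'service': 'payments-api',
    'severity': 'critical',
    'alert_count': 37,
})
# POST this JSON to https://events.pagerduty.com/v2/enqueue
```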
Stack: Prometheus + Grafana (metrics) + Loki (logs) + Tempo (traces) + Kafka (stream) + ClickHouse (analytics) + FastAPI (ML inference) + React (custom AIOps UI).
Timeframe: Dynamic thresholds + alert clustering + Slack integration — 4-5 weeks. Causal graph RCA, precursor detection, LLM incident summary, PagerDuty/ServiceNow integration — 3-4 months.