Development of an AI system for log analysis and automatic anomaly detection
Logs are a rich source of information about system state, but their volume (billions of lines per day) makes manual analysis impossible. An ML system can automatically structure unstructured logs, detect anomalous patterns, and prioritize incidents.
The problem of scale
Volumes: a large microservice system generates 1-10 GB of logs per minute. Traditional grep and Kibana dashboards are reactive tools: they help investigate an incident after someone has noticed it. An ML system operates proactively, surfacing anomalies before anyone writes a query.
Log data structure:
# Unstructured log line
raw_log = "2024-01-15 14:23:45 ERROR ServiceA [req-id-123] Failed to connect to DB: Connection timeout after 30000ms, retry 3/5"

# After ML parsing:
parsed_log = {
    'timestamp': '2024-01-15T14:23:45Z',
    'level': 'ERROR',
    'service': 'ServiceA',
    'request_id': 'req-id-123',
    'event_template': 'Failed to connect to <*>: Connection timeout after <*>ms, retry <*>/5',
    'template_id': 'template_db_connection_timeout',
    'parameters': {
        'target': 'DB',
        'timeout_ms': 30000,
        'retry_attempt': 3
    }
}
Parsing and templating
Drain3 — online parsing:
from drain3 import TemplateMiner
from drain3.template_miner_config import TemplateMinerConfig

config = TemplateMinerConfig()
config.drain_sim_th = 0.4  # template similarity threshold
config.drain_depth = 4     # parse-tree depth

miner = TemplateMiner(config=config)

def process_log_stream(log_stream):
    template_stats = {}
    for log_line in log_stream:
        result = miner.add_log_message(log_line.message)
        cluster_id = result['cluster_id']
        if cluster_id not in template_stats:
            template_stats[cluster_id] = {
                'template': result['template_mined'],
                'count': 0,
                'timestamps': [],
                'log_level_dist': {}
            }
        stats = template_stats[cluster_id]
        stats['count'] += 1
        stats['timestamps'].append(log_line.timestamp)
        # Track how often each log level maps to this template
        level = getattr(log_line, 'level', None)
        if level:
            stats['log_level_dist'][level] = stats['log_level_dist'].get(level, 0) + 1
    return template_stats
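A minimal usage sketch: feeding a few raw lines through the miner and printing the discovered templates. The SimpleNamespace wrapper stands in for whatever record type the log stream actually yields.

from types import SimpleNamespace

sample_lines = [
    "Failed to connect to DB: Connection timeout after 30000ms, retry 3/5",
    "Failed to connect to DB: Connection timeout after 45000ms, retry 4/5",
    "User admin logged in from 10.0.0.7",
]
stream = (SimpleNamespace(message=m, timestamp=i, level='ERROR')
          for i, m in enumerate(sample_lines))
stats = process_log_stream(stream)
for cluster_id, s in stats.items():
    print(cluster_id, s['count'], s['template'])
# The two timeout lines collapse into one template with <*> placeholders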
Alternatives to Drain3:
- Spell: streaming parsing of system event logs based on longest common subsequences (LCS)
- IPLoM: Iterative Partitioning Log Mining
- LLM-based: GPT/Claude for semantic parsing of complex, non-standard formats (a sketch follows below)
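A minimal sketch of the LLM-based option, assuming an OpenAI-style client; the prompt, model name, and output schema here are illustrative, not a fixed API.

import json
from openai import OpenAI

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment

PROMPT = (
    "Extract a JSON object with keys timestamp, level, service, "
    "event_template, parameters from this log line. Replace variable "
    "values in event_template with <*>.\n\nLog line: {line}"
)

def llm_parse(line: str) -> dict:
    resp = client.chat.completions.create(
        model="gpt-4o-mini",  # illustrative model choice
        messages=[{"role": "user", "content": PROMPT.format(line=line)}],
    )
    # In production, validate/repair the model output before trusting it
    return json.loads(resp.choices[0].message.content)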
Detecting anomalous patterns
Frequency anomaly (count-based):
import pandas as pd
from collections import deque

class TemplateFrequencyMonitor:
    """
    Monitors how often each log template appears per minute.
    Anomaly: a sharp spike in an ERROR template's rate.
    """
    def __init__(self, window_minutes=10, baseline_minutes=60):
        self.baseline_window = deque(maxlen=baseline_minutes)
        self.current_window = deque(maxlen=window_minutes)

    def update(self, template_counts_per_minute):
        # template_counts_per_minute: dict {template_id: count} for one minute
        self.baseline_window.append(template_counts_per_minute)
        self.current_window.append(template_counts_per_minute)
        if len(self.current_window) < self.current_window.maxlen:
            return {}  # not enough data yet

        anomalies = {}
        # fillna(0): a template absent in a given minute counts as zero
        current = pd.DataFrame(list(self.current_window)).fillna(0).mean()
        baseline = pd.DataFrame(list(self.baseline_window)).fillna(0).mean()
        for template_id in current.index:
            base_rate = baseline.get(template_id, 0)
            curr_rate = current[template_id]
            spike_ratio = curr_rate / (base_rate + 0.1)  # +0.1 avoids division by zero
            if spike_ratio > 5 and curr_rate > 10:  # 5x spike and a minimum volume
                anomalies[template_id] = {
                    'spike_ratio': spike_ratio,
                    'current_rate': curr_rate,
                    'baseline_rate': base_rate
                }
        return anomalies
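A quick sketch of how the monitor is driven, with synthetic per-minute counts (the template ids are made up):

monitor = TemplateFrequencyMonitor(window_minutes=2, baseline_minutes=30)

# 30 quiet minutes to build a baseline, then a burst of the timeout template
for _ in range(30):
    monitor.update({'db_timeout': 2, 'user_login': 50})
print(monitor.update({'db_timeout': 100, 'user_login': 50}))
# Reports db_timeout with spike_ratio well above 5; user_login stays quiet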
Semantic anomaly (embedding-based):
from sentence_transformers import SentenceTransformer
from sklearn.ensemble import IsolationForest

class SemanticLogAnomalyDetector:
    """
    Log embeddings -> Isolation Forest.
    Detects semantically unusual messages
    even when their frequency looks normal.
    """
    def __init__(self):
        self.encoder = SentenceTransformer('all-MiniLM-L6-v2')
        self.detector = IsolationForest(contamination=0.01)

    def fit(self, normal_log_templates):
        embeddings = self.encoder.encode(normal_log_templates)
        self.detector.fit(embeddings)

    def detect(self, new_log_line):
        embedding = self.encoder.encode([new_log_line])
        score = self.detector.score_samples(embedding)[0]
        # predict() applies the contamination-based threshold internally
        is_anomaly = self.detector.predict(embedding)[0] == -1
        return is_anomaly, score
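Usage sketch: fit on the mined templates (the examples below are hypothetical; in practice you would fit on thousands of templates), then score incoming lines:

detector = SemanticLogAnomalyDetector()
detector.fit([
    "Failed to connect to <*>: Connection timeout after <*>ms, retry <*>/5",
    "User <*> logged in from <*>",
    "Request <*> completed in <*>ms",
])
is_anomaly, score = detector.detect("Segmentation fault in payment worker, core dumped")
print(is_anomaly, score)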
Sequence-based anomaly:
from collections import Counter

def detect_sequence_anomalies(log_sequence, expected_sequences, n_gram=3):
    """
    Some orderings of events are normal; an atypical ordering
    signals an anomaly in the system's workflow.
    """
    # N-gram model of normal sequences
    normal_ngrams = Counter()
    for seq in expected_sequences:
        for i in range(len(seq) - n_gram + 1):
            ngram = tuple(seq[i:i+n_gram])
            normal_ngrams[ngram] += 1

    # Flag transitions in the new sequence never seen in training
    anomalous_transitions = []
    for i in range(len(log_sequence) - n_gram + 1):
        ngram = tuple(log_sequence[i:i+n_gram])
        if ngram not in normal_ngrams:
            anomalous_transitions.append({
                'position': i,
                'ngram': ngram,
                'context': log_sequence[max(0, i-2):i+n_gram+2]
            })
    return anomalous_transitions
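For example, with template ids as events (the ids here are illustrative):

expected = [
    ['auth', 'validate', 'query_db', 'render', 'respond'],
    ['auth', 'validate', 'cache_hit', 'respond'],
]
observed = ['auth', 'query_db', 'respond']  # skipped 'validate'
print(detect_sequence_anomalies(observed, expected, n_gram=2))
# Reports ('auth', 'query_db') and ('query_db', 'respond') as unseen transitions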
ML criticality classification
Severity classification:
from transformers import pipeline

class LogSeverityClassifier:
    def __init__(self):
        # BERT fine-tuned on labeled logs
        self.classifier = pipeline(
            'text-classification',
            model='path/to/finetuned-log-severity-bert'
        )

    def classify(self, log_message):
        """
        Classes: informational / warning / error / critical.
        Classifies by message semantics, not just by log level
        (an ERROR line does not always mean critical impact).
        """
        # Truncate to keep within the model's input limit
        result = self.classifier(log_message[:512])[0]
        return {
            'severity': result['label'],
            'confidence': result['score'],
            'requires_immediate_action': result['label'] == 'critical' and result['score'] > 0.85
        }
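Usage, assuming the fine-tuned checkpoint referenced above is available; page_on_call is a hypothetical alerting hook:

clf = LogSeverityClassifier()
verdict = clf.classify("OOM killer terminated process postgres, data integrity at risk")
if verdict['requires_immediate_action']:
    page_on_call(verdict)  # hypothetical alerting hook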
Log aggregation and correlation
Cross-service log correlation:
def correlate_across_services(logs_by_service, request_id_field='request_id'):
    """
    A single request_id can pass through 10+ services.
    Aggregating all logs for one request gives the full picture of a failure.
    """
    request_logs = {}
    for service, logs in logs_by_service.items():
        for log in logs:
            req_id = log.get(request_id_field)
            if req_id:
                request_logs.setdefault(req_id, []).append({
                    'service': service,
                    'timestamp': log['timestamp'],
                    'level': log['level'],
                    'message': log['message']
                })

    # Sort each request's logs by time
    for req_id in request_logs:
        request_logs[req_id].sort(key=lambda x: x['timestamp'])

    # Find requests with an ERROR anywhere in the chain
    failed_requests = {
        req_id: logs for req_id, logs in request_logs.items()
        if any(l['level'] == 'ERROR' for l in logs)
    }
    return failed_requests
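A toy input to illustrate the shape of the data (service names and fields are illustrative):

logs_by_service = {
    'gateway': [
        {'request_id': 'req-id-123', 'timestamp': '2024-01-15T14:23:44Z',
         'level': 'INFO', 'message': 'Forwarding request'},
    ],
    'ServiceA': [
        {'request_id': 'req-id-123', 'timestamp': '2024-01-15T14:23:45Z',
         'level': 'ERROR', 'message': 'Failed to connect to DB'},
    ],
}
failed = correlate_across_services(logs_by_service)
# failed['req-id-123'] holds the full, time-ordered chain ending in the DB error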
Practical implementation
ELK Stack + ML Layer:
- Elasticsearch: log storage and retrieval
- Logstash/Fluent Bit: log collection and formatting
- Kibana: basic visualization
- ML layer (Python FastAPI): Drain3 + anomaly detection + severity classifier (a minimal sketch follows)
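A minimal sketch of the ML layer's API surface, assuming the miner, anomaly detector, and severity classifier defined earlier are instantiated at module scope; the endpoint name and response shape are illustrative:

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class LogLine(BaseModel):
    message: str

@app.post("/analyze")
def analyze(log: LogLine):
    parsed = miner.add_log_message(log.message)           # Drain3 template mining
    is_anomaly, score = anomaly_detector.detect(log.message)
    severity = severity_classifier.classify(log.message)  # LogSeverityClassifier instance
    return {
        'template': parsed['template_mined'],
        'semantic_anomaly': bool(is_anomaly),
        'anomaly_score': float(score),
        'severity': severity,
    }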
Kafka for streaming:
import json
from collections import Counter
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'logs-topic',
    bootstrap_servers=['kafka:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

template_monitor = TemplateFrequencyMonitor()
anomaly_detector = SemanticLogAnomalyDetector()
# anomaly_detector.fit(...) is assumed to have run on historical templates

minute_counts = Counter()  # template counts for the minute in progress
current_minute = None

for message in consumer:
    log_entry = message.value
    # Parsing
    parsed = miner.add_log_message(log_entry['message'])
    # Frequency anomaly: the monitor expects one counts dict per minute,
    # so accumulate counts and flush when the minute rolls over
    minute = log_entry['timestamp'][:16]  # e.g. '2024-01-15T14:23', assumes ISO timestamps
    freq_anomalies = {}
    if current_minute is not None and minute != current_minute:
        freq_anomalies = template_monitor.update(dict(minute_counts))
        minute_counts.clear()
    current_minute = minute
    minute_counts[parsed['cluster_id']] += 1
    # Semantic anomaly
    is_anomaly, score = anomaly_detector.detect(log_entry['message'])
    if freq_anomalies or is_anomaly:
        send_to_alert_manager(log_entry, freq_anomalies, score)  # alerting hook, defined elsewhere
Timeframe: Drain3 parsing, frequency anomaly detection, and Elasticsearch integration take roughly 3-4 weeks. Semantic anomaly detection (BERT), sequence detection, cross-service correlation, and severity classification take another 2-3 months.