AI Log Analysis and Automatic Anomaly Detection System

We design and deploy artificial intelligence systems, from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering, and MLOps to make AI work not just in the lab, but in real business settings.
Development of an AI system for log analysis and automatic anomaly detection

Logs are a rich source of information about system status, but their volume (billions of lines per day) makes manual analysis impossible. The ML system automatically structures unstructured logs, detects anomalous patterns, and prioritizes incidents.

The problem of scale

Volumes: a large microservice system generates 1-10 GB of logs per minute. Traditional tools like grep and Kibana dashboards are reactive: you consult them after something has broken. An ML system operates proactively, surfacing anomalies as they emerge.

Log data structure:

# Unstructured log line
raw_log = "2024-01-15 14:23:45 ERROR ServiceA [req-id-123] Failed to connect to DB: Connection timeout after 30000ms, retry 3/5"

# After ML parsing:
parsed_log = {
    'timestamp': '2024-01-15T14:23:45Z',
    'level': 'ERROR',
    'service': 'ServiceA',
    'request_id': 'req-id-123',
    'event_template': 'Failed to connect to <*>: Connection timeout after <*>ms, retry <*/5>',
    'template_id': 'template_db_connection_timeout',
    'parameters': {
        'target': 'DB',
        'timeout_ms': 30000,
        'retry_attempt': 3
    }
}

Parsing and templating

Drain3 — online parsing:

from drain3 import TemplateMiner
from drain3.template_miner_config import TemplateMinerConfig

config = TemplateMinerConfig()
config.drain_sim_th = 0.4   # template similarity threshold
config.drain_depth = 4       # parse-tree depth

miner = TemplateMiner(config=config)

def process_log_stream(log_stream):
    template_stats = {}

    for log_line in log_stream:
        result = miner.add_log_message(log_line.message)

        cluster_id = result['cluster_id']
        if cluster_id not in template_stats:
            template_stats[cluster_id] = {
                'template': result['template_mined'],
                'count': 0,
                'timestamps': [],
                'log_level_dist': {}
            }

        template_stats[cluster_id]['count'] += 1
        template_stats[cluster_id]['timestamps'].append(log_line.timestamp)

    return template_stats

Alternatives to Drain3:

  • Spell: Streaming Parser for Execution Logs
  • IPLoM: Iterative Partitioning Log Mining
  • LLM-based: GPT/Claude for semantic parsing of complex, non-standard formats
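As a sketch of the LLM-based route: the model call itself is provider-specific (the `call_llm` argument below is a placeholder for any chat-completion client, not a real API), but the prompt structure is the essential part:

```python
import json

def build_parse_prompt(raw_log: str) -> str:
    """Prompt asking an LLM to emit the same structured schema shown earlier."""
    return (
        "Extract fields from this log line and return JSON with keys "
        "timestamp, level, service, event_template, parameters "
        "(replace variable values in event_template with <*>).\n"
        f"Log line: {raw_log}"
    )

def parse_with_llm(raw_log: str, call_llm) -> dict:
    # call_llm is a stand-in for any LLM client callable (prompt -> str)
    return json.loads(call_llm(build_parse_prompt(raw_log)))
```

LLM parsing is far slower and more expensive per line than Drain3, so it is best reserved for the long tail of formats the streaming parser cannot template.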

Detection of abnormal patterns

Frequency anomaly (count-based):

import pandas as pd
from collections import deque
import numpy as np

class TemplateFrequencyMonitor:
    """
    Мониторинг частоты появления каждого template-шаблона
    Аномалия: резкий рост ERROR шаблона
    """
    def __init__(self, window_minutes=10, baseline_minutes=60):
        self.baseline_window = deque(maxlen=baseline_minutes)
        self.current_window = deque(maxlen=window_minutes)

    def update(self, template_counts_per_minute):
        self.baseline_window.append(template_counts_per_minute)
        self.current_window.append(template_counts_per_minute)

        if len(self.current_window) < self.current_window.maxlen:
            return {}  # not enough data yet

        anomalies = {}
        current = pd.DataFrame(list(self.current_window)).mean()
        baseline = pd.DataFrame(list(self.baseline_window)).mean()

        for template_id in current.index:
            base_rate = baseline.get(template_id, 1)
            curr_rate = current[template_id]
            spike_ratio = curr_rate / (base_rate + 0.1)

            if spike_ratio > 5 and curr_rate > 10:  # 5x spike plus a minimum volume
                anomalies[template_id] = {
                    'spike_ratio': spike_ratio,
                    'current_rate': curr_rate,
                    'baseline_rate': base_rate
                }

        return anomalies
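A minimal end-to-end illustration of the spike-ratio check, restated with plain dicts so it runs standalone (the template name and rates are made up):

```python
# Baseline: template seen ~2 times/min on average; current window: a burst.
baseline_rate = {'template_db_timeout': 2.0}
current_rate = {'template_db_timeout': 40.0}

anomalies = {}
for template_id, curr in current_rate.items():
    base = baseline_rate.get(template_id, 1)
    spike_ratio = curr / (base + 0.1)          # +0.1 guards against division by zero
    if spike_ratio > 5 and curr > 10:          # same thresholds as the monitor above
        anomalies[template_id] = round(spike_ratio, 1)

print(anomalies)  # {'template_db_timeout': 19.0}
```

The minimum-volume condition (`curr > 10`) matters: a template going from 0.1 to 1 occurrence per minute is a 10x spike but rarely worth an alert.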

Semantic anomaly (embedding-based):

from sentence_transformers import SentenceTransformer
from sklearn.ensemble import IsolationForest

class SemanticLogAnomalyDetector:
    """
    Embedding логов → Isolation Forest
    Обнаруживает семантически необычные сообщения
    даже если их частота нормальная
    """
    def __init__(self):
        self.encoder = SentenceTransformer('all-MiniLM-L6-v2')
        self.detector = IsolationForest(contamination=0.01)

    def fit(self, normal_log_templates):
        embeddings = self.encoder.encode(normal_log_templates)
        self.detector.fit(embeddings)

    def detect(self, new_log_line):
        embedding = self.encoder.encode([new_log_line])
        score = self.detector.score_samples(embedding)[0]
        # predict() returns -1 for anomalies (IsolationForest has no threshold_ attribute)
        is_anomaly = self.detector.predict(embedding)[0] == -1
        return is_anomaly, score
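If transformer embeddings are too heavy for the ingest path, a character-n-gram TF-IDF representation fed into the same Isolation Forest follows an identical fit/detect pattern. This is a cheaper baseline sketch, not a drop-in replacement for the class above:

```python
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import IsolationForest

class TfidfLogAnomalyDetector:
    """Lighter alternative: char-n-gram TF-IDF instead of sentence embeddings."""
    def __init__(self):
        self.vectorizer = TfidfVectorizer(analyzer='char_wb', ngram_range=(3, 5))
        self.detector = IsolationForest(contamination=0.01, random_state=42)

    def fit(self, normal_log_templates):
        # Learn vocabulary and the "normal" region in one pass
        X = self.vectorizer.fit_transform(normal_log_templates)
        self.detector.fit(X.toarray())

    def detect(self, new_log_line):
        x = self.vectorizer.transform([new_log_line]).toarray()
        # predict() returns -1 for anomalies, 1 for inliers
        return self.detector.predict(x)[0] == -1
```

The trade-off: TF-IDF catches lexically novel messages but, unlike sentence embeddings, misses anomalies phrased with familiar words.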

Sequence-based anomaly:

def detect_sequence_anomalies(log_sequence, expected_sequences, n_gram=3):
    """
    Некоторые последовательности событий нормальны
    Нетипичная последовательность = аномалия в workflow системы
    """
    # N-gram модель нормальных последовательностей
    from collections import Counter
    normal_ngrams = Counter()

    for seq in expected_sequences:
        for i in range(len(seq) - n_gram + 1):
            ngram = tuple(seq[i:i+n_gram])
            normal_ngrams[ngram] += 1

    # Flag unusual transitions in the new sequence
    anomalous_transitions = []
    for i in range(len(log_sequence) - n_gram + 1):
        ngram = tuple(log_sequence[i:i+n_gram])
        if ngram not in normal_ngrams:
            anomalous_transitions.append({
                'position': i,
                'ngram': ngram,
                'context': log_sequence[max(0, i-2):i+n_gram+2]
            })

    return anomalous_transitions
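A worked example on template-ID sequences, with the n-gram logic restated inline so it runs standalone (the workflow names are illustrative): the normal flow is auth -> query -> respond -> close, and a crash after query produces trigrams never seen in training:

```python
from collections import Counter

expected = [['auth', 'query', 'respond', 'close']] * 5   # normal workflows
observed = ['auth', 'query', 'crash', 'close']           # sequence to check
n = 3

# N-gram model of the normal sequences
normal_ngrams = Counter(
    tuple(seq[i:i+n]) for seq in expected for i in range(len(seq) - n + 1)
)

# Any trigram absent from the model is an anomalous transition
unseen = [
    tuple(observed[i:i+n])
    for i in range(len(observed) - n + 1)
    if tuple(observed[i:i+n]) not in normal_ngrams
]
print(unseen)  # [('auth', 'query', 'crash'), ('query', 'crash', 'close')]
```

Note that a single unexpected event taints every n-gram that overlaps it, which is why the function above also returns surrounding context for each hit.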

ML criticality classification

Severity Classification:

from transformers import pipeline

class LogSeverityClassifier:
    def __init__(self):
        # BERT fine-tuned on labeled logs
        self.classifier = pipeline(
            'text-classification',
            model='path/to/finetuned-log-severity-bert'
        )

    def classify(self, log_message):
        """
        Классы: informational / warning / error / critical
        Не просто по уровню логирования (ERROR != critical impact),
        а по семантике сообщения
        """
        result = self.classifier(log_message[:512])[0]
        return {
            'severity': result['label'],
            'confidence': result['score'],
            'requires_immediate_action': result['label'] == 'critical' and result['score'] > 0.85
        }
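Before a fine-tuned model exists, a keyword heuristic gives a rough baseline and a source of cold-start labels. The keyword lists below are purely illustrative, not a vetted taxonomy:

```python
def heuristic_severity(log_message: str) -> str:
    """Crude keyword-based fallback; the trained classifier replaces this."""
    msg = log_message.lower()
    # Checked in descending severity order: first match wins
    if any(k in msg for k in ('data loss', 'outage', 'panic', 'oom')):
        return 'critical'
    if any(k in msg for k in ('exception', 'failed', 'timeout')):
        return 'error'
    if any(k in msg for k in ('deprecated', 'retry', 'slow')):
        return 'warning'
    return 'informational'
```

Disagreements between this heuristic and the BERT classifier are also a useful stream of candidates for relabeling.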

Log aggregation and correlation

Cross-service Log Correlation:

def correlate_across_services(logs_by_service, request_id_field='request_id'):
    """
    Один request_id может проходить через 10+ сервисов
    Агрегация всех логов одного запроса → полная картина ошибки
    """
    request_logs = {}

    for service, logs in logs_by_service.items():
        for log in logs:
            req_id = log.get(request_id_field)
            if req_id:
                if req_id not in request_logs:
                    request_logs[req_id] = []
                request_logs[req_id].append({
                    'service': service,
                    'timestamp': log['timestamp'],
                    'level': log['level'],
                    'message': log['message']
                })

    # Sort each request's logs by time
    for req_id in request_logs:
        request_logs[req_id].sort(key=lambda x: x['timestamp'])

    # Find requests with an ERROR anywhere in the chain
    failed_requests = {
        req_id: logs for req_id, logs in request_logs.items()
        if any(l['level'] == 'ERROR' for l in logs)
    }

    return failed_requests
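A minimal run over two services, with the grouping logic restated inline so the snippet is standalone (service names and log entries are invented):

```python
logs_by_service = {
    'api-gateway': [{'request_id': 'req-1', 'timestamp': '2024-01-15T14:23:44Z',
                     'level': 'INFO', 'message': 'Forwarding request'}],
    'billing':     [{'request_id': 'req-1', 'timestamp': '2024-01-15T14:23:45Z',
                     'level': 'ERROR', 'message': 'Charge failed'}],
}

# Group by request_id, tagging each entry with its service
request_logs = {}
for service, logs in logs_by_service.items():
    for log in logs:
        entry = {'service': service, **log}
        request_logs.setdefault(log['request_id'], []).append(entry)

# Chronological order within each request
for logs in request_logs.values():
    logs.sort(key=lambda x: x['timestamp'])

# Requests that failed somewhere along the chain
failed = {rid: logs for rid, logs in request_logs.items()
          if any(l['level'] == 'ERROR' for l in logs)}
print(list(failed))  # ['req-1']
```

The ISO-8601 timestamps sort correctly as strings; with mixed formats you would parse them first. This assumes every service propagates the request ID, which is exactly what tracing middleware (e.g. OpenTelemetry context propagation) is for.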

Practical implementation

ELK Stack + ML Layer:

  • Elasticsearch: log storage and retrieval
  • Logstash/Fluent Bit: collection and formatting
  • Kibana: basic visualization
  • ML Layer (Python FastAPI): Drain3 + anomaly detection + severity classifier

Kafka for streaming:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'logs-topic',
    bootstrap_servers=['kafka:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

template_monitor = TemplateFrequencyMonitor()
anomaly_detector = SemanticLogAnomalyDetector()

for message in consumer:
    log_entry = message.value
    # Parse into a template
    parsed = miner.add_log_message(log_entry['message'])
    # Frequency anomaly; in production, batch template counts per minute
    # instead of passing a single-message count like this
    freq_anomalies = template_monitor.update({parsed['template_mined']: 1})
    # Semantic anomaly
    is_anomaly, score = anomaly_detector.detect(log_entry['message'])

    if freq_anomalies or is_anomaly:
        send_to_alert_manager(log_entry, freq_anomalies, score)

Timeframe: Drain3 parsing, frequency anomaly detection, and Elasticsearch integration take 3-4 weeks. Semantic anomaly detection (BERT), sequence detection, cross-service correlation, and severity classification take a further 2-3 months.