People Movement Heatmap System

We design and deploy artificial intelligence systems: from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering and MLOps to make AI work not in the lab, but in real business.
Showing 1 of 1 servicesAll 1566 services
People Movement Heatmap System
Medium
~3-5 business days
FAQ
AI Development Areas
AI Solution Development Stages
Latest works
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1041
  • image_logo-advance_0.png
    B2B Advance company logo design
    561
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    823
  • image_logo-aider_0.jpg
    AIDER company logo development
    762
  • image_crm_chasseurs_493_0.webp
    CRM development for Chasseurs
    848

System Development построения Heatmap перемещений людей

Heatmap перемещений — визуализация плотности присутствия и путей движения людей в пространстве. Применяется в ритейле для анализа «горячих» и «холодных» зон, в музеях для понимания маршрутов посетителей, в аэропортах для оптимизации навигации и размещения сервисов.

Накопительный heatmap в реальном времени

import cv2
import numpy as np
from collections import defaultdict

class MovementHeatmapSystem:
    def __init__(self, frame_shape: tuple,
                 decay_factor: float = 0.9999,
                 gaussian_kernel: int = 31):
        h, w = frame_shape[:2]
        self.accumulator = np.zeros((h, w), dtype=np.float64)
        self.decay = decay_factor
        self.kernel = gaussian_kernel
        self.frame_count = 0

        # Отдельные аккумуляторы по часам для time-based анализа
        self.hourly_accumulators = defaultdict(lambda: np.zeros((h, w)))

    def update(self, tracked_persons: list[dict], hour: int = 0):
        """Обновление heatmap позициями всех трекнутых людей"""
        self.frame_count += 1

        # Temporal decay — старые данные постепенно теряют вес
        self.accumulator *= self.decay

        for person in tracked_persons:
            cx, cy = person['center']
            if 0 <= cx < self.accumulator.shape[1] and \
               0 <= cy < self.accumulator.shape[0]:
                self.accumulator[cy, cx] += 1.0
                self.hourly_accumulators[hour][cy, cx] += 1.0

    def get_visualization(self, background: np.ndarray = None,
                           alpha: float = 0.6) -> np.ndarray:
        """Визуализация heatmap поверх фонового изображения"""
        # Нормализация
        acc_smooth = cv2.GaussianBlur(
            self.accumulator.astype(np.float32),
            (self.kernel, self.kernel), 0
        )
        normalized = cv2.normalize(acc_smooth, None, 0, 255,
                                    cv2.NORM_MINMAX).astype(np.uint8)
        heatmap_colored = cv2.applyColorMap(normalized, cv2.COLORMAP_JET)

        if background is not None:
            # Маска: показываем только там где было движение
            threshold = 10
            mask = (normalized > threshold).astype(np.uint8) * 255
            mask_3ch = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)

            overlay = np.where(mask_3ch > 0, heatmap_colored, background)
            return cv2.addWeighted(background, 1 - alpha, overlay, alpha, 0)

        return heatmap_colored

    def get_top_zones(self, n: int = 5, zone_size: int = 50) -> list[dict]:
        """Нахождение самых посещаемых зон"""
        acc_smooth = cv2.GaussianBlur(
            self.accumulator.astype(np.float32),
            (self.kernel, self.kernel), 0
        )

        zones = []
        acc_copy = acc_smooth.copy()

        for _ in range(n):
            # Нахождение максимума
            _, max_val, _, max_loc = cv2.minMaxLoc(acc_copy)
            mx, my = max_loc

            zones.append({
                'center': (mx, my),
                'heat_value': float(max_val),
                'rank': len(zones) + 1
            })

            # Подавляем регион вокруг найденного максимума
            cv2.circle(acc_copy, (mx, my), zone_size, 0, -1)

        return zones

Анализ маршрутов (Path Analysis)

class PathAnalyzer:
    def __init__(self):
        self.complete_tracks = {}  # track_id -> list of positions

    def add_track(self, track_id: int, positions: list[tuple]):
        """Добавление завершённого трека"""
        if len(positions) > 10:  # минимальная длина трека
            self.complete_tracks[track_id] = positions

    def find_common_paths(self, n_clusters: int = 5) -> list:
        """Кластеризация треков для нахождения типичных маршрутов"""
        from sklearn.cluster import KMeans
        from scipy.interpolate import interp1d

        if len(self.complete_tracks) < n_clusters:
            return []

        # Ресемплируем треки до одинаковой длины
        resampled = []
        for positions in self.complete_tracks.values():
            t = np.linspace(0, 1, len(positions))
            t_new = np.linspace(0, 1, 50)  # 50 точек
            xs = interp1d(t, [p[0] for p in positions])(t_new)
            ys = interp1d(t, [p[1] for p in positions])(t_new)
            resampled.append(np.concatenate([xs, ys]))

        # Кластеризация треков
        X = np.array(resampled)
        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        labels = kmeans.fit_predict(X)

        # Центроидный маршрут для каждого кластера
        cluster_paths = []
        for cluster_id in range(n_clusters):
            cluster_tracks = X[labels == cluster_id]
            centroid = cluster_tracks.mean(axis=0)
            n = len(centroid) // 2
            path = list(zip(centroid[:n].tolist(), centroid[n:].tolist()))
            cluster_paths.append({
                'path': path,
                'count': int((labels == cluster_id).sum()),
                'percentage': float((labels == cluster_id).mean() * 100)
            })

        return sorted(cluster_paths, key=lambda x: x['count'], reverse=True)

Аналитика по часам и дням

def generate_analytics_report(heatmap_system: MovementHeatmapSystem) -> dict:
    peak_hours = {}
    for hour, acc in heatmap_system.hourly_accumulators.items():
        peak_hours[hour] = float(acc.sum())

    return {
        'peak_hour': max(peak_hours, key=peak_hours.get),
        'quiet_hour': min(peak_hours, key=peak_hours.get),
        'hourly_traffic': peak_hours,
        'top_zones': heatmap_system.get_top_zones(n=10),
        'total_dwell_events': int(heatmap_system.accumulator.sum())
    }

Integration с retail аналитикой

Heatmap интегрируется с данными продаж: зоны с высоким footfall, но низкими продажами — проблема с товаром или выкладкой. Зоны с высоким dwell time — места для размещения промо-материалов.

Применение Точность Срок
Retail footfall + heatmap 94–97% 4–6 недель
Музей + маршруты посетителей 92–96% 5–7 недель
Аэропорт + операционная аналитика 90–95% 8–14 недель