System Development построения Heatmap перемещений людей
Heatmap перемещений — визуализация плотности присутствия и путей движения людей в пространстве. Применяется в ритейле для анализа «горячих» и «холодных» зон, в музеях для понимания маршрутов посетителей, в аэропортах для оптимизации навигации и размещения сервисов.
Накопительный heatmap в реальном времени
import cv2
import numpy as np
from collections import defaultdict
class MovementHeatmapSystem:
def __init__(self, frame_shape: tuple,
decay_factor: float = 0.9999,
gaussian_kernel: int = 31):
h, w = frame_shape[:2]
self.accumulator = np.zeros((h, w), dtype=np.float64)
self.decay = decay_factor
self.kernel = gaussian_kernel
self.frame_count = 0
# Отдельные аккумуляторы по часам для time-based анализа
self.hourly_accumulators = defaultdict(lambda: np.zeros((h, w)))
def update(self, tracked_persons: list[dict], hour: int = 0):
"""Обновление heatmap позициями всех трекнутых людей"""
self.frame_count += 1
# Temporal decay — старые данные постепенно теряют вес
self.accumulator *= self.decay
for person in tracked_persons:
cx, cy = person['center']
if 0 <= cx < self.accumulator.shape[1] and \
0 <= cy < self.accumulator.shape[0]:
self.accumulator[cy, cx] += 1.0
self.hourly_accumulators[hour][cy, cx] += 1.0
def get_visualization(self, background: np.ndarray = None,
alpha: float = 0.6) -> np.ndarray:
"""Визуализация heatmap поверх фонового изображения"""
# Нормализация
acc_smooth = cv2.GaussianBlur(
self.accumulator.astype(np.float32),
(self.kernel, self.kernel), 0
)
normalized = cv2.normalize(acc_smooth, None, 0, 255,
cv2.NORM_MINMAX).astype(np.uint8)
heatmap_colored = cv2.applyColorMap(normalized, cv2.COLORMAP_JET)
if background is not None:
# Маска: показываем только там где было движение
threshold = 10
mask = (normalized > threshold).astype(np.uint8) * 255
mask_3ch = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)
overlay = np.where(mask_3ch > 0, heatmap_colored, background)
return cv2.addWeighted(background, 1 - alpha, overlay, alpha, 0)
return heatmap_colored
def get_top_zones(self, n: int = 5, zone_size: int = 50) -> list[dict]:
"""Нахождение самых посещаемых зон"""
acc_smooth = cv2.GaussianBlur(
self.accumulator.astype(np.float32),
(self.kernel, self.kernel), 0
)
zones = []
acc_copy = acc_smooth.copy()
for _ in range(n):
# Нахождение максимума
_, max_val, _, max_loc = cv2.minMaxLoc(acc_copy)
mx, my = max_loc
zones.append({
'center': (mx, my),
'heat_value': float(max_val),
'rank': len(zones) + 1
})
# Подавляем регион вокруг найденного максимума
cv2.circle(acc_copy, (mx, my), zone_size, 0, -1)
return zones
Анализ маршрутов (Path Analysis)
class PathAnalyzer:
def __init__(self):
self.complete_tracks = {} # track_id -> list of positions
def add_track(self, track_id: int, positions: list[tuple]):
"""Добавление завершённого трека"""
if len(positions) > 10: # минимальная длина трека
self.complete_tracks[track_id] = positions
def find_common_paths(self, n_clusters: int = 5) -> list:
"""Кластеризация треков для нахождения типичных маршрутов"""
from sklearn.cluster import KMeans
from scipy.interpolate import interp1d
if len(self.complete_tracks) < n_clusters:
return []
# Ресемплируем треки до одинаковой длины
resampled = []
for positions in self.complete_tracks.values():
t = np.linspace(0, 1, len(positions))
t_new = np.linspace(0, 1, 50) # 50 точек
xs = interp1d(t, [p[0] for p in positions])(t_new)
ys = interp1d(t, [p[1] for p in positions])(t_new)
resampled.append(np.concatenate([xs, ys]))
# Кластеризация треков
X = np.array(resampled)
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
labels = kmeans.fit_predict(X)
# Центроидный маршрут для каждого кластера
cluster_paths = []
for cluster_id in range(n_clusters):
cluster_tracks = X[labels == cluster_id]
centroid = cluster_tracks.mean(axis=0)
n = len(centroid) // 2
path = list(zip(centroid[:n].tolist(), centroid[n:].tolist()))
cluster_paths.append({
'path': path,
'count': int((labels == cluster_id).sum()),
'percentage': float((labels == cluster_id).mean() * 100)
})
return sorted(cluster_paths, key=lambda x: x['count'], reverse=True)
Аналитика по часам и дням
def generate_analytics_report(heatmap_system: MovementHeatmapSystem) -> dict:
peak_hours = {}
for hour, acc in heatmap_system.hourly_accumulators.items():
peak_hours[hour] = float(acc.sum())
return {
'peak_hour': max(peak_hours, key=peak_hours.get),
'quiet_hour': min(peak_hours, key=peak_hours.get),
'hourly_traffic': peak_hours,
'top_zones': heatmap_system.get_top_zones(n=10),
'total_dwell_events': int(heatmap_system.accumulator.sum())
}
Integration с retail аналитикой
Heatmap интегрируется с данными продаж: зоны с высоким footfall, но низкими продажами — проблема с товаром или выкладкой. Зоны с высоким dwell time — места для размещения промо-материалов.
| Применение | Точность | Срок |
|---|---|---|
| Retail footfall + heatmap | 94–97% | 4–6 недель |
| Музей + маршруты посетителей | 92–96% | 5–7 недель |
| Аэропорт + операционная аналитика | 90–95% | 8–14 недель |







