AI-система контроля качества на производстве
Визуальный контроль качества — одна из наиболее зрелых промышленных CV-задач. Но внедрение в production регулярно спотыкается на трёх проблемах: нехватке примеров дефектов (дефекты редки по определению), высоких требованиях к чувствительности (пропустить дефект = рекламация), и вариативности условий освещения и позиционирования.
Anomaly detection при дефиците дефектных примеров
Классический supervised подход требует 500+ примеров каждого типа дефекта. На реальном производстве их бывает 20–50. Решение: обучаемся только на нормальных изображениях, дефект = аномалия.
PatchCore — state-of-the-art unsupervised anomaly detection (MVTec benchmark AUROC 99.1%):
import torch
import torch.nn.functional as F
import torchvision.transforms as T
import timm
import numpy as np
from sklearn.neighbors import NearestNeighbors
class PatchCoreDetector:
"""
PatchCore: строим банк нормальных патчей,
дефект = патч далеко от всех нормальных.
Training только на хороших изображениях — дефекты не нужны.
"""
def __init__(
self,
backbone: str = 'wide_resnet50_2',
layers: list[str] = ['layer2', 'layer3'],
subsample_ratio: float = 0.1 # случайная выборка из банка патчей
):
self.model = timm.create_model(
backbone, pretrained=True, features_only=True
).eval().cuda()
self.layers = layers
self.subsample_ratio = subsample_ratio
self.memory_bank = None
self.nn_index = None
self.transform = T.Compose([
T.Resize(256),
T.CenterCrop(224),
T.ToTensor(),
T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
def _extract_patches(
self, image_tensor: torch.Tensor
) -> torch.Tensor:
"""Извлечение multi-scale признаков патчей"""
with torch.no_grad():
features_list = self.model(image_tensor.unsqueeze(0).cuda())
patches = []
for i, (feat, layer_name) in enumerate(
zip(features_list, self.model.feature_info.module_name())
):
if layer_name in self.layers:
# Усредняем соседние патчи (locally aware patch features)
feat = F.avg_pool2d(feat, kernel_size=3, stride=1, padding=1)
patches.append(feat.squeeze(0))
# Ресайзим к одному размеру и конкатенируем
target_h, target_w = patches[0].shape[1], patches[0].shape[2]
aligned = []
for p in patches:
if p.shape[1] != target_h:
p = F.interpolate(
p.unsqueeze(0), size=(target_h, target_w),
mode='bilinear', align_corners=False
).squeeze(0)
aligned.append(p)
combined = torch.cat(aligned, dim=0) # (C, H, W)
# Reshape в список патчей: (H*W, C)
C, H, W = combined.shape
return combined.reshape(C, -1).T.cpu() # (H*W, C)
def fit(self, normal_images: list) -> None:
"""Training: строим банк патчей из нормальных изображений"""
all_patches = []
for img in normal_images:
img_t = self.transform(img)
patches = self._extract_patches(img_t)
all_patches.append(patches)
bank = torch.cat(all_patches, dim=0) # (N_patches, C)
# Subsampling для управления размером памяти
n_keep = int(len(bank) * self.subsample_ratio)
indices = torch.randperm(len(bank))[:n_keep]
self.memory_bank = bank[indices].numpy()
# kNN-индекс
self.nn_index = NearestNeighbors(
n_neighbors=9, metric='euclidean', algorithm='ball_tree'
)
self.nn_index.fit(self.memory_bank)
print(f'Memory bank: {len(self.memory_bank)} patches '
f'from {len(normal_images)} normal images')
def predict(
self, image, return_heatmap: bool = True
) -> dict:
"""Детекция аномалии: высокий score → дефект"""
img_t = self.transform(image)
patches = self._extract_patches(img_t)
# Расстояние каждого патча до ближайших нормальных
distances, _ = self.nn_index.kneighbors(patches.numpy())
patch_scores = distances[:, 0] # расстояние до ближайшего нормального
anomaly_score = float(patch_scores.max())
result = {'anomaly_score': anomaly_score,
'is_defect': anomaly_score > self.threshold_}
if return_heatmap:
# Reshape обратно в spatial карту
side = int(np.sqrt(len(patch_scores)))
heatmap = patch_scores.reshape(side, side)
import cv2
heatmap_resized = cv2.resize(
heatmap, (224, 224), interpolation=cv2.INTER_LINEAR
)
result['heatmap'] = heatmap_resized
return result
def set_threshold(
self, normal_images: list, fpr_target: float = 0.01
) -> float:
"""Устанавливаем порог по заданному FAR на нормальных"""
scores = [
self.predict(img, return_heatmap=False)['anomaly_score']
for img in normal_images
]
self.threshold_ = float(np.percentile(scores, (1 - fpr_target) * 100))
return self.threshold_
Integration с конвейером через PLC/OPC-UA
import asyncio
from asyncua import Client as OPCClient
import cv2
import threading
class ConveyorQCSystem:
"""
Real-time контроль качества: сигнал от ПЛК → захват кадра → анализ → решение.
"""
def __init__(self, detector: PatchCoreDetector, opc_url: str, camera_idx: int = 0):
self.detector = detector
self.opc_url = opc_url
self.camera = cv2.VideoCapture(camera_idx)
self.camera.set(cv2.CAP_PROP_BUFFERSIZE, 1)
async def run(self):
async with OPCClient(url=self.opc_url) as opc:
# Узлы ПЛК
trigger_node = opc.get_node('ns=2;i=1001') # сигнал детали
reject_node = opc.get_node('ns=2;i=1002') # команда отбраковки
while True:
trigger = await trigger_node.read_value()
if trigger:
# Захват кадра с минимальной задержкой
self.camera.grab()
_, frame = self.camera.retrieve()
from PIL import Image
pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
result = self.detector.predict(pil_img, return_heatmap=False)
# Решение об отбраковке
await reject_node.write_value(result['is_defect'])
if result['is_defect']:
print(f'DEFECT detected: score={result["anomaly_score"]:.3f}')
await asyncio.sleep(0.01)
Результаты на реальных кейсах
Производство пластиковых компонентов: 8 типов дефектов, 120 нормальных изображений для обучения PatchCore, 0 дефектных при обучении.
- AUROC: 0.974
- Sensitivity при FAR=1%: 91.3%
- Latency на RTX 3060: 38ms (на конвейере скорость 1 деталь/2 секунды)
Сроки
| Задача | Срок |
|---|---|
| Anomaly detection (PatchCore) для одного типа продукта | 3–5 недель |
| Supervised дефект-классификатор (500+ примеров) | 4–6 недель |
| Полная система с конвейерной интеграцией | 8–14 недель |







