Development of AI for Warehouse Inventory (Cameras and Drones)
Manual warehouse inventory requires 2–5 days of downtime and still produces 1–3% counting errors. AI systems based on fixed cameras or drones cut this to a few hours while reaching 98–99% counting accuracy. Drones are especially effective for tall racks (10+ meters) that are inaccessible to manual scanning. Typical tasks: pallet counting, slot occupancy monitoring, and SKU identification via barcodes and visual features.
Pallet Counting System via Cameras
import json
import sqlite3
from datetime import datetime, timedelta

import cv2
import numpy as np
from ultralytics import YOLO
class WarehouseInventorySystem:
    """Count pallets in camera frames and persist per-zone snapshots in SQLite.

    Every call to ``count_pallets`` runs the detector on one frame, summarises
    the detections and appends a row to the ``inventory_snapshots`` table.
    ``get_zone_history`` reads those rows back for a single zone.
    """

    # Detector class names that are counted as a pallet of any kind.
    PALLET_CLASSES = ('pallet', 'loaded_pallet', 'empty_pallet')

    def __init__(self, detector_path: str,
                 db_path: str = 'warehouse.db',
                 device: str = 'cuda'):
        """Load the detector and open (creating if needed) the snapshot database.

        Args:
            detector_path: Path to the YOLO weights passed to ``YOLO(...)``.
            db_path: SQLite database file.
            device: Stored on the instance as ``self.device``.
        """
        self.detector = YOLO(detector_path)
        # NOTE(review): `device` is only stored; it is not forwarded to the
        # detector calls made by this class. Confirm whether that is intended.
        self.device = device
        # check_same_thread=False disables sqlite3's same-thread check, so the
        # connection may be shared between threads; this class adds no locking.
        self.db = sqlite3.connect(db_path, check_same_thread=False)
        self._init_db()

    def _init_db(self):
        """Create the ``inventory_snapshots`` table if it does not exist yet."""
        # The connection context manager commits on success and rolls back
        # if the statement raises.
        with self.db:
            self.db.execute('''
                CREATE TABLE IF NOT EXISTS inventory_snapshots (
                    id INTEGER PRIMARY KEY,
                    camera_id TEXT,
                    zone_id TEXT,
                    timestamp TIMESTAMP,
                    pallet_count INTEGER,
                    occupancy_pct REAL,
                    detections TEXT
                )
            ''')

    def close(self):
        """Close the underlying SQLite connection."""
        self.db.close()

    def count_pallets(self, image: np.ndarray,
                      camera_id: str,
                      zone_id: str,
                      total_slots: int = None) -> dict:
        """Count pallets in one frame of a storage zone and store a snapshot.

        Args:
            image: Frame handed to the detector.
            camera_id: Identifier of the camera that produced the frame.
            zone_id: Identifier of the storage zone shown in the frame.
            total_slots: Known number of slots in the zone, used for the
                occupancy percentage. When it is ``None`` or 0 the occupancy
                is reported as ``None``.

        Returns:
            A dict with ``camera_id``, ``zone_id``, ``pallet_count``,
            ``loaded_pallets``, ``empty_pallets``, ``occupancy_pct`` and
            ``timestamp`` (naive local time, ISO-8601 text).
        """
        results = self.detector(image, conf=0.45)
        class_names = self.detector.model.names
        detections = [
            {
                'type': class_names[int(box.cls)],
                'bbox': box.xyxy[0].tolist(),
                'conf': float(box.conf),
            }
            for box in results[0].boxes
        ]

        pallet_count = sum(
            1 for d in detections if d['type'] in self.PALLET_CLASSES
        )
        loaded = sum(1 for d in detections if d['type'] == 'loaded_pallet')
        empty = sum(1 for d in detections if d['type'] == 'empty_pallet')
        occupancy = (pallet_count / total_slots * 100) if total_slots else None

        result = {
            'camera_id': camera_id,
            'zone_id': zone_id,
            'pallet_count': pallet_count,
            'loaded_pallets': loaded,
            'empty_pallets': empty,
            'occupancy_pct': occupancy,
            'timestamp': datetime.now().isoformat()
        }

        # Detections are stored as JSON (not a Python repr) so any consumer of
        # the database can parse them.
        with self.db:
            self.db.execute(
                'INSERT INTO inventory_snapshots '
                '(camera_id, zone_id, timestamp, pallet_count, '
                'occupancy_pct, detections) VALUES (?, ?, ?, ?, ?, ?)',
                (camera_id, zone_id, result['timestamp'],
                 pallet_count, occupancy, json.dumps(detections))
            )
        return result

    def get_zone_history(self, zone_id: str,
                         hours: int = 24) -> list[dict]:
        """Return the snapshots of a zone from the last ``hours`` hours.

        Args:
            zone_id: Zone whose snapshots are requested.
            hours: Size of the look-back window in hours.

        Returns:
            A list of dicts with ``timestamp``, ``pallet_count`` and
            ``occupancy``, newest first.
        """
        # Timestamps are stored as naive local ISO-8601 text
        # ("YYYY-MM-DDTHH:MM:SS.ffffff"). SQLite's datetime('now', ...) yields
        # UTC text with a space separator, which does not compare correctly
        # against that format, so the cutoff is built in Python in the same
        # format the rows were written with.
        cutoff = (datetime.now() - timedelta(hours=hours)).isoformat()
        rows = self.db.execute('''
            SELECT timestamp, pallet_count, occupancy_pct
            FROM inventory_snapshots
            WHERE zone_id = ?
              AND timestamp > ?
            ORDER BY timestamp DESC
        ''', (zone_id, cutoff)).fetchall()
        return [
            {'timestamp': r[0], 'pallet_count': r[1], 'occupancy': r[2]}
            for r in rows
        ]
Drone System for Tall Racks
class DroneInventorySystem:
    """
    Rack inventory from drone photographs.

    Intended workflow:
    1. Route planning based on a known warehouse map
    2. Image capture of each slot
    3. Decoding of barcodes and QR codes
    4. Verification against the WMS (Warehouse Management System)

    This class covers the analysis of already captured images (steps 3-4);
    it contains no route-planning or capture code.
    """

    def __init__(self, barcode_reader, wms_client,
                 rack_detector: "YOLO"):
        """Store the collaborators used during rack analysis.

        Args:
            barcode_reader: Stored as ``self.barcode_reader``; the methods of
                this class do not call it (decoding uses pyzbar and OpenCV).
            wms_client: Object providing ``get_expected_sku(location)`` and
                ``lookup_barcode(barcode)``.
            rack_detector: Detector called on each slot image to decide
                whether the slot is occupied.
        """
        self.barcode_reader = barcode_reader
        self.wms = wms_client
        self.rack_detector = rack_detector

    def inventory_rack(self, rack_images: list[dict]) -> dict:
        """Analyze all photos of a specific rack and compare them with the WMS.

        Args:
            rack_images: ``[{'image': np.ndarray, 'rack_id': str,
                'shelf': int, 'slot': int}]``, one entry per photographed slot.

        Returns:
            A dict with slot totals, the number of barcodes read, the list of
            discrepancies, ``discrepancy_rate`` (discrepancies per checked
            slot, 0 when nothing was checked) and the per-slot ``inventory``.
        """
        rack_inventory = []
        discrepancies = []
        for item in rack_images:
            image = item['image']
            location = f"{item['rack_id']}-{item['shelf']}-{item['slot']}"

            # A slot counts as occupied when the detector returns any box.
            results = self.rack_detector(image, conf=0.5)
            is_occupied = len(results[0].boxes) > 0

            barcodes = self._read_barcodes(image)
            sku_ids = [self._barcode_to_sku(b) for b in barcodes]
            rack_inventory.append({
                'location': location,
                'occupied': is_occupied,
                'barcodes': barcodes,
                'sku_ids': sku_ids
            })

            # NOTE(review): slots without a decoded barcode are never compared
            # with the WMS, so an expected-but-missing item is not reported.
            if not sku_ids:
                continue

            # The expected SKU depends only on the location, so it is fetched
            # once per slot instead of once per decoded barcode.
            wms_expected = self.wms.get_expected_sku(location)
            for sku_id in sku_ids:
                if sku_id != wms_expected:
                    discrepancies.append({
                        'location': location,
                        'found_sku': sku_id,
                        'expected_sku': wms_expected,
                        'discrepancy_type': 'wrong_sku' if wms_expected else 'unexpected_item'
                    })

        return {
            'total_slots_checked': len(rack_inventory),
            'occupied_slots': sum(1 for s in rack_inventory if s['occupied']),
            'empty_slots': sum(1 for s in rack_inventory if not s['occupied']),
            'barcodes_read': sum(len(s['barcodes']) for s in rack_inventory),
            'discrepancies': discrepancies,
            'discrepancy_rate': len(discrepancies) / len(rack_inventory) if rack_inventory else 0,
            'inventory': rack_inventory
        }

    def _read_barcodes(self, image: np.ndarray) -> list[str]:
        """Read barcodes via pyzbar and a QR code via OpenCV.

        Returns the decoded payloads as text; a QR payload found by both
        decoders is reported once.
        """
        from pyzbar import pyzbar

        decoded = pyzbar.decode(image)
        # errors='replace' keeps one non-UTF-8 label from aborting the whole
        # rack; the mangled text then surfaces as an ordinary mismatch.
        barcodes = [d.data.decode('utf-8', errors='replace') for d in decoded]

        # Second attempt for QR codes with OpenCV's detector.
        qr = cv2.QRCodeDetector()
        data, _, _ = qr.detectAndDecode(image)
        # pyzbar can decode QR codes as well, so skip a payload that is
        # already in the list to avoid counting the same code twice.
        if data and data not in barcodes:
            barcodes.append(data)
        return barcodes

    def _barcode_to_sku(self, barcode: str) -> str:
        """Map a barcode to a SKU via the WMS, falling back to the barcode itself."""
        return self.wms.lookup_barcode(barcode) or barcode
Inventory Report Generation
def generate_inventory_report(inventory_data: list[dict],
                              wms_data: dict,
                              output_format: str = 'json') -> dict:
    """Build a summary report with discrepancies for accounting/logistics.

    Args:
        inventory_data: One dict per scanned slot. ``location`` is read for
            every slot that lists discrepancies; ``occupied``, ``quantity``
            (defaults to 1 when absent) and ``discrepancies`` are optional.
        wms_data: Accepted for interface compatibility; not used here.
        output_format: Accepted for interface compatibility; the report is
            always returned as a dict.

    Returns:
        A dict with ``report_date``, a ``summary`` of slot and item counts,
        the flattened ``discrepancies`` (each tagged with its slot location)
        and ``accuracy_rate`` (1 minus discrepancies per scanned slot).
        For empty input the occupancy rate is 0.0 and the accuracy rate 1.0.
    """
    slot_count = len(inventory_data)
    # NOTE(review): a slot without 'quantity' counts as one item even when it
    # is not occupied - confirm this is the intended default.
    total_items = sum(s.get('quantity', 1) for s in inventory_data)
    occupied = sum(1 for s in inventory_data if s.get('occupied'))

    all_discrepancies = [
        {**disc, 'location': slot['location']}
        for slot in inventory_data
        for disc in slot.get('discrepancies', [])
    ]

    # Guard the division so an empty scan yields a report instead of raising
    # ZeroDivisionError, matching the guard used for accuracy_rate.
    occupancy_rate_pct = occupied / slot_count * 100 if slot_count else 0.0

    return {
        'report_date': datetime.now().isoformat(),
        'summary': {
            'total_slots_scanned': slot_count,
            'occupied': occupied,
            'empty': slot_count - occupied,
            'occupancy_rate_pct': occupancy_rate_pct,
            'total_items': total_items,
            'discrepancy_count': len(all_discrepancies)
        },
        'discrepancies': all_discrepancies,
        'accuracy_rate': 1 - len(all_discrepancies) / max(slot_count, 1)
    }
| Method | Counting Accuracy | Speed | Coverage |
|---|---|---|---|
| Fixed cameras | 95–98% | Real-time | Camera views |
| Mobile robot-AGV | 97–99% | 2–4 km/h | Entire floor |
| Drone quadcopter | 96–99% | 5–8 km/h | 3D, tall racks |
| RFID (no CV) | 99%+ | Real-time | RFID tags only |

| Task | Timeline |
|---|---|
| Pallet counting based on existing cameras | 4–6 weeks |
| Drone rack inventory + WMS integration | 12–20 weeks |
| Full autonomous inventory system (AGV + AI) | 24–40 weeks |