Multi-Supplier Product Import Implementation
When a catalog is built from three or more suppliers at once, plain file uploads stop being enough. Conflicts appear: identical SKUs with different prices, the same product under different IDs, and suppliers that each deliver data in their own format. A multi-supplier import needs a unified pipeline with explicit conflict resolution rules.
Pipeline Architecture
Supplier A (XML) ─┐
Supplier B (CSV) ─┤─► Normalizer ─► Deduplicator ─► Merger ─► Catalog DB
Supplier C (API) ─┘
Each supplier gets its own adapter, which emits normalized DTOs. After normalization, all data flows through one shared processing pipeline.
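As a rough illustration of the Deduplicator stage, the sketch below groups normalized rows by a match key. The matching rule (barcode first, then a normalized SKU) and the function names matchKey and groupDuplicates are assumptions made for this example, not rules taken from a particular project.

```php
<?php

/**
 * Hypothetical match key: barcode first, then a normalized SKU.
 * Rows with neither get a key unique to the supplier record, so they never merge.
 */
function matchKey(array $row): string
{
    // Keep digits only, so "4600-0000 00017" and "4600000000017" match
    $barcode = preg_replace('/\D+/', '', (string) ($row['barcode'] ?? ''));
    if ($barcode !== '') {
        return 'barcode:' . $barcode;
    }

    // Ignore case, spaces, dashes and underscores in SKUs
    $sku = strtoupper(preg_replace('/[\s\-_]+/', '', (string) ($row['sku'] ?? '')));
    if ($sku !== '') {
        return 'sku:' . $sku;
    }

    return sprintf('own:%d:%s', $row['supplier_id'], $row['external_id']);
}

/** Groups normalized supplier rows into candidate product clusters. */
function groupDuplicates(iterable $rows): array
{
    $groups = [];
    foreach ($rows as $row) {
        $groups[matchKey($row)][] = $row;
    }

    return $groups;
}
```

Each resulting group is a candidate for one product card. A row that carries a barcode is matched by barcode only, so a production rule set would likely need a second pass for rows that share a SKU but not a barcode.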
Data Model
The key decision is to store supplier source data separately from the final product card.
-- Source data from suppliers (raw)
CREATE TABLE supplier_products (
    id BIGSERIAL PRIMARY KEY,
    supplier_id INT NOT NULL REFERENCES suppliers(id),
    external_id VARCHAR(255) NOT NULL, -- ID in supplier's system
    sku VARCHAR(255),
    barcode VARCHAR(50),
    name TEXT NOT NULL,
    price NUMERIC(12,2),
    stock INT DEFAULT 0,
    attributes JSONB DEFAULT '{}',
    raw_data JSONB, -- original document in full
    imported_at TIMESTAMP NOT NULL,
    hash VARCHAR(64), -- SHA256 content hash for change detection
    UNIQUE(supplier_id, external_id)
);

-- Final product cards
CREATE TABLE products (
    id BIGSERIAL PRIMARY KEY,
    master_sku VARCHAR(255) UNIQUE,
    name TEXT,
    price NUMERIC(12,2),
    stock INT,
    primary_supplier_id INT REFERENCES suppliers(id),
    merged_from BIGINT[], -- supplier_products.id values (BIGSERIAL, hence BIGINT)
    updated_at TIMESTAMP
);
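To show how primary_supplier_id and merged_from might be filled, here is a minimal sketch of merging one cluster of supplier rows into a product card. The rule it applies (the cheapest in-stock offer wins, falling back to the cheapest offer overall) is an assumption chosen for illustration, and mergeCluster is a hypothetical name. Real merge rules depend on the business.

```php
<?php

/**
 * Builds one product card from a cluster of supplier_products rows.
 * Assumed rule (not from the source): cheapest in-stock offer wins;
 * if nothing is in stock, the cheapest offer overall wins.
 */
function mergeCluster(array $rows): array
{
    if ($rows === []) {
        throw new InvalidArgumentException('Cannot merge an empty cluster');
    }

    $inStock = array_filter($rows, fn (array $r): bool => $r['stock'] > 0);
    $candidates = array_values($inStock !== [] ? $inStock : $rows);

    // Sort ascending by price and take the first candidate as the primary offer
    usort($candidates, fn (array $a, array $b): int => $a['price'] <=> $b['price']);
    $primary = $candidates[0];

    return [
        'name'                => $primary['name'],
        'price'               => $primary['price'],
        'stock'               => $primary['stock'],
        'primary_supplier_id' => $primary['supplier_id'],
        'merged_from'         => array_column($rows, 'id'), // all source row IDs
    ];
}
```

Because the raw rows stay in supplier_products, the card can be rebuilt at any time with a different rule without re-importing anything.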
Supplier Adapters
Each adapter implements a common interface:
interface SupplierAdapterInterface
{
    public function fetch(): Generator; // yields SupplierProductDTO
    public function getSupplierId(): int;
}

class SupplierProductDTO
{
    public function __construct(
        public readonly string $externalId,
        public readonly string $name,
        public readonly float $price,
        public readonly int $stock,
        public readonly ?string $sku = null,
        public readonly ?string $barcode = null,
        public readonly array $attributes = [],
    ) {}
}
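Since the DTO takes the price as a float, every adapter has to turn supplier-specific price strings into a number first. The helper below is a hedged sketch of one way to do that. parsePrice is a hypothetical name, and the rule that the last separator is the decimal one is an assumption that fits typical feeds but not every locale.

```php
<?php

/**
 * Parses prices such as "1 234,50", "1.234,50" or "1,234.50".
 * Returns null when the input cannot be read as a number.
 * Assumption: when both separators occur, the later one is the decimal mark;
 * a lone comma is treated as a decimal mark, so "1,234" becomes 1.234.
 */
function parsePrice(string $raw): ?float
{
    // Drop currency signs, spaces and anything else that is not part of a number
    $clean = preg_replace('/[^\d,.\-]/u', '', $raw);
    if ($clean === null || $clean === '') {
        return null;
    }

    $lastComma = strrpos($clean, ',');
    $lastDot = strrpos($clean, '.');

    if ($lastComma !== false && $lastDot !== false) {
        if ($lastComma > $lastDot) {
            // "1.234,50": dots are thousands separators
            $clean = str_replace(',', '.', str_replace('.', '', $clean));
        } else {
            // "1,234.50": commas are thousands separators
            $clean = str_replace(',', '', $clean);
        }
    } elseif ($lastComma !== false) {
        $clean = str_replace(',', '.', $clean);
    }

    return is_numeric($clean) ? (float) $clean : null;
}
```

Returning null instead of 0.0 lets the adapter skip or flag the row, so an unreadable price does not end up in the catalog as a free product.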
XML Supplier Adapter:
class XmlSupplierAdapter implements SupplierAdapterInterface
{
    public function __construct(
        private readonly int $supplierId,
        private readonly string $feedUrl,
    ) {}

    public function fetch(): Generator
    {
        $reader = new XMLReader();
        if (!$reader->open($this->feedUrl)) {
            throw new RuntimeException("Cannot open XML feed: {$this->feedUrl}");
        }

        try {
            // Stream the feed one <item> at a time instead of loading it whole
            while ($reader->read()) {
                if ($reader->nodeType === XMLReader::ELEMENT && $reader->name === 'item') {
                    $node = new SimpleXMLElement($reader->readOuterXml());
                    yield new SupplierProductDTO(
                        externalId: (string) $node->id,
                        name: (string) $node->name,
                        price: (float) $node->price,
                        stock: (int) $node->quantity,
                        sku: (string) $node->article ?: null,
                        barcode: (string) $node->barcode ?: null,
                    );
                }
            }
        } finally {
            // Runs even if the consumer stops iterating early
            $reader->close();
        }
    }

    public function getSupplierId(): int
    {
        return $this->supplierId;
    }
}
CSV Adapter:
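class CsvSupplierAdapter implements SupplierAdapterInterface
{
    public function __construct(
        private readonly int $supplierId,
        private readonly string $filePath,
    ) {}

    public function fetch(): Generator
    {
        $handle = fopen($this->filePath, 'r');
        if ($handle === false) {
            throw new RuntimeException("Cannot open CSV file: {$this->filePath}");
        }
        try {
            $headers = array_map('trim', fgetcsv($handle, 0, ';') ?: []);
            while (($row = fgetcsv($handle, 0, ';')) !== false) {
                // Skip blank lines and rows whose column count differs from the header
                if (count($row) !== count($headers)) {
                    continue;
                }
                $data = array_combine($headers, $row);
                yield new SupplierProductDTO(
                    externalId: $data['ID'],
                    name: $data['Name'],
                    price: (float) str_replace(',', '.', $data['Price']),
                    stock: (int) $data['Stock'],
                    sku: ($data['SKU'] ?? '') ?: null, // empty cell becomes null
                    barcode: ($data['Barcode'] ?? '') ?: null,
                );
            }
        } finally {
            fclose($handle);
        }
    }

    public function getSupplierId(): int
    {
        return $this->supplierId;
    }
}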
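The pipeline diagram also lists an API supplier. The core of such an adapter could look like the sketch below, which walks a paginated endpoint and maps each item to the DTO's constructor arguments. The $fetchPage callable stands in for the HTTP client, and the field names id, title, price, qty, sku and ean are assumptions, since every supplier API differs.

```php
<?php

/**
 * Walks a paginated API until an empty page comes back.
 * $fetchPage is a hypothetical callable (int $page): array wrapping the HTTP call.
 */
function fetchAllPages(callable $fetchPage, int $maxPages = 1000): Generator
{
    for ($page = 1; $page <= $maxPages; $page++) {
        $items = $fetchPage($page);
        if ($items === []) {
            return; // no more data
        }
        foreach ($items as $item) {
            yield $item;
        }
    }
}

/** Maps one API item (assumed field names) to SupplierProductDTO constructor arguments. */
function mapApiItem(array $item): array
{
    return [
        'externalId' => (string) $item['id'],
        'name'       => trim((string) $item['title']),
        'price'      => (float) $item['price'],
        'stock'      => max(0, (int) ($item['qty'] ?? 0)), // negative stock becomes 0
        'sku'        => ($item['sku'] ?? '') ?: null,
        'barcode'    => ($item['ean'] ?? '') ?: null,
    ];
}
```

Inside an adapter's fetch() method the two pieces combine as `yield new SupplierProductDTO(...mapApiItem($item));`, because PHP 8 unpacks string-keyed arrays into named arguments. The $maxPages cap protects against an API that never returns an empty page.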
Import Service
class SupplierImportService
{
    public function import(SupplierAdapterInterface $adapter): ImportResult
    {
        $supplierId = $adapter->getSupplierId();
        $result = new ImportResult();
        // Every row seen during this run ends up with imported_at >= $startedAt
        $startedAt = now()->startOfSecond();

        DB::transaction(function () use ($adapter, $supplierId, $result, $startedAt) {
            foreach ($adapter->fetch() as $dto) {
                $hash = hash('sha256', serialize($dto));
                $existing = SupplierProduct::where([
                    'supplier_id' => $supplierId,
                    'external_id' => $dto->externalId,
                ])->first();

                if ($existing && $existing->hash === $hash) {
                    // Data has not changed, but the row is still in the feed:
                    // refresh imported_at so it is not treated as disappeared below
                    $existing->update(['imported_at' => now()]);
                    $result->skipped++;
                    continue;
                }

                SupplierProduct::updateOrCreate(
                    ['supplier_id' => $supplierId, 'external_id' => $dto->externalId],
                    [
                        'name' => $dto->name,
                        'price' => $dto->price,
                        'stock' => $dto->stock,
                        'sku' => $dto->sku,
                        'barcode' => $dto->barcode,
                        'attributes' => $dto->attributes,
                        'imported_at' => now(),
                        'hash' => $hash,
                    ]
                );
                $result->upserted++;
            }

            // Zero the stock of products that were not present in this export
            SupplierProduct::where('supplier_id', $supplierId)
                ->where('imported_at', '<', $startedAt)
                ->update(['stock' => 0]);
        });

        return $result;
    }
}
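The import service hashes serialize($dto), whose output includes the class name and depends on property and array key order, so renaming the DTO or reordering attributes changes the hash even when the data is the same. A possible alternative, sketched here under that assumption, is to hash a canonical JSON form with sorted keys. canonicalize and contentHash are hypothetical helper names.

```php
<?php

/** Recursively sorts associative keys so equal content always encodes identically. */
function canonicalize(mixed $value): mixed
{
    if (!is_array($value)) {
        return $value;
    }

    $value = array_map('canonicalize', $value);
    if (!array_is_list($value)) {
        ksort($value); // lists keep their order, maps are sorted by key
    }

    return $value;
}

/** SHA-256 over the canonical JSON form of the given fields. */
function contentHash(array $fields): string
{
    $json = json_encode(
        canonicalize($fields),
        JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE
    );

    return hash('sha256', $json);
}
```

The caller decides which fields count as content. Passing the price as a fixed-format string (for example via number_format) avoids hash changes caused by 10 versus 10.0, which JSON encodes differently.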
Queues and Parallel Import
Each supplier runs as a separate job:
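class ImportSupplierJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public int $timeout = 1800; // 30 minutes; keep below the queue connection's retry_after
    public int $tries = 3;

    // The Supplier model class name is assumed here
    public function __construct(public Supplier $supplier) {}

    public function handle(SupplierImportService $service): void
    {
        $adapter = SupplierAdapterFactory::make($this->supplier);
        $result = $service->import($adapter);
        Log::info("Supplier {$this->supplier->name} imported", $result->toArray());

        // Run merge after all active suppliers are imported
        // (isLastActiveImport() is not shown in this article)
        if ($this->isLastActiveImport()) {
            MergeProductsJob::dispatch();
        }
    }
}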
Scheduled execution:
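$schedule->job(new ImportSupplierJob($supplierA))->everyTwoHours();
// Scheduled events have no delay() method, so supplier B is offset through cron instead
// (five minutes past every second hour, assuming the original offset meant minutes)
$schedule->job(new ImportSupplierJob($supplierB))->cron('5 */2 * * *');
$schedule->job(new ImportSupplierJob($supplierC))->everyFourHours();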
Monitoring and Alerts
Important metrics to track:
- Number of imported, skipped, and failed records
- Share of products whose price changed by 20% or more in either direction, which usually indicates a supplier error
- Import execution time: steady growth points to catalog expansion or a degrading source
// Alert when more than 5% of the upserted records have an anomalous price
if ($result->priceAnomalies > $result->upserted * 0.05) {
    Notification::send($admins, new SupplierPriceAnomalyAlert($supplier, $result));
}
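The check above relies on $result->priceAnomalies, but the import service shown earlier does not count it. A minimal sketch of how the counter and the alert condition might be computed follows. The function names isPriceAnomaly and shouldAlert are hypothetical, and the thresholds simply mirror the 20% and 5% figures used in this section.

```php
<?php

/**
 * True when the price moved by at least $threshold (0.20 = 20%)
 * relative to the previously stored price, in either direction.
 */
function isPriceAnomaly(?float $oldPrice, float $newPrice, float $threshold = 0.20): bool
{
    if ($oldPrice === null || $oldPrice <= 0.0) {
        return false; // new product or no usable baseline
    }

    return abs($newPrice - $oldPrice) / $oldPrice >= $threshold;
}

/** True when anomalies exceed the allowed share of upserted records. */
function shouldAlert(int $priceAnomalies, int $upserted, float $maxShare = 0.05): bool
{
    return $upserted > 0 && $priceAnomalies > $upserted * $maxShare;
}
```

In the import loop, the counter could be incremented just before the upsert by comparing $existing->price (when the row already exists) with $dto->price.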
Timeline
- Basic data model + 2 adapters (XML + CSV): 2 days
- Each additional adapter: +0.5–1 day
- REST API supplier adapter: +1–2 days
- Queues, logging, alerts: +1 day
- Merge rules and prioritization: +1–2 days (handled by a separate service)
A typical project with three suppliers and a basic merge takes 5–7 working days.







