Implementation of Incremental Product Import (Changes Only)
Reloading the full catalog every time is wasteful. With 100,000 items, only 1–5% of the data changes per hour, yet the system reprocesses all 100,000 on every run. Incremental import solves this: only products that actually changed are transmitted and processed.
Strategies for Detecting Changes
1. Timestamp updated_at
The supplier supports filtering by change date; this is the most common approach:
GET /api/products?updated_after=2024-01-15T10:00:00Z&page=1&per_page=200
The system remembers the time of the last successful sync and passes it in the next request.
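A minimal sketch of composing such a request. The endpoint and parameter names follow the example above (an assumed supplier API), and the timestamp uses the usual RFC 3339 formatting:

```php
<?php
// Build the delta-request URL from the last successful sync time.
// Endpoint and parameter names mirror the example above (assumed API).
function buildDeltaUrl(string $base, DateTimeInterface $since, int $page = 1, int $perPage = 200): string
{
    $query = http_build_query([
        'updated_after' => $since->format(DateTimeInterface::ATOM), // RFC 3339
        'page'          => $page,
        'per_page'      => $perPage,
    ]);

    return $base . '?' . $query;
}

$since = new DateTimeImmutable('2024-01-15T10:00:00Z');
echo buildDeltaUrl('https://supplier.example/api/products', $since);
```

The value returned by getLastSyncAt() slots directly into the $since argument; when it is null (first run), the client should fall back to a full import.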
2. Cursor / change log
Supplier maintains change log with increasing ID:
GET /api/changes?since_id=48291&limit=500
More reliable than a timestamp: it doesn't miss changes that occurred during processing.
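The consumption loop is simple: request changes after the last seen ID until the server returns an empty page, advancing the cursor as you go. A sketch with the API call abstracted behind a callable (an assumed interface):

```php
<?php
// Cursor-based sync sketch: keep requesting changes after the last seen ID
// until the server returns an empty page. $fetchChanges stands in for the
// real API call (e.g. GET /api/changes?since_id=...&limit=...).
function syncByCursor(callable $fetchChanges, int $sinceId, int $limit = 500): array
{
    $processed = [];

    while (true) {
        $batch = $fetchChanges($sinceId, $limit);
        if ($batch === []) {
            break; // caught up with the change log
        }
        foreach ($batch as $change) {
            $processed[] = $change['id'];
            $sinceId = max($sinceId, $change['id']); // advance the cursor
        }
        // Persist $sinceId here so a crash resumes from the last processed change.
    }

    return [$sinceId, $processed];
}

// Usage with a fake in-memory change log:
$log = [['id' => 48292], ['id' => 48293], ['id' => 48294]];
$fetch = function (int $since, int $limit) use ($log): array {
    return array_slice(
        array_values(array_filter($log, fn ($c) => $c['id'] > $since)),
        0,
        $limit
    );
};
[$cursor, $ids] = syncByCursor($fetch, 48291);
```

Persisting the cursor after each batch (the commented line) is what makes this approach crash-safe: a restarted job resumes exactly where the previous one stopped.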
3. Hash Comparison
When the source doesn't support change filtering, download the full file but process only the rows that changed:
$hash = md5(serialize([
    $row['price'], $row['qty'], $row['name'], $row['description'],
]));
A row is processed only if its hash has changed.
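One pitfall: the hash must be stable, so normalize value types and fix the field order before hashing, or type drift in the feed ("4990" vs 4990) will look like a change. A sketch; the tracked field list is illustrative:

```php
<?php
// Stable content hash: fixed field order and normalized values, so that
// "4990" vs 4990 or reordered keys don't produce spurious "changes".
// The tracked field list is illustrative.
function computeRowHash(array $row): string
{
    $tracked = ['price', 'qty', 'name', 'description'];

    $normalized = [];
    foreach ($tracked as $field) {
        $value = $row[$field] ?? null;
        // Cast scalars to string so 4990 and "4990" hash identically.
        $normalized[$field] = $value === null ? null : (string) $value;
    }

    return md5(json_encode($normalized));
}

$a = computeRowHash(['price' => 4990, 'qty' => 15, 'name' => 'Widget']);
$b = computeRowHash(['name' => 'Widget', 'qty' => '15', 'price' => '4990']);
// $a === $b: same content despite different key order and scalar types
$c = computeRowHash(['price' => 4490, 'qty' => 15, 'name' => 'Widget']);
// $c differs: the price actually changed
```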
4. Diff Files
The supplier publishes an hourly diff file instead of the full price list:
<changes>
    <updated id="SKU-123"><price>4990</price><qty>15</qty></updated>
    <updated id="SKU-456"><qty>0</qty></updated>
    <deleted id="SKU-789"/>
    <created id="SKU-999"><!-- full data --></created>
</changes>
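Consuming such a file reduces to dispatching on the element name. A sketch with SimpleXML; element and attribute names follow the sample document above:

```php
<?php
// Dispatch a diff file into created / updated / deleted buckets.
// Element and attribute names follow the sample document above.
function parseDiff(string $xml): array
{
    $doc = simplexml_load_string($xml);
    $result = ['created' => [], 'updated' => [], 'deleted' => []];

    foreach ($doc->children() as $node) {
        $type = $node->getName();      // created | updated | deleted
        $sku  = (string) $node['id'];

        if ($type === 'deleted') {
            $result['deleted'][] = $sku;
            continue;
        }

        $fields = [];
        foreach ($node->children() as $field) {
            $fields[$field->getName()] = (string) $field; // e.g. price, qty
        }
        $result[$type][$sku] = $fields;
    }

    return $result;
}

$diff = parseDiff(<<<XML
<changes>
  <updated id="SKU-123"><price>4990</price><qty>15</qty></updated>
  <deleted id="SKU-789"/>
</changes>
XML);
```

Each bucket then feeds the corresponding branch of the sync pipeline described below: creates become inserts, updates become partial updates, deletes become soft deletes.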
State Tracker Implementation
Sync state is stored in the database:
CREATE TABLE import_sync_state (
    source_id int PRIMARY KEY REFERENCES import_sources(id),
    last_sync_at timestamptz,
    last_cursor varchar(200),   -- for cursor-based sync
    last_change_id bigint,      -- for changelog-based sync
    items_synced bigint DEFAULT 0,
    updated_at timestamptz DEFAULT now()
);
class SyncStateManager
{
    public function getLastSyncAt(int $sourceId): ?\DateTimeInterface
    {
        return ImportSyncState::find($sourceId)?->last_sync_at;
    }

    public function markSyncStarted(int $sourceId): void
    {
        // Remember the time of sync START, not END.
        // Critical: new changes may appear during processing.
        // The TTL must exceed the longest possible sync run,
        // otherwise markSyncCompleted() reads a null from the cache.
        Cache::put("sync_start_{$sourceId}", now(), 3600);
    }

    public function markSyncCompleted(int $sourceId): void
    {
        ImportSyncState::updateOrCreate(
            ['source_id' => $sourceId],
            ['last_sync_at' => Cache::get("sync_start_{$sourceId}")]
        );
    }
}
Important: record the start time of the sync, not the end. Any changes that appear while processing is in flight will then be picked up by the next cycle.
Incremental Import Pipeline
class IncrementalImportJob implements ShouldQueue
{
    public function handle(
        SyncStateManager $state,
        SupplierApiClient $client,
        IncrementalProductSync $sync,
    ): void {
        $since = $state->getLastSyncAt($this->sourceId);
        $state->markSyncStarted($this->sourceId);

        $stats = ['created' => 0, 'updated' => 0, 'deleted' => 0, 'skipped' => 0];

        foreach ($client->fetchUpdatedSince($since) as $item) {
            $result = $sync->process($item, $this->sourceId);
            $stats[$result]++;
        }

        $state->markSyncCompleted($this->sourceId);
        $this->logResult($stats);
    }
}
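fetchUpdatedSince() is naturally implemented as a generator that walks the supplier's pages lazily, so the job never holds the whole delta in memory. A sketch with the HTTP call abstracted behind a callable (an assumed interface):

```php
<?php
// Lazy pagination: yield items one by one, requesting the next page only
// when the previous one is exhausted. $fetchPage stands in for the HTTP call.
function fetchUpdatedSince(callable $fetchPage, ?string $since, int $perPage = 200): Generator
{
    for ($page = 1; ; $page++) {
        $items = $fetchPage($since, $page, $perPage);
        yield from $items;

        if (count($items) < $perPage) {
            return; // a short page means it was the last one
        }
    }
}

// Usage with a fake two-page response (page size 2):
$pages = [1 => ['a', 'b'], 2 => ['c']];
$fetch = fn ($since, $page, $perPage) => $pages[$page] ?? [];
$all = iterator_to_array(fetchUpdatedSince($fetch, null, 2), false);
```

Because the generator yields item by item, the foreach in IncrementalImportJob above stays unchanged whether the delta is 50 items or 50,000.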
Determining Change Type
class IncrementalProductSync
{
    public function process(array $item, int $sourceId): string
    {
        // Deleted items
        if ($item['deleted'] ?? false) {
            Product::where('sku', $item['sku'])
                ->where('source_id', $sourceId)
                ->update(['deleted_at' => now()]);
            return 'deleted';
        }

        $product = Product::where('sku', $item['sku'])
            ->where('source_id', $sourceId)
            ->first();

        if (!$product) {
            // New product; store the hash so the next run can skip it if unchanged
            Product::create($this->buildProductData($item, $sourceId) + [
                'content_hash' => $this->computeHash($item),
            ]);
            return 'created';
        }

        // Check the hash: update only if something changed
        $newHash = $this->computeHash($item);
        if ($product->content_hash === $newHash) {
            return 'skipped';
        }

        $product->update($this->buildProductData($item, $sourceId) + [
            'content_hash' => $newHash,
        ]);
        return 'updated';
    }

    private function computeHash(array $item): string
    {
        return md5(json_encode([
            $item['price'] ?? null,
            $item['qty'] ?? null,
            $item['name'] ?? null,
        ]));
    }
}
Hash Comparison When a Delta API Is Absent
When the source doesn't support change filtering, download the full file but process only the changes:
class HashBasedDeltaProcessor
{
    public function process(iterable $allItems, int $sourceId): DeltaResult
    {
        // Load all current hashes from the DB (one query)
        $storedHashes = Product::where('source_id', $sourceId)
            ->pluck('content_hash', 'sku')
            ->all();

        $toCreate = $toUpdate = $unchanged = 0;
        $createBatch = $updateBatch = [];

        foreach ($allItems as $item) {
            $newHash = $this->computeHash($item);
            $sku = $item['sku'];

            if (!isset($storedHashes[$sku])) {
                $createBatch[] = $item;
                $toCreate++;
            } elseif ($storedHashes[$sku] !== $newHash) {
                $updateBatch[] = $item;
                $toUpdate++;
            } else {
                $unchanged++;
            }
        }

        // Batch insert/update only the changed items
        if ($createBatch) {
            Product::upsert($this->prepareRows($createBatch, $sourceId), ['sku'], [...]);
        }
        if ($updateBatch) {
            Product::upsert($this->prepareRows($updateBatch, $sourceId), ['sku'], [...]);
        }

        return new DeltaResult($toCreate, $toUpdate, $unchanged);
    }
}
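For 100,000 rows the two upserts above should be chunked: a single multi-row statement with tens of thousands of rows can exceed driver placeholder limits. A sketch of the chunking itself, where $applyBatch stands in for the actual upsert call:

```php
<?php
// Split a large batch into fixed-size chunks before writing. $applyBatch
// stands in for the actual upsert call (e.g. Eloquent's Product::upsert),
// so each chunk becomes one multi-row statement.
function upsertInChunks(array $rows, callable $applyBatch, int $chunkSize = 1000): int
{
    $written = 0;
    foreach (array_chunk($rows, $chunkSize) as $chunk) {
        $applyBatch($chunk);
        $written += count($chunk);
    }

    return $written;
}

// Usage: count how many statements a 2,500-row batch produces.
$statements = 0;
$total = upsertInChunks(range(1, 2500), function (array $chunk) use (&$statements) {
    $statements++;
}, 1000);
```

A chunk size around 500–1000 rows is a reasonable starting point; the right number depends on the column count and the database driver's parameter limit.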
Detection of Deleted Items
If the source doesn't send explicit deletion signals, use an "anti-join" approach:
public function detectDeleted(array $currentSkus, int $sourceId): int
{
    // SKUs that were in the previous import but are missing from the current one
    $deletedCount = Product::where('source_id', $sourceId)
        ->whereNotIn('sku', $currentSkus)
        ->whereNull('deleted_at')
        ->update(['deleted_at' => now()]);

    return $deletedCount;
}
For 100,000 SKUs, WHERE sku NOT IN (...) with an inline list is inefficient. A temporary table works better:
CREATE TEMP TABLE current_import_skus (sku varchar(100));

COPY current_import_skus FROM STDIN;  -- load all SKUs of the current import

UPDATE products
SET deleted_at = now()
WHERE source_id = $1
  AND deleted_at IS NULL
  AND sku NOT IN (SELECT sku FROM current_import_skus);

DROP TABLE current_import_skus;
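Conceptually, the anti-join is just a set difference between the stored SKUs and the SKUs seen in the current import. A minimal in-memory illustration of the same logic the UPDATE performs:

```php
<?php
// Anti-join as a set difference: SKUs present in the previous state but
// absent from the current import are the ones to soft-delete.
function findDeletedSkus(array $storedSkus, array $currentSkus): array
{
    // Flip the current list into a hash set for O(1) lookups.
    $current = array_flip($currentSkus);

    return array_values(array_filter(
        $storedSkus,
        fn (string $sku) => !isset($current[$sku])
    ));
}

$stored  = ['SKU-123', 'SKU-456', 'SKU-789'];
$current = ['SKU-123', 'SKU-456', 'SKU-999'];
$deleted = findDeletedSkus($stored, $current);
// SKU-789 disappeared from the feed and should be soft-deleted
```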
Concurrency: Preventing Double Runs
public function handle(): void
{
    $lock = Cache::lock("import_sync_{$this->sourceId}", 3600);

    if (!$lock->get()) {
        Log::info("Import for source {$this->sourceId} already running, skipping");
        return;
    }

    try {
        $this->runSync();
    } finally {
        $lock->release();
    }
}
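Cache::lock here is Laravel's atomic lock. Outside Laravel, the same single-run guard can be built on flock(); a sketch with an illustrative lock-file path:

```php
<?php
// Single-run guard on flock(): a second process fails to acquire the
// exclusive lock and skips the import. The lock-file path is illustrative.
final class FileLock
{
    /** @var resource|null */
    private $handle = null;

    public function __construct(private string $path) {}

    public function acquire(): bool
    {
        $this->handle = fopen($this->path, 'c');
        // LOCK_NB: don't wait, report "already running" immediately.
        if (!flock($this->handle, LOCK_EX | LOCK_NB)) {
            fclose($this->handle);
            $this->handle = null;
            return false;
        }
        return true;
    }

    public function release(): void
    {
        if ($this->handle !== null) {
            flock($this->handle, LOCK_UN);
            fclose($this->handle);
            $this->handle = null;
        }
    }
}

$path = sys_get_temp_dir() . '/import_sync_1.lock';
$first  = new FileLock($path);
$second = new FileLock($path);
$got = $first->acquire();       // acquired
$blocked = $second->acquire();  // refused while $first holds the lock
$first->release();
```

Unlike a cache-based lock, flock() is released automatically by the OS if the process dies, so there is no stale-lock TTL to tune; the trade-off is that it only guards runs on the same host.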
Implementation Timeline
- Timestamp-based increment, state manager, hash comparison — 2 days
- Detection of deleted items (temp table), lock against double run — +1 day
- Cursor-based sync + diff-files + delta reporting — +1–2 days