Incremental Product Import (Changes Only)

Implementation of Incremental Product Import (Changes Only)

Reloading the full catalog on every sync is wasteful: with 100,000 items, only 1–5% of the data changes per hour, yet the system reprocesses all 100,000 rows. Incremental import solves this: only products that actually changed are transmitted and processed.

Strategies for Detecting Changes

1. Timestamp updated_at

The supplier supports filtering by change date; this is the most common approach:

GET /api/products?updated_after=2024-01-15T10:00:00Z&page=1&per_page=200

The system remembers the time of the last successful sync and passes it on the next request.
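
As a sketch, the request above can be assembled like this; buildDeltaUrl() and the supplier.example host are illustrative, not a real supplier API:

```php
<?php
// Build a paginated "updated_after" request URL from the persisted sync time.
function buildDeltaUrl(string $base, \DateTimeInterface $since, int $page, int $perPage = 200): string
{
    $query = http_build_query([
        'updated_after' => $since->format('Y-m-d\TH:i:s\Z'),
        'page'          => $page,
        'per_page'      => $perPage,
    ]);
    return "{$base}/api/products?{$query}";
}

$since = new DateTimeImmutable('2024-01-15T10:00:00Z');
echo buildDeltaUrl('https://supplier.example', $since, 1), PHP_EOL;
```

Note that http_build_query() percent-encodes the colons in the timestamp; most APIs accept this, but check the supplier's expectations.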

2. Cursor / change log

The supplier maintains a change log with monotonically increasing IDs:

GET /api/changes?since_id=48291&limit=500

This is more reliable than a timestamp: it doesn't miss changes that occur during processing, and it is immune to clock skew between the two systems.
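
A minimal consumer loop for such a log might look like this; fetchChanges() fakes the endpoint with an in-memory array, while in production it would be an HTTP call:

```php
<?php
// Return up to $limit log entries with id greater than the cursor.
function fetchChanges(array $log, int $sinceId, int $limit): array
{
    $batch = array_values(array_filter($log, fn ($c) => $c['id'] > $sinceId));
    return array_slice($batch, 0, $limit);
}

$log = [
    ['id' => 48290, 'sku' => 'SKU-1'],
    ['id' => 48291, 'sku' => 'SKU-2'],
    ['id' => 48292, 'sku' => 'SKU-3'],
    ['id' => 48293, 'sku' => 'SKU-4'],
];

$cursor = 48291; // last_change_id persisted from the previous run
while ($batch = fetchChanges($log, $cursor, 2)) {
    foreach ($batch as $change) {
        // ... process $change here ...
        $cursor = $change['id']; // advance only after successful processing
    }
}
echo $cursor, PHP_EOL; // 48293
```

Advancing the cursor only after each item is processed means a crash mid-batch re-delivers the unprocessed tail on the next run, so handlers should be idempotent.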

3. Hash Comparison

When the source doesn't support filtering by changes, download the full file but process only the changed items:

$hash = md5(serialize([
    $row['price'], $row['qty'], $row['name'], $row['description']
]));

A row is processed only if its hash has changed.
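
A quick self-contained check of this idea (rowHash() is a hypothetical helper; note that serialize() is order-sensitive, so always hash the fields in the same order):

```php
<?php
// Same fields in the same order => same hash; any change to a tracked
// field produces a different hash.
function rowHash(array $row): string
{
    return md5(serialize([
        $row['price'], $row['qty'], $row['name'], $row['description'],
    ]));
}

$row = ['price' => 4990, 'qty' => 15, 'name' => 'Lamp', 'description' => 'LED'];

$same    = rowHash($row) === rowHash($row);                     // true
$changed = rowHash($row) === rowHash(['price' => 4590] + $row); // false: price differs
```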

4. Diff Files

The supplier publishes an hourly diff file instead of the full price list:

<changes>
  <updated id="SKU-123"><price>4990</price><qty>15</qty></updated>
  <updated id="SKU-456"><qty>0</qty></updated>
  <deleted id="SKU-789"/>
  <created id="SKU-999"><!-- full data --></created>
</changes>
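
One possible way to dispatch such a file with PHP's built-in SimpleXML; the element names follow the example above, while real feeds will differ:

```php
<?php
// Parse the diff and route each entry by operation type.
$xml = simplexml_load_string(<<<'XML'
<changes>
  <updated id="SKU-123"><price>4990</price><qty>15</qty></updated>
  <deleted id="SKU-789"/>
  <created id="SKU-999"><price>100</price></created>
</changes>
XML);

$ops = ['updated' => 0, 'deleted' => 0, 'created' => 0];
foreach ($xml->children() as $node) {
    $op  = $node->getName();     // updated | deleted | created
    $sku = (string) $node['id'];
    $ops[$op]++;
    // ... route to the matching handler: update, soft-delete or create $sku ...
}
print_r($ops);
```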

State Tracker Implementation

Sync state is stored in the DB:

CREATE TABLE import_sync_state (
    source_id       int PRIMARY KEY REFERENCES import_sources(id),
    last_sync_at    timestamptz,
    last_cursor     varchar(200),   -- for cursor-based
    last_change_id  bigint,         -- for changelog-based
    items_synced    bigint DEFAULT 0,
    updated_at      timestamptz DEFAULT now()
);

class SyncStateManager
{
    public function getLastSyncAt(int $sourceId): ?\DateTimeInterface
    {
        return ImportSyncState::find($sourceId)?->last_sync_at;
    }

    public function markSyncStarted(int $sourceId): void
    {
        // Remember the time of the sync START, not the END:
        // new changes may appear during processing.
        // The TTL must exceed the longest possible run, otherwise
        // markSyncCompleted() reads null and fails to advance last_sync_at.
        Cache::put("sync_start_{$sourceId}", now(), 3600);
    }

    public function markSyncCompleted(int $sourceId): void
    {
        ImportSyncState::updateOrCreate(
            ['source_id' => $sourceId],
            ['last_sync_at' => Cache::get("sync_start_{$sourceId}")]
        );
    }
}

Important: record the start time of the sync, not the end. If new changes appear during processing, they will be picked up in the next cycle.

Incremental Import Pipeline

class IncrementalImportJob implements ShouldQueue
{
    public function handle(
        SyncStateManager       $state,
        SupplierApiClient      $client,
        IncrementalProductSync $sync,
    ): void {
        $since = $state->getLastSyncAt($this->sourceId);
        $state->markSyncStarted($this->sourceId);

        $stats = ['created' => 0, 'updated' => 0, 'deleted' => 0, 'skipped' => 0];

        foreach ($client->fetchUpdatedSince($since) as $item) {
            $result = $sync->process($item, $this->sourceId);
            $stats[$result]++;
        }

        $state->markSyncCompleted($this->sourceId);
        $this->logResult($stats);
    }
}

Determining Change Type

class IncrementalProductSync
{
    public function process(array $item, int $sourceId): string
    {
        // Deleted items
        if ($item['deleted'] ?? false) {
            Product::where('sku', $item['sku'])
                ->where('source_id', $sourceId)
                ->update(['deleted_at' => now()]);
            return 'deleted';
        }

        $product = Product::where('sku', $item['sku'])
            ->where('source_id', $sourceId)
            ->first();

        if (!$product) {
            // New product
            Product::create($this->buildProductData($item, $sourceId));
            return 'created';
        }

        // Check hash — update only if something changed
        $newHash = $this->computeHash($item);
        if ($product->content_hash === $newHash) {
            return 'skipped';
        }

        $product->update($this->buildProductData($item, $sourceId) + [
            'content_hash' => $newHash,
        ]);
        return 'updated';
    }

    private function computeHash(array $item): string
    {
        return md5(json_encode([
            $item['price'] ?? null,
            $item['qty']   ?? null,
            $item['name']  ?? null,
        ]));
    }
}

Hash Comparison When a Delta API Is Absent

When the source doesn't support change filtering, download the full file but process only the changes:

class HashBasedDeltaProcessor
{
    public function process(iterable $allItems, int $sourceId): DeltaResult
    {
        // Load all current hashes from DB (one query)
        $storedHashes = Product::where('source_id', $sourceId)
            ->pluck('content_hash', 'sku')
            ->all();

        $toCreate = $toUpdate = $unchanged = 0;
        $createBatch = $updateBatch = [];

        foreach ($allItems as $item) {
            $newHash = $this->computeHash($item);
            $sku     = $item['sku'];

            if (!isset($storedHashes[$sku])) {
                $createBatch[] = $item;
                $toCreate++;
            } elseif ($storedHashes[$sku] !== $newHash) {
                $updateBatch[] = $item;
                $toUpdate++;
            } else {
                $unchanged++;
            }
        }

        // Batch insert/update only the changed items
        if ($createBatch) {
            Product::upsert($this->prepareRows($createBatch, $sourceId), ['sku'], [...]);
        }
        if ($updateBatch) {
            Product::upsert($this->prepareRows($updateBatch, $sourceId), ['sku'], [...]);
        }

        return new DeltaResult($toCreate, $toUpdate, $unchanged);
    }
}
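
The same diff logic can be demonstrated without a database; here $storedHashes stands in for the pluck('content_hash', 'sku') query, and the feed rows are made up:

```php
<?php
// Hash only the tracked fields, matching the stored content_hash.
function computeHash(array $item): string
{
    return md5(json_encode([
        $item['price'] ?? null, $item['qty'] ?? null, $item['name'] ?? null,
    ]));
}

// Hashes as they were saved on the previous import.
$storedHashes = [
    'SKU-1' => computeHash(['price' => 100, 'qty' => 5, 'name' => 'Lamp']),
    'SKU-2' => computeHash(['price' => 200, 'qty' => 0, 'name' => 'Desk']),
];

$feed = [
    ['sku' => 'SKU-1', 'price' => 100, 'qty' => 5, 'name' => 'Lamp'],  // unchanged
    ['sku' => 'SKU-2', 'price' => 250, 'qty' => 0, 'name' => 'Desk'],  // price changed
    ['sku' => 'SKU-3', 'price' => 300, 'qty' => 1, 'name' => 'Chair'], // new
];

$delta = ['create' => [], 'update' => [], 'unchanged' => 0];
foreach ($feed as $item) {
    $hash = computeHash($item);
    if (!isset($storedHashes[$item['sku']])) {
        $delta['create'][] = $item['sku'];
    } elseif ($storedHashes[$item['sku']] !== $hash) {
        $delta['update'][] = $item['sku'];
    } else {
        $delta['unchanged']++;
    }
}
// $delta: create = [SKU-3], update = [SKU-2], unchanged = 1
```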

Detection of Deleted Items

If the source doesn't send explicit deletion signals, use the "anti-join" approach:

public function detectDeleted(array $currentSkus, int $sourceId): int
{
    // SKUs that were in previous import but missing in current one
    $deletedCount = Product::where('source_id', $sourceId)
        ->whereNotIn('sku', $currentSkus)
        ->whereNull('deleted_at')
        ->update(['deleted_at' => now()]);

    return $deletedCount;
}

For 100,000 SKUs, WHERE sku NOT IN (...) with a bound list is inefficient. It is better to use a temp table:

CREATE TEMP TABLE current_import_skus (sku varchar(100));
COPY current_import_skus FROM STDIN;
-- load all SKUs of current import

UPDATE products
SET deleted_at = now()
WHERE source_id = $1
  AND deleted_at IS NULL
  AND sku NOT IN (SELECT sku FROM current_import_skus);

DROP TABLE current_import_skus;
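
A runnable sketch of the same anti-join, using SQLite via PDO so it works anywhere; production would use PostgreSQL with COPY as above, and the table contents here are made up:

```php
<?php
// Soft-delete rows whose SKU is absent from the current import.
$db = new PDO('sqlite::memory:');
$db->exec('CREATE TABLE products (sku TEXT, source_id INT, deleted_at TEXT)');
$db->exec("INSERT INTO products VALUES
    ('SKU-1', 1, NULL), ('SKU-2', 1, NULL), ('SKU-3', 1, NULL)");

$db->exec('CREATE TEMP TABLE current_import_skus (sku TEXT)');
$ins = $db->prepare('INSERT INTO current_import_skus VALUES (?)');
foreach (['SKU-1', 'SKU-3'] as $sku) { // SKU-2 is missing from this import
    $ins->execute([$sku]);
}

$deleted = $db->exec(
    "UPDATE products SET deleted_at = datetime('now')
     WHERE source_id = 1 AND deleted_at IS NULL
       AND sku NOT IN (SELECT sku FROM current_import_skus)"
);
echo $deleted, PHP_EOL; // 1 row: SKU-2 soft-deleted
```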

Concurrency: Protection from Double Run

public function handle(): void
{
    // The 3600 s TTL is a safety net: the lock expires even if the process dies
    $lock = Cache::lock("import_sync_{$this->sourceId}", 3600);

    if (!$lock->get()) {
        Log::info("Import for source {$this->sourceId} already running, skipping");
        return;
    }

    try {
        $this->runSync();
    } finally {
        $lock->release();
    }
}
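
Outside Laravel, the same guard can be built on an OS-level advisory lock; withImportLock() and the lock-file path are illustrative:

```php
<?php
// Run $job only if no other process holds the per-source lock file.
function withImportLock(int $sourceId, callable $job): bool
{
    $fh = fopen(sys_get_temp_dir() . "/import_sync_{$sourceId}.lock", 'c');
    if (!flock($fh, LOCK_EX | LOCK_NB)) { // non-blocking: bail out if held
        fclose($fh);
        return false;                     // another run is in progress
    }
    try {
        $job();
        return true;
    } finally {
        flock($fh, LOCK_UN);
        fclose($fh);
    }
}

$ran = withImportLock(42, function () {
    // runSync() would go here
});
var_dump($ran); // bool(true)
```

Unlike a cache TTL, an flock() lock disappears automatically when the process exits, so a crashed worker never leaves a stale lock behind.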

Implementation Timeline

  • Timestamp-based increment, state manager, hash comparison — 2 days
  • Detection of deleted items (temp table), lock against double run — +1 day
  • Cursor-based sync + diff-files + delta reporting — +1–2 days