Implementing a Product Import Queue (Background Processing)
Importing thousands of products synchronously means timeouts, hung processes, and a broken user experience. The right approach: accept the file, queue a job, and return a response immediately. Processing happens in the background, with progress shown in the interface.
Architecture
HTTP Upload (file/URL)
↓
Import Job (queue entry)
↓
Queue (Redis / SQS / RabbitMQ)
↓
Worker Process (separate process/container)
↓
Chunk Processing (batches of 500 products)
↓
Database (upsert)
↓
Progress Event (WebSocket / SSE → UI)
Laravel Implementation
// Controller — accepts file and queues task
class ProductImportController extends Controller
{
    public function upload(Request $request): JsonResponse
    {
        $request->validate(['file' => 'required|file|mimes:csv,txt']);

        $path = $request->file('file')->store('imports');

        $import = ImportJob::create([
            'file_path' => $path,
            'status'    => 'pending',
            'total'     => 0,
            'processed' => 0,
            'errors'    => 0,
        ]);

        ProcessProductImport::dispatch($import->id);

        return response()->json(['import_id' => $import->id]);
    }
}
// Job — background processing
class ProcessProductImport implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable;

    public int $timeout = 3600; // 1 hour
    public int $tries = 3;

    public function __construct(private int $importId) {}

    public function handle(): void
    {
        $import = ImportJob::findOrFail($this->importId);
        $import->update(['status' => 'processing', 'started_at' => now()]);

        // CsvReader is an application-level helper (e.g. a wrapper around league/csv)
        $reader = new CsvReader(storage_path('app/' . $import->file_path));
        $total = $reader->count();
        $import->update(['total' => $total]);

        foreach ($reader->chunk(500) as $chunkIndex => $rows) {
            try {
                // One transaction per chunk: a failure rolls back only that chunk
                DB::transaction(function () use ($rows) {
                    foreach ($rows as $row) {
                        Product::updateOrCreate(
                            ['sku' => $row['sku']],
                            $this->mapRow($row)
                        );
                    }
                });

                $processed = min(($chunkIndex + 1) * 500, $total);
                $import->update(['processed' => $processed]);

                // Progress event
                event(new ImportProgressUpdated($import->id, $processed, $total));
            } catch (\Exception $e) {
                $import->increment('errors'); // counts failed chunks, not rows
                Log::error('Import chunk failed', ['chunk' => $chunkIndex, 'error' => $e->getMessage()]);
            }
        }

        $import->update(['status' => 'completed', 'finished_at' => now()]);
    }
}
WebSocket / SSE for Progress
// Laravel Broadcasting: progress event
class ImportProgressUpdated implements ShouldBroadcast
{
    public function __construct(
        public int $importId,
        public int $processed,
        public int $total,
    ) {}

    public function broadcastOn(): Channel
    {
        return new PrivateChannel("import.{$this->importId}");
    }

    public function broadcastWith(): array
    {
        return [
            'processed' => $this->processed,
            'total'     => $this->total,
            'percent'   => $this->total > 0 ? round($this->processed / $this->total * 100) : 0,
        ];
    }
}
On the frontend, subscribe via Laravel Echo or a native EventSource (SSE).
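A minimal sketch of the EventSource variant. The /imports/{id}/progress endpoint is an assumption — wire it to your own SSE route (or subscribe to the private channel via Laravel Echo instead); the payload shape matches broadcastWith() above.

```javascript
// Format a progress payload for display; shape matches broadcastWith().
function formatProgress({ processed, total, percent }) {
  return `${processed}/${total} (${percent}%)`;
}

// Subscribe to import progress over SSE.
// /imports/{id}/progress is a hypothetical endpoint — adjust to your routes.
function subscribeToImport(importId, onUpdate) {
  const source = new EventSource(`/imports/${importId}/progress`);
  source.onmessage = (event) => {
    const data = JSON.parse(event.data);
    onUpdate(formatProgress(data));
    if (data.processed >= data.total) {
      source.close(); // import finished — stop listening
    }
  };
  return source;
}
```

Closing the EventSource once processed reaches total avoids leaking an open connection after the import completes.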
Parallel Processing
For very large files (100,000+ products), split the file into independent parts:
// Dispatch Fan-out: one coordinator → N workers
class DispatchImportChunks implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable;

    public function __construct(private string $filePath) {}

    public function handle(): void
    {
        $chunks = $this->splitFile($this->filePath, chunkSize: 1000);

        Bus::batch(
            array_map(fn ($chunk) => new ProcessImportChunk($chunk), $chunks)
        )
            ->then(fn (Batch $batch) => $this->onComplete($batch))
            ->catch(fn (Batch $batch, Throwable $e) => $this->onError($batch, $e))
            ->dispatch();
    }
}
Bus::batch() in Laravel processes the chunks in parallel across multiple queue workers and tracks overall progress; each job in the batch must use the Batchable trait.
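The worker side might look like this — a sketch, assuming mapRow() exists as in the single-job version above; the Batchable trait gives the job access to its batch:

```php
// Worker job — processes one independent slice of the file.
class ProcessImportChunk implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, Batchable;

    public function __construct(private array $rows) {}

    public function handle(): void
    {
        // If the batch was cancelled, skip the remaining work.
        if ($this->batch()?->cancelled()) {
            return;
        }

        DB::transaction(function () {
            foreach ($this->rows as $row) {
                Product::updateOrCreate(['sku' => $row['sku']], $this->mapRow($row));
            }
        });
    }
}
```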
Error Handling
Rows that fail don't interrupt the import; they are saved to an import_errors table:
CREATE TABLE import_errors (
    id         BIGSERIAL PRIMARY KEY,
    import_id  BIGINT,
    row_number INT,
    row_data   JSONB,
    error_msg  TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
After completion, the user downloads a CSV of the failed rows for correction and re-import.
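Inside the chunk loop, errors can be captured per row rather than per chunk — a sketch, assuming an ImportError Eloquent model over the table above:

```php
// Per-row error capture: one bad row is logged and skipped,
// the rest of the chunk still imports.
foreach ($rows as $rowNumber => $row) {
    try {
        Product::updateOrCreate(['sku' => $row['sku']], $this->mapRow($row));
    } catch (\Exception $e) {
        ImportError::create([
            'import_id'  => $import->id,
            'row_number' => $rowNumber,
            'row_data'   => $row, // stored as JSONB
            'error_msg'  => $e->getMessage(),
        ]);
        $import->increment('errors');
    }
}
```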
Timeline
Product import queue system with real-time progress and error logging: 4–6 business days.