Implementing Storage and Processing of Scraped Data
Saving data to CSV is not a storage system. An industrial system must support incremental updates, change versioning, full-text search, and efficient filtering queries.
PostgreSQL Storage Schema
-- Main table: one row per (source, external ID) holding the latest scraped
-- payload. Replaced payloads are kept in scraped_items_history.
CREATE TABLE scraped_items (
    id          BIGSERIAL PRIMARY KEY,
    -- NOT NULL is required for the UNIQUE constraint below to be effective:
    -- UNIQUE treats NULLs as distinct, so rows with a NULL source_id could
    -- be duplicated freely.
    source_id   INTEGER NOT NULL REFERENCES sources(id),
    external_id TEXT NOT NULL,                      -- ID on the source side
    url         TEXT NOT NULL,
    data        JSONB NOT NULL,                     -- flexible schema for different sources
    data_hash   CHAR(64) NOT NULL,                  -- SHA-256 of data for change detection
    first_seen  TIMESTAMPTZ NOT NULL DEFAULT NOW(), -- when the row was created
    last_seen   TIMESTAMPTZ NOT NULL DEFAULT NOW(), -- refreshed every time the item is scraped
    changed_at  TIMESTAMPTZ,                        -- NULL until the payload changes for the first time
    UNIQUE (source_id, external_id)
);
-- Change history: each row stores one earlier payload of a scraped item,
-- together with the time the snapshot was recorded.
CREATE TABLE scraped_items_history (
    id          BIGSERIAL,
    item_id     BIGINT,
    data        JSONB NOT NULL,
    recorded_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    FOREIGN KEY (item_id) REFERENCES scraped_items (id)
);
-- Indexes
-- JSONB search over the whole payload
CREATE INDEX ON scraped_items USING GIN (data);
-- Per-source lookups filtered or ordered by last_seen
CREATE INDEX ON scraped_items (source_id, last_seen);
-- Full-text search over title + description.
-- Both parts are wrapped in COALESCE: `NULL || text` yields NULL, so a row
-- without a title would otherwise get a NULL tsvector and never match, even
-- on its description. Queries must repeat this exact expression for the
-- planner to use the index.
CREATE INDEX ON scraped_items USING GIN (
    to_tsvector(
        'english',
        COALESCE(data->>'title', '') || ' ' || COALESCE(data->>'description', '')
    )
);
-- PostgreSQL does not index foreign-key columns automatically; this keeps
-- per-item history lookups and parent-row deletes from scanning the table.
CREATE INDEX ON scraped_items_history (item_id);
Update Logic
def upsert_item(source_id, external_id, url, data):
    """Insert a scraped item, or refresh the stored one and archive replaced data.

    The whole operation is a single SQL statement, so the history snapshot and
    the update of ``scraped_items`` are applied atomically, and two workers
    seeing the same new item at once no longer fail with a unique violation
    (the previous SELECT-then-INSERT flow could).

    Outcomes, keyed on ``(source_id, external_id)``:
      * no row yet      -> a new row is inserted;
      * hash differs    -> the old payload is copied to
                           ``scraped_items_history``; ``data``, ``data_hash``,
                           ``last_seen`` and ``changed_at`` are updated;
      * hash identical  -> only ``last_seen`` effectively changes
                           (``data``/``data_hash`` are rewritten with equal
                           values, ``changed_at`` is kept).

    As before, ``url`` is written only when the row is first inserted.

    Args:
        source_id: ID of the source the item belongs to.
        external_id: ID of the item on the source side.
        url: URL the item was scraped from.
        data: JSON-serializable payload.

    Raises:
        TypeError: if ``data`` is not JSON-serializable (from ``json.dumps``).

    NOTE(review): the history snapshot reads the row as of the statement's
    snapshot; with several concurrent writers for the *same* item the recorded
    history may miss an intermediate version — confirm whether that matters.
    """
    # sort_keys makes the hash independent of dict key order.
    data_hash = hashlib.sha256(
        json.dumps(data, sort_keys=True).encode()
    ).hexdigest()

    # The CTE and the main INSERT run against the same snapshot, so `archived`
    # always copies the payload as it was *before* this statement's update.
    db.execute(
        '''
        WITH archived AS (
            INSERT INTO scraped_items_history (item_id, data)
            SELECT id, data
            FROM scraped_items
            WHERE source_id = %s AND external_id = %s AND data_hash <> %s
        )
        INSERT INTO scraped_items (source_id, external_id, url, data, data_hash)
        VALUES (%s, %s, %s, %s, %s)
        ON CONFLICT (source_id, external_id) DO UPDATE SET
            data = EXCLUDED.data,
            data_hash = EXCLUDED.data_hash,
            last_seen = NOW(),
            changed_at = CASE
                WHEN scraped_items.data_hash <> EXCLUDED.data_hash THEN NOW()
                ELSE scraped_items.changed_at
            END
        ''',
        (
            # parameters for the `archived` CTE
            source_id, external_id, data_hash,
            # parameters for the INSERT ... VALUES row
            source_id, external_id, url, json.dumps(data), data_hash,
        ),
    )
Post-Processing Pipeline
After saving raw data, processing pipelines are executed:
- Normalization — convert prices to a single currency, phone numbers to E.164 format, and addresses to a standard format
- Enrichment — geocode addresses, determine category via classifier
- Aggregation — calculate statistics by source: average price, category distribution
Archiving and TTL
Old data (not seen for more than 90 days) is either moved to an archive status or deleted, depending on requirements. Change history is kept longer than the main data.
Export
- CSV/XLSX — via `pandas.DataFrame.to_excel()` or `csv.DictWriter`
- REST API — FastAPI/Laravel with filtering, pagination, sorting
- Webhook — send new/changed records to external system in real time
Timeline for implementing a storage system with change history and an API: 4–6 business days.







