Data Integrity Check Implementation After Migration
Data integrity verification is a mandatory step after any migration. Without it, you cannot know whether data was transferred completely and correctly.
What is Checked
- Number of records by each content type
- Checksums of critical fields
- Referential integrity (foreign keys, parent-child relationships)
- Presence of required fields
- URL correctness and page accessibility
- SEO metadata
Quantitative Verification
class MigrationValidator:
    """Accumulates post-migration data-integrity checks for a source/target DB pair.

    Both ``source_db`` and ``target_db`` must expose ``count(table, where)``.
    Every check appends a dict to ``self.results`` with the keys
    ``check``, ``status`` ('OK' / 'MISMATCH'), ``source``, ``target``, ``diff``.
    """

    # Default (source_table, target_table, source_filter, target_filter) mapping.
    # Filters account for renamed tables/status values between the two schemas.
    DEFAULT_TABLES = [
        ('posts', 'articles', "status='publish'", "status='published'"),
        ('users', 'users', None, None),
        ('comments', 'comments', "approved=1", "status='approved'"),
        ('categories', 'categories', None, None),
    ]

    def __init__(self, source_db, target_db):
        self.source = source_db
        self.target = target_db
        self.results = []

    def check_counts(self, tables=None):
        """Compare row counts for each table pair; record one result per pair.

        ``tables`` optionally overrides the default mapping; each entry is a
        (source_table, target_table, source_where, target_where) tuple.
        A ``None`` filter means count all rows.
        """
        for src_table, tgt_table, src_where, tgt_where in (tables or self.DEFAULT_TABLES):
            src_count = self.source.count(src_table, src_where)
            tgt_count = self.target.count(tgt_table, tgt_where)
            self.results.append({
                'check': f'count_{src_table}',
                'status': 'OK' if src_count == tgt_count else 'MISMATCH',
                'source': src_count,
                'target': tgt_count,
                # Positive diff = extra rows in target; negative = rows lost.
                'diff': tgt_count - src_count,
            })
Checksum Verification
def checksum_check(source_db, target_db):
    """Return True when the aggregate row checksums on both sides match.

    Each query folds a per-row md5 of id/email/slug into one md5 over the
    id-ordered concatenation, so any changed, missing or extra row alters
    the final hash.
    """
    # PostgreSQL: the source keys on its native integer id...
    expected = source_db.query_one(
        """
        SELECT md5(string_agg(
            md5(id::text || coalesce(email,'') || coalesce(slug,'')),
            ',' ORDER BY id
        )) as hash
        FROM articles
        WHERE status = 'published'
        """
    )
    # ...while the target keys on the preserved legacy_id (cast to integer
    # so the ordering matches the source side).
    actual = target_db.query_one(
        """
        SELECT md5(string_agg(
            md5(legacy_id || coalesce(email,'') || coalesce(slug,'')),
            ',' ORDER BY CAST(legacy_id AS INTEGER)
        )) as hash
        FROM articles
        WHERE status = 'published'
        """
    )
    return expected == actual
Referential Integrity Check
def check_referential_integrity(target_db):
    """Scan the target DB for broken foreign-key relationships.

    Returns a list of human-readable issue strings; an empty list means
    every relationship checked is intact.
    """
    # (message template, query) pairs; each query selects the violating rows.
    probes = [
        # Articles whose author_id points at no existing user
        ("Articles without valid author: {}", """
            SELECT a.id, a.title FROM articles a
            LEFT JOIN users u ON a.author_id = u.id
            WHERE a.author_id IS NOT NULL AND u.id IS NULL
        """),
        # Comments attached to a non-existent article
        ("Orphaned comments: {}", """
            SELECT c.id FROM comments c
            LEFT JOIN articles a ON c.post_id = a.id
            WHERE a.id IS NULL
        """),
        # Reply comments whose parent comment is missing
        ("Comments with missing parent: {}", """
            SELECT c.id FROM comments c
            LEFT JOIN comments p ON c.parent_id = p.id
            WHERE c.parent_id IS NOT NULL AND p.id IS NULL
        """),
    ]
    issues = []
    for template, sql in probes:
        violations = target_db.query(sql)
        if violations:
            issues.append(template.format(len(violations)))
    return issues
URL Accessibility Check
import asyncio
import aiohttp
async def check_urls(urls, base_url, concurrency=20):
    """Probe each path under ``base_url`` and bucket failures by category.

    Returns a dict with keys '404', '500' and 'redirect_chain', each mapping
    to a list of offending paths. At most ``concurrency`` requests are in
    flight at any moment.
    """
    errors = {'404': [], '500': [], 'redirect_chain': []}
    gate = asyncio.Semaphore(concurrency)

    async def probe(session, path):
        async with gate:
            target = f"{base_url}{path}"
            try:
                async with session.get(target, allow_redirects=True) as resp:
                    status = resp.status
                    hops = len(resp.history)
                    if status == 404:
                        errors['404'].append(path)
                    elif status >= 500:
                        errors['500'].append(path)
                    elif hops > 2:
                        # Long redirect chains are flagged even on a 2xx result.
                        errors['redirect_chain'].append(f"{path} ({hops} redirects)")
            except Exception as e:
                # Network-level failures are bucketed with the server errors.
                errors['500'].append(f"{path} (error: {e})")

    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(probe(session, path) for path in urls))
    return errors
# Run
# Example driver: collect every published URL from the migrated database and
# probe it against the new domain. get_all_published_urls and target_db are
# assumed to be defined elsewhere in the migration tooling — TODO confirm.
urls_to_check = get_all_published_urls(target_db)
results = asyncio.run(check_urls(urls_to_check, 'https://new-site.com'))
SEO Metadata Check
def check_seo_completeness(target_db):
    """Audit published articles for missing or duplicated SEO metadata.

    Returns a list of human-readable issue strings; empty list means the
    SEO fields are complete and unique.
    """
    # (message template, query) pairs; each query selects the offending rows.
    audits = [
        # Published pages with an empty or NULL SEO title
        ("Pages without SEO title: {}", """
            SELECT slug FROM articles
            WHERE (seo_title IS NULL OR seo_title = '')
            AND status = 'published'
        """),
        # Published pages with an empty or NULL meta description
        ("Pages without meta description: {}", """
            SELECT slug FROM articles
            WHERE (seo_description IS NULL OR seo_description = '')
            AND status = 'published'
        """),
        # Groups of published pages sharing the same SEO title
        ("Duplicate SEO titles: {} groups", """
            SELECT seo_title, COUNT(*) as count FROM articles
            WHERE status = 'published'
            GROUP BY seo_title
            HAVING COUNT(*) > 1
        """),
    ]
    issues = []
    for template, sql in audits:
        offenders = target_db.query(sql)
        if offenders:
            issues.append(template.format(len(offenders)))
    return issues
Verification Report
def generate_report(validator):
    """Run all validator checks and print a pass/fail summary to stdout.

    NOTE(review): assumes ``validator`` also provides ``check_seo`` and
    ``check_integrity`` (not part of the class shown here) — confirm; every
    check is expected to append result dicts to ``validator.results``.
    """
    validator.check_counts()
    validator.check_seo()
    validator.check_integrity()
    print("\n=== MIGRATION VALIDATION REPORT ===\n")
    passed, failed = [], []
    for entry in validator.results:
        (passed if entry['status'] == 'OK' else failed).append(entry)
    print(f"✓ Passed: {len(passed)}")
    print(f"✗ Failed: {len(failed)}\n")
    if failed:
        print("FAILURES:")
        for r in failed:
            print(f" [{r['status']}] {r['check']}: source={r.get('source')}, target={r.get('target')}")
    print("\nRECOMMENDATION:", "OK to proceed" if not failed else "DO NOT proceed — fix issues first")
Timeline
Development of integrity check suite + automatic report — 1–2 working days.







