Implementing Data Migration Between DB Types (PostgreSQL ↔ MySQL ↔ MongoDB)
Transitioning between different DBMSs is one of the most complex migration types. Differences in data types, NULL behavior, auto-increment mechanisms, JSON support, and transaction semantics require careful mapping and testing.
Common Scenarios
- MySQL → PostgreSQL (escape vendor lock-in, better JSON/JSONB support, window functions)
- MongoDB → PostgreSQL (normalize data, ACID transactions)
- PostgreSQL → MongoDB (sharding, flexible schema for certain entities)
- MySQL → MySQL across major versions (via dump/restore with transformation)
MySQL → PostgreSQL
Tool: pgloader
# Installation
apt install pgloader
# Basic migration
pgloader mysql://user:pass@mysql-host/myapp \
postgresql://user:pass@pg-host/myapp
pgloader automatically:
- Converts data types
- Migrates indexes and primary keys
- Transfers foreign keys
- Handles AUTO_INCREMENT → SERIAL/BIGSERIAL conversion
Custom pgloader configuration:
LOAD DATABASE
FROM mysql://user:pass@mysql-host/myapp
INTO postgresql://user:pass@pg-host/myapp
WITH include no drop,
create tables,
create indexes,
reset sequences
SET work_mem to '256MB',
maintenance_work_mem to '512MB'
CAST type datetime to timestamptz drop default drop not null using zero-dates-to-null,
type tinyint(1) to boolean using tinyint-to-boolean,
type enum to text,
column orders.status to text
ALTER SCHEMA 'myapp' RENAME TO 'public'
EXCLUDING TABLE NAMES MATCHING 'cache_*', 'sessions'
;
Issues with MySQL → PostgreSQL
| Issue | MySQL | PostgreSQL | Solution |
|---|---|---|---|
| Case sensitivity | user = User (default collations) | user ≠ User | Normalize case or use citext |
| ENUM | Native column type | Requires separate CREATE TYPE | Convert to text + CHECK |
| Datetime | No timezone | timestamptz | Specify timezone explicitly |
| GROUP BY | Lenient unless ONLY_FULL_GROUP_BY is set | Strict (SQL standard) | Rewrite queries |
| Zero dates | 0000-00-00 allowed | Not supported | Convert to NULL |
| Identifier quotes | Backticks | Double quotes | Replace ` with " |
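The identifier-quoting difference is easy to automate when porting raw SQL between the two dialects. A minimal sketch (the function name is mine, and it deliberately ignores the edge case of backticks inside string literals):

```python
import re

def mysql_quotes_to_pg(sql: str) -> str:
    """Replace MySQL backtick identifier quoting with standard double quotes.

    Naive sketch: does not handle backticks embedded in string literals,
    so apply it only to DDL or queries known to be free of that case.
    """
    return re.sub(r"`([^`]+)`", r'"\1"', sql)

print(mysql_quotes_to_pg("SELECT `id`, `user name` FROM `orders`"))
# SELECT "id", "user name" FROM "orders"
```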
MongoDB → PostgreSQL
ETL Approach with Python
from pymongo import MongoClient
import psycopg2
from psycopg2.extras import execute_batch
import json

mongo = MongoClient('mongodb://localhost:27017')
pg = psycopg2.connect('host=pg-host dbname=myapp user=app')

# Source MongoDB collection
source = mongo.myapp.users
cursor = pg.cursor()

insert_query = """INSERT INTO users (id, email, name, metadata, created_at)
                  VALUES (%s, %s, %s, %s::jsonb, %s)
                  ON CONFLICT (id) DO NOTHING"""

batch = []
for doc in source.find():
    batch.append((
        str(doc['_id']),
        doc.get('email'),
        doc.get('name'),
        json.dumps(doc.get('metadata', {})),  # JSONB in PG
        doc.get('created_at')
    ))
    if len(batch) >= 1000:
        execute_batch(cursor, insert_query, batch)
        pg.commit()
        batch = []

# Flush the last partial batch
if batch:
    execute_batch(cursor, insert_query, batch)
    pg.commit()
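Values coming out of pymongo are often BSON types (ObjectId, Decimal128) that psycopg2 cannot adapt directly. A hedged helper using only the standard library; the coercion rules are assumptions to adjust per schema:

```python
from datetime import datetime
from decimal import Decimal
import json

def to_pg_value(value):
    """Coerce a MongoDB/BSON value into something psycopg2 can adapt.

    Sketch under the assumption that ObjectId / Decimal128 arrive as objects
    whose str() form is the desired representation; dicts and lists become
    JSON text destined for a jsonb column.
    """
    if value is None or isinstance(value, (bool, int, float, str, datetime, Decimal)):
        return value  # psycopg2 adapts these natively
    if isinstance(value, (dict, list)):
        return json.dumps(value, default=str)  # default=str covers nested BSON types
    return str(value)  # ObjectId, Decimal128, UUID, ...
```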
Normalizing Nested Documents
MongoDB document:
{
  "_id": "user_123",
  "email": "[email protected]",
  "addresses": [
    { "type": "home", "city": "Moscow", "zip": "101000" },
    { "type": "work", "city": "St. Petersburg", "zip": "190000" }
  ]
}
PostgreSQL schema:
CREATE TABLE users (
id TEXT PRIMARY KEY,
email TEXT UNIQUE NOT NULL
);
CREATE TABLE user_addresses (
id BIGSERIAL PRIMARY KEY,
user_id TEXT REFERENCES users(id),
type TEXT,
city TEXT,
zip TEXT
);
Transformation during migration:
for doc in source.find():
    cursor.execute(
        "INSERT INTO users (id, email) VALUES (%s, %s)",
        (str(doc['_id']), doc['email'])
    )
    for addr in doc.get('addresses', []):
        cursor.execute(
            "INSERT INTO user_addresses (user_id, type, city, zip) VALUES (%s, %s, %s, %s)",
            (str(doc['_id']), addr.get('type'), addr.get('city'), addr.get('zip'))
        )
pg.commit()
PostgreSQL → MySQL
No single-command path exists in this direction (pgloader only migrates into PostgreSQL). Use:
- Export to CSV:
COPY table TO '/tmp/table.csv' WITH (FORMAT csv, HEADER);
- Create the MySQL schema manually, accounting for MySQL types
- Import:
LOAD DATA INFILE '/tmp/table.csv' INTO TABLE table FIELDS TERMINATED BY ',' ENCLOSED BY '"' IGNORE 1 LINES;
Or use an ETL tool: Airbyte, dbt, or Apache NiFi.
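Between the COPY export and the LOAD DATA import, the CSV usually needs massaging: PostgreSQL writes booleans as t/f and NULLs as empty strings, while MySQL expects 1/0 and \N. A minimal sketch; the function names and the bool_columns set are mine:

```python
import csv

def convert_row(row, bool_columns):
    """Rewrite one PostgreSQL CSV row for MySQL: t/f -> 1/0, '' -> \\N (NULL)."""
    out = []
    for i, field in enumerate(row):
        if i in bool_columns:
            field = {"t": "1", "f": "0"}.get(field, field)
        out.append(r"\N" if field == "" else field)
    return out

def convert_file(src_path, dst_path, bool_columns):
    """Stream src CSV to dst, keeping the header row untouched."""
    with open(src_path, newline="") as src, open(dst_path, "w", newline="") as dst:
        reader, writer = csv.reader(src), csv.writer(dst)
        writer.writerow(next(reader))  # header row stays as-is
        for row in reader:
            writer.writerow(convert_row(row, bool_columns))
```

Note that empty string and NULL are distinct in PostgreSQL but collapsed by the default CSV NULL marker; if that distinction matters, export with an explicit NULL string instead.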
Zero-Downtime Strategy for DB Type Change
- Launch the new DB in parallel with the old one
- Add a dual-write layer in the application: every write goes to both DBs
- Run a background sync process to backfill historical data
- Once the data is aligned, switch reads to the new DB
- After a week of stable operation, disable dual-write and decommission the old DB
class DualWriteRepository:
    def __init__(self, primary, secondary):
        self.primary = primary
        self.secondary = secondary

    def create_user(self, data):
        result = self.primary.create_user(data)
        try:
            self.secondary.create_user(data)
        except Exception as e:
            logger.error(f"Secondary write failed: {e}")
            # Don't fail the request; enqueue the write for retry
            queue.put(('create_user', data))
        return result
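The queue the repository writes to needs a consumer. A sketch of a retry drainer, assuming op names map 1:1 to repository method names; drain_retries and the logger name are mine, and real code would add backoff and a dead-letter store:

```python
import logging
from queue import Queue

logger = logging.getLogger("dualwrite")

def drain_retries(secondary, retry_queue, max_attempts=3):
    """Replay failed writes from the retry queue against the secondary DB."""
    replayed = 0
    while not retry_queue.empty():
        op, data = retry_queue.get()
        for attempt in range(1, max_attempts + 1):
            try:
                getattr(secondary, op)(data)  # e.g. secondary.create_user(data)
                replayed += 1
                break
            except Exception as e:
                logger.warning("attempt %d for %s failed: %s", attempt, op, e)
        retry_queue.task_done()
    return replayed
```

Run it from a background thread or a periodic job; idempotent writes (the ON CONFLICT DO NOTHING pattern above) make replays safe.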
Post-Migration Verification
-- Compare record count
SELECT 'users' as table_name, COUNT(*) FROM users
UNION ALL
SELECT 'orders', COUNT(*) FROM orders
UNION ALL
SELECT 'products', COUNT(*) FROM products;
-- Compare checksums (PostgreSQL)
SELECT md5(string_agg(md5(id::text || email), '' ORDER BY id))
FROM users;
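The same checks can be scripted across both databases. A sketch over plain DB-API cursors (compare_counts is a name I made up; table names must come from a trusted list, since they are interpolated into the SQL):

```python
def compare_counts(old_cur, new_cur, tables):
    """Run COUNT(*) per table on both cursors and return mismatches.

    Works with any DB-API cursor (psycopg2, mysql.connector, sqlite3, ...).
    Returns {table: (old_count, new_count)} for tables that differ.
    """
    mismatches = {}
    for table in tables:  # trusted list only: table names are interpolated
        old_cur.execute(f"SELECT COUNT(*) FROM {table}")
        new_cur.execute(f"SELECT COUNT(*) FROM {table}")
        old_n, new_n = old_cur.fetchone()[0], new_cur.fetchone()[0]
        if old_n != new_n:
            mismatches[table] = (old_n, new_n)
    return mismatches
```

An empty result means counts line up; checksums then catch row-level drift that equal counts can hide.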
Execution Time
MySQL → PostgreSQL for a database up to 50 GB: 3–5 working days. MongoDB → PostgreSQL with schema normalization: 1–2 weeks.