Setting up Kafka Connect for database integration
Kafka Connect is a framework for streaming data between Kafka and external systems without writing custom code. There are two types of connectors: Source connectors pull data from external systems into Kafka, and Sink connectors push data from Kafka out to external storage.
Practical use case: CDC (Change Data Capture) from PostgreSQL via Debezium → Kafka → Elasticsearch for a search index. When a database row changes, the search index is updated within seconds, without polling PostgreSQL.
Installing Kafka Connect
Kafka Connect ships with the Kafka distribution, but in distributed mode it runs as a separate worker process (usually a cluster of several workers) with its own configuration:
# Distributed mode configuration
# /opt/kafka/config/connect-distributed.properties
bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092
# Internal topics for storing connector configuration
group.id=kafka-connect-cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
offset.flush.interval.ms=10000
# REST API for management
listeners=http://0.0.0.0:8083
rest.advertised.host.name=connect-1.internal
rest.advertised.port=8083
# Plugins (downloaded connectors)
plugin.path=/opt/kafka/plugins
# Converters — Avro with Schema Registry
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
# For simple cases without Schema Registry:
# key.converter=org.apache.kafka.connect.storage.StringConverter
# value.converter=org.apache.kafka.connect.json.JsonConverter
# value.converter.schemas.enable=true
The worker start command, which a systemd unit can wrap:
/opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
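A minimal systemd unit wrapping this command might look like the sketch below (the unit path, the kafka service user, and the heap size are assumptions, adjust to your layout):
sudo tee /etc/systemd/system/kafka-connect.service <<'EOF'
[Unit]
Description=Kafka Connect worker (distributed mode)
After=network-online.target

[Service]
User=kafka
Environment="KAFKA_HEAP_OPTS=-Xms1g -Xmx2g"
ExecStart=/opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
Restart=on-failure
RestartSec=5

[Install]
WantedBy=multi-user.target
EOF
sudo systemctl daemon-reload && sudo systemctl enable --now kafka-connect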
Debezium PostgreSQL Source Connector
Debezium reads the PostgreSQL WAL (Write-Ahead Log) via logical decoding and turns every INSERT/UPDATE/DELETE into a Kafka event.
PostgreSQL pre-configuration:
-- Logical decoding settings (ALTER SYSTEM writes them to postgresql.auto.conf; changing wal_level requires a restart)
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 10;
ALTER SYSTEM SET max_wal_senders = 10;
-- Create replication user
CREATE USER debezium WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT CONNECT ON DATABASE myapp TO debezium;
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium;
-- Create publication for needed tables
CREATE PUBLICATION debezium_pub FOR TABLE products, orders, users, categories;
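Before creating the connector, it is worth checking that logical decoding is actually enabled and the publication exists; a quick sketch, run as a superuser after the PostgreSQL restart (host name taken from the connector example below):
psql -h postgres.internal -U postgres -d myapp -c "SHOW wal_level;"                # expect: logical
psql -h postgres.internal -U postgres -d myapp -c "SELECT * FROM pg_publication;"  # debezium_pub should be listed
psql -h postgres.internal -U postgres -d myapp -c "SELECT slot_name, plugin, active FROM pg_replication_slots;"  # debezium_slot appears once the connector starts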
Connector configuration via REST API:
curl -X POST http://connect-1:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "postgres-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres.internal",
"database.port": "5432",
"database.user": "debezium",
"database.password": "secure_password",
"database.dbname": "myapp",
"database.server.name": "myapp-pg",
"topic.prefix": "myapp",
"table.include.list": "public.products,public.orders,public.users",
"plugin.name": "pgoutput",
"publication.name": "debezium_pub",
"slot.name": "debezium_slot",
"snapshot.mode": "initial",
"snapshot.isolation.mode": "read_committed",
"decimal.handling.mode": "double",
"time.precision.mode": "connect",
"tombstones.on.delete": "true",
"heartbeat.interval.ms": "10000",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,ts_ms,source.ts_ms",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}'
Topics are created automatically, named <topic.prefix>.<schema>.<table>: myapp.public.products, myapp.public.orders, and so on.
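A quick way to confirm that CDC events are flowing is to list the new topics and read a few messages (broker address and paths follow the examples above):
/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka-1:9092 --list | grep '^myapp\.'
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka-1:9092 \
  --topic myapp.public.products --from-beginning --max-messages 5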
JDBC Sink Connector — from Kafka to PostgreSQL
The opposite direction: events from Kafka are written to PostgreSQL (an analytics database or data warehouse).
# Download the JDBC connector into its own plugin directory
mkdir -p /opt/kafka/plugins/kafka-connect-jdbc
wget https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.7.4/kafka-connect-jdbc-10.7.4.jar \
  -P /opt/kafka/plugins/kafka-connect-jdbc/
# Put the PostgreSQL JDBC driver jar in the same directory (the connector jar does not bundle it),
# then restart the Connect workers so the plugin is picked up
curl -X POST http://connect-1:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "postgres-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "4",
"topics": "myapp.analytics.events",
"connection.url": "jdbc:postgresql://analytics-pg:5432/analytics",
"connection.user": "kafka_writer",
"connection.password": "secure_password",
"auto.create": "false",
"auto.evolve": "false",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "id",
"table.name.format": "analytics.${topic}",
"batch.size": "1000",
"db.timezone": "UTC",
"transforms": "dropPrefix",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropPrefix.exclude": "__deleted,__op,__ts_ms"
}
}'
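With the RegexRouter above stripping the myapp.analytics. prefix, records from myapp.analytics.events land in the analytics.events table. A rough sanity check, reusing the credentials from the example config:
# Task states should all be RUNNING
curl -s http://connect-1:8083/connectors/postgres-sink-connector/status | jq '.tasks[].state'
# Row count in the target table should grow as events arrive
psql -h analytics-pg -U kafka_writer -d analytics -c "SELECT count(*) FROM analytics.events;"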
Elasticsearch Sink Connector
Synchronize product catalog from Kafka to Elasticsearch:
curl -X POST http://connect-1:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "4",
"topics": "myapp.public.products",
"connection.url": "https://es-node-1:9200,https://es-node-2:9200",
"connection.username": "elastic",
"connection.password": "elastic_pass",
"type.name": "_doc",
"key.ignore": "false",
"schema.ignore": "true",
"behavior.on.null.values": "delete",
"batch.size": "500",
"flush.timeout.ms": "10000",
"max.retries": "5",
"retry.backoff.ms": "100",
"linger.ms": "1000",
"transforms": "extractKey",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "id"
}
}'
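By default the Elasticsearch sink writes to an index named after the topic, so the result can be checked directly (credentials from the example config):
# Document count and a sample document from the index
curl -s -u elastic:elastic_pass "https://es-node-1:9200/myapp.public.products/_count" | jq .
curl -s -u elastic:elastic_pass "https://es-node-1:9200/myapp.public.products/_search?size=1" | jq '.hits.hits[0]._source'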
Management and monitoring
# List of connectors
curl http://connect-1:8083/connectors | jq .
# Connector status
curl http://connect-1:8083/connectors/postgres-source-connector/status | jq .
# Restart failed task
curl -X POST http://connect-1:8083/connectors/postgres-source-connector/tasks/0/restart
# Pause/resume
curl -X PUT http://connect-1:8083/connectors/postgres-source-connector/pause
curl -X PUT http://connect-1:8083/connectors/postgres-source-connector/resume
# Update configuration (PUT replaces the entire config, so send the full set of fields, not a partial diff)
curl -X PUT http://connect-1:8083/connectors/postgres-source-connector/config \
-H "Content-Type: application/json" \
-d '{"heartbeat.interval.ms": "5000", ...}'
Prometheus JMX metrics via JMX Exporter:
KAFKA_OPTS="-javaagent:/opt/jmx-exporter.jar=9404:/opt/kafka/config/jmx-connect.yml" \
/opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
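After restarting the worker with the agent, the exporter answers on the port given in the javaagent flag; the exact metric names depend on the rules in jmx-connect.yml:
# Smoke test: the endpoint should return Prometheus-format metrics
curl -s http://connect-1.internal:9404/metrics | head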
Typical problems
WAL bloat — if the replication slot does not advance (Debezium is down or cannot keep up), PostgreSQL retains WAL segments and the disk fills up. Set max_slot_wal_keep_size (PostgreSQL 13+) and alert on retained WAL per slot.
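Retained WAL per slot can be watched with a query along these lines (PostgreSQL 10+; connection parameters as in the earlier examples):
psql -h postgres.internal -U postgres -d myapp -c \
  "SELECT slot_name, active,
          pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
   FROM pg_replication_slots;"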
Schema evolution — when a new column is added in PostgreSQL, Debezium picks it up automatically and, with the Avro converter, registers the updated schema in Schema Registry. The sink connector must be ready for new fields (auto.evolve=true for the JDBC sink, or manual schema management).
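If the Avro converters from the worker config are in use, the registered schema versions for a topic can be inspected directly in Schema Registry:
curl -s http://schema-registry:8081/subjects/myapp.public.products-value/versions | jq .
curl -s http://schema-registry:8081/subjects/myapp.public.products-value/versions/latest | jq '.schema | fromjson'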
Tombstone messages — on DELETE, Debezium emits two messages: the delete event (rewritten to a record with __deleted=true by the unwrap transform) and a tombstone (null value). On compacted topics the tombstone is what lets log compaction remove the key from the log.
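Both messages can be observed with the console consumer by printing keys alongside values:
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka-1:9092 \
  --topic myapp.public.products --from-beginning --property print.key=true
# A DELETE appears as a record with "__deleted":"true", followed by a tombstone (null value) for the same key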
Timeline
Day 1 — PostgreSQL setup for logical replication, Kafka Connect installation in distributed mode on 2–3 nodes.
Day 2 — Debezium installation, initial snapshot (can take hours for large tables), connector configuration, CDC event verification.
Day 3 — sink connector setup (Elasticsearch or PostgreSQL), transformations via SMT (Single Message Transforms), end-to-end testing of the pipeline for INSERT/UPDATE/DELETE.
Day 4 — monitoring, alerting on lag and errors, documentation of topic schemas, load testing at peak change rates.