Kafka Connect setup for database integration

Kafka Connect is a framework for streaming data replication between Kafka and external systems without writing custom code. There are two connector types: Source connectors pull data into Kafka, and Sink connectors push data from Kafka out to external storage.

Practical use case: CDC (Change Data Capture) from PostgreSQL via Debezium → Kafka → Elasticsearch for a search index. When a database row changes, the search index updates within seconds, without ad-hoc queries against PostgreSQL.

Installing Kafka Connect

Kafka Connect is included in the Kafka distribution but requires separate execution in distributed mode:

# Distributed mode configuration
# /opt/kafka/config/connect-distributed.properties

bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092

# Internal topics for storing connector configuration
group.id=kafka-connect-cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
offset.flush.interval.ms=10000

# REST API for management
rest.host.name=0.0.0.0
rest.port=8083
rest.advertised.host.name=connect-1.internal
rest.advertised.port=8083

# Plugins (downloaded connectors)
plugin.path=/opt/kafka/plugins

# Converters — Avro with Schema Registry
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081

# For simple cases without Schema Registry:
# key.converter=org.apache.kafka.connect.storage.StringConverter
# value.converter=org.apache.kafka.connect.json.JsonConverter
# value.converter.schemas.enable=true

In production, run the worker under a process supervisor (e.g. a systemd unit) rather than by hand. The command itself:

/opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
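
A minimal systemd unit sketch for the worker (the paths and the kafka service user are assumptions; adjust to your layout):

```ini
# /etc/systemd/system/kafka-connect.service — assumed path and user
[Unit]
Description=Kafka Connect distributed worker
After=network.target

[Service]
User=kafka
ExecStart=/opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
Restart=on-failure
RestartSec=10

[Install]
WantedBy=multi-user.target
```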

Debezium PostgreSQL Source Connector

Debezium intercepts PostgreSQL WAL (Write-Ahead Log) and translates each INSERT/UPDATE/DELETE into a Kafka event.

PostgreSQL pre-configuration:

-- Enable logical replication (ALTER SYSTEM writes postgresql.auto.conf;
-- the wal_level change takes effect only after a restart)
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 10;
ALTER SYSTEM SET max_wal_senders = 10;

-- Create replication user
CREATE USER debezium WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT CONNECT ON DATABASE myapp TO debezium;
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium;

-- Create publication for needed tables
CREATE PUBLICATION debezium_pub FOR TABLE products, orders, users;
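
Before registering the connector, it's worth verifying the setup from the PostgreSQL side (catalog queries; the replication slot itself appears only after the connector starts):

```sql
-- Tables covered by the publication
SELECT * FROM pg_publication_tables WHERE pubname = 'debezium_pub';

-- Confirm logical WAL level is active (requires a restart after ALTER SYSTEM)
SHOW wal_level;

-- After the connector starts, its slot should appear here
SELECT slot_name, active, restart_lsn FROM pg_replication_slots;
```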

Connector configuration via REST API:

curl -X POST http://connect-1:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-source-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres.internal",
        "database.port": "5432",
        "database.user": "debezium",
        "database.password": "secure_password",
        "database.dbname": "myapp",
        "topic.prefix": "myapp",
        "table.include.list": "public.products,public.orders,public.users",
        "plugin.name": "pgoutput",
        "publication.name": "debezium_pub",
        "slot.name": "debezium_slot",
        "snapshot.mode": "initial",
        "decimal.handling.mode": "double",
        "time.precision.mode": "connect",
        "tombstones.on.delete": "true",
        "heartbeat.interval.ms": "10000",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.delete.handling.mode": "rewrite",
        "transforms.unwrap.add.fields": "op,ts_ms,source.ts_ms",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
    }
}'

Topics are created automatically using the pattern <topic.prefix>.<schema>.<table>: myapp.public.products, myapp.public.orders, and so on.
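
With the ExtractNewRecordState SMT and JSON converter configured above, each message value is the flat row plus the metadata fields added via add.fields. A consumer-side sketch (the sample payload is illustrative, not captured from a real topic):

```python
import json

def parse_cdc_event(raw):
    """Split a flattened Debezium event into row data and CDC metadata.

    Assumes the unwrap SMT with add.fields=op,ts_ms and
    delete.handling.mode=rewrite, so deletes arrive with __deleted="true".
    """
    if raw is None:  # tombstone message (null value)
        return None
    event = json.loads(raw)
    # Metadata fields added by the SMT are prefixed with "__"
    meta = {k: event.pop(k) for k in list(event) if k.startswith("__")}
    return {
        "row": event,                      # the table columns themselves
        "op": meta.get("__op"),            # c=create, u=update, d=delete, r=snapshot read
        "deleted": meta.get("__deleted") == "true",
        "ts_ms": meta.get("__ts_ms"),
    }

# Hypothetical message from myapp.public.products
sample = b'{"id": 42, "name": "Lamp", "__op": "u", "__ts_ms": 1700000000000, "__deleted": "false"}'
parsed = parse_cdc_event(sample)
print(parsed["op"], parsed["row"]["name"])  # u Lamp
```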

JDBC Sink Connector — from Kafka to PostgreSQL

The opposite case: events from Kafka are written to PostgreSQL (an analytics database or data warehouse).

# Download the JDBC Connector plus the PostgreSQL JDBC driver it needs
mkdir -p /opt/kafka/plugins/kafka-connect-jdbc
wget https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.7.4/kafka-connect-jdbc-10.7.4.jar \
    -P /opt/kafka/plugins/kafka-connect-jdbc/
wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.3/postgresql-42.7.3.jar \
    -P /opt/kafka/plugins/kafka-connect-jdbc/

curl -X POST http://connect-1:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "4",
        "topics": "myapp.analytics.events",
        "connection.url": "jdbc:postgresql://analytics-pg:5432/analytics",
        "connection.user": "kafka_writer",
        "connection.password": "secure_password",
        "auto.create": "false",
        "auto.evolve": "false",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "id",
        "table.name.format": "analytics.${topic}",
        "batch.size": "1000",
        "db.timezone": "UTC",
        "transforms": "dropMeta,route",
        "transforms.dropMeta.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.dropMeta.exclude": "__deleted,__op,__ts_ms",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "myapp\\.analytics\\.(.*)",
        "transforms.route.replacement": "$1"
    }
}'
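
With insert.mode=upsert and pk.mode=record_key on PostgreSQL, the connector issues INSERT ... ON CONFLICT ... DO UPDATE statements. A rough sketch of the SQL it generates (simplified; the real connector quotes identifiers and binds parameters):

```python
def build_upsert(table, pk_fields, columns):
    """Approximate the upsert statement the JDBC sink issues per record."""
    non_pk = [c for c in columns if c not in pk_fields]
    placeholders = ", ".join(["?"] * len(columns))
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in non_pk)
    return (
        f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders}) "
        f"ON CONFLICT ({', '.join(pk_fields)}) DO UPDATE SET {updates}"
    )

print(build_upsert("analytics.events", ["id"], ["id", "user_id", "amount"]))
# → INSERT INTO analytics.events (id, user_id, amount) VALUES (?, ?, ?) ON CONFLICT (id) DO UPDATE SET user_id = EXCLUDED.user_id, amount = EXCLUDED.amount
```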

Elasticsearch Sink Connector

Synchronize product catalog from Kafka to Elasticsearch:

curl -X POST http://connect-1:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "4",
        "topics": "myapp.public.products",
        "connection.url": "https://es-node-1:9200,https://es-node-2:9200",
        "connection.username": "elastic",
        "connection.password": "elastic_pass",
        "type.name": "_doc",
        "key.ignore": "false",
        "schema.ignore": "true",
        "behavior.on.null.values": "delete",
        "batch.size": "500",
        "flush.timeout.ms": "10000",
        "max.retries": "5",
        "retry.backoff.ms": "100",
        "linger.ms": "1000",
        "transforms": "extractKey",
        "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractKey.field": "id"
    }
}'

Management and monitoring

# List of connectors
curl http://connect-1:8083/connectors | jq .

# Connector status
curl http://connect-1:8083/connectors/postgres-source-connector/status | jq .

# Restart failed task
curl -X POST http://connect-1:8083/connectors/postgres-source-connector/tasks/0/restart

# Pause/resume
curl -X PUT http://connect-1:8083/connectors/postgres-source-connector/pause
curl -X PUT http://connect-1:8083/connectors/postgres-source-connector/resume

# Update configuration
curl -X PUT http://connect-1:8083/connectors/postgres-source-connector/config \
  -H "Content-Type: application/json" \
  -d '{"heartbeat.interval.ms": "5000", ...}'
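
The /status payload is easy to turn into an alert. A sketch that extracts failed tasks from a status response (the sample payload mirrors the REST API shape; polling code against a live cluster is omitted):

```python
import json

def failed_tasks(status_json):
    """Return tasks in FAILED state from a GET /connectors/<name>/status response."""
    status = json.loads(status_json)
    failed = []
    if status.get("connector", {}).get("state") == "FAILED":
        failed.append({"task": "connector", "worker": status["connector"].get("worker_id")})
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            failed.append({"task": task["id"], "worker": task.get("worker_id"),
                           "trace": task.get("trace", "")[:200]})
    return failed

# Hypothetical status response from the Connect REST API
sample = '''{
  "name": "postgres-source-connector",
  "connector": {"state": "RUNNING", "worker_id": "connect-1:8083"},
  "tasks": [
    {"id": 0, "state": "RUNNING", "worker_id": "connect-1:8083"},
    {"id": 1, "state": "FAILED", "worker_id": "connect-2:8083", "trace": "org.postgresql.util.PSQLException: ..."}
  ]
}'''
for t in failed_tasks(sample):
    print(f"task {t['task']} failed on {t['worker']}")  # task 1 failed on connect-2:8083
```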

Prometheus JMX metrics via JMX Exporter:

KAFKA_OPTS="-javaagent:/opt/jmx-exporter.jar=9404:/opt/kafka/config/jmx-connect.yml" \
    /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
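
The jmx-connect.yml referenced above is the exporter's rule file. A minimal example exposing worker and per-task metrics (the patterns follow the kafka.connect JMX domain; treat them as a starting point and trim to taste):

```yaml
# /opt/kafka/config/jmx-connect.yml — sample JMX Exporter rules (adjust as needed)
lowercaseOutputName: true
rules:
  # Worker-level metrics: connector/task counts, rebalance info
  - pattern: "kafka.connect<type=connect-worker-metrics>([^:]+):"
    name: "kafka_connect_worker_$1"
  # Per-connector task metrics: status, batch sizes, offset commit latency
  - pattern: "kafka.connect<type=connector-task-metrics, connector=(.+), task=(.+)><>([^:]+)"
    name: "kafka_connect_task_$3"
    labels:
      connector: "$1"
      task: "$2"
```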

Typical problems

WAL bloat — if Debezium cannot keep up with PostgreSQL and the replication slot doesn't advance, WAL accumulates on disk and can fill it. Configure max_slot_wal_keep_size (PostgreSQL 13+) and alert on retained WAL size.
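
The slot lag that drives WAL bloat can be watched directly in PostgreSQL:

```sql
-- Bytes of WAL retained per replication slot; alert when this grows unbounded
SELECT slot_name, active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
```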

Schema evolution — when a new column is added in PostgreSQL, Debezium automatically updates the schema in Schema Registry. The sink connector must be ready for new fields (auto.evolve=true or manual schema management).

Tombstone messages — on DELETE, Debezium sends two messages: the DELETE event and a tombstone (null value). For compacted topics, the tombstone removes the entry from the log.

Timeline

Day 1 — PostgreSQL setup for logical replication, Kafka Connect installation in distributed mode on 2–3 nodes.

Day 2 — Debezium installation, initial snapshot (can take hours for large tables), connector configuration, CDC event verification.

Day 3 — sink connector setup (Elasticsearch or PostgreSQL), transformations via SMT (Single Message Transforms), end-to-end testing of the pipeline with INSERT/UPDATE/DELETE.

Day 4 — monitoring, alerts on lag and errors, topic schema documentation, load testing with peak change flow.