A few days ago, we were approached by a client with a clear objective and a difficult barrier in the way. Their goal? Real-Time Data Replication: PostgreSQL 16 → Kafka → SQL Server 2022 Express. They needed their data transferred effortlessly from PostgreSQL 16 to SQL Server 2022 Express, in real time.
Their expanding operations demanded tighter integration between systems, and the delays caused by outdated data workflows could no longer be tolerated. This challenge set the stage for an efficient, real-time solution.
System Configuration
Linux VM (CentOS 8.5)
- PostgreSQL 16 (Source Database)
- Kafka 3.9.0 (Message Broker)
- Debezium Connector for PostgreSQL 3.0.6
- Kafka Connect (Handles Source and Sink Connectors)
- Kafka JDBC Sink Connector (For SQL Server Integration)
Windows Machine
- SQL Server 2022 Express (Target Database)
Steps to Configure Real-Time Replication
Install and Configure PostgreSQL 16
Enable Logical Replication in PostgreSQL
Edit postgresql.conf:
sudo vi /var/lib/pgsql/16/data/postgresql.conf
Ensure these settings:
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
Restart PostgreSQL:
sudo systemctl restart postgresql-16
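After the restart, confirm the settings took effect; a quick check from psql:

SHOW wal_level;              -- expected: logical
SHOW max_replication_slots;  -- expected: 10
SHOW max_wal_senders;        -- expected: 10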
Create Replication Slot & Publication
Create a logical replication slot so Debezium can read the transaction log, and a publication containing the tables to be replicated.
SELECT pg_create_logical_replication_slot('debezium_slot', 'pgoutput');
CREATE PUBLICATION dbz_publication FOR TABLE public.employee;
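To verify that both objects were created, query the catalog views:

SELECT slot_name, plugin, slot_type FROM pg_replication_slots;
SELECT * FROM pg_publication_tables WHERE pubname = 'dbz_publication';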
Install and Configure Kafka 3.9.0
Download & Extract Kafka
wget https://downloads.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
tar -xvzf kafka_2.13-3.9.0.tgz
cd kafka_2.13-3.9.0
Start Zookeeper & Kafka Broker
Start both Zookeeper and the Kafka broker to handle the data pipeline:
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
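Before moving on, make sure the broker answers; listing topics is a simple smoke test (it returns cleanly even when no topics exist yet):

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list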
Install Debezium PostgreSQL Connector
Download and Place in Plugins Directory
mkdir -p kafka_2.13-3.9.0/plugins/debezium-postgres
cd kafka_2.13-3.9.0/plugins/debezium-postgres
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/3.0.6.Final/debezium-connector-postgres-3.0.6.Final.jar
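Kafka Connect only loads connectors found on its plugin.path, so point the worker configuration at the plugins directory. The absolute path below is an example; adjust it to wherever you extracted Kafka:

# In config/connect-distributed.properties
plugin.path=/opt/kafka_2.13-3.9.0/plugins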
Register Debezium PostgreSQL Source Connector
Register the Debezium PostgreSQL source connector to link PostgreSQL to Kafka.
Create register-postgres.json. In Debezium 3.x, topic.prefix determines the topic namespace (it replaces the older database.server.name setting), and the PostgreSQL connector does not need a schema history topic:

{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "dbz_publication",
    "table.include.list": "public.employee",
    "topic.prefix": "postgres"
  }
}
Register the connector:
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @register-postgres.json
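Once the Connect worker is running (see the final step below), you can confirm the connector's health and watch raw change events arriving on the topic:

# Check connector and task state via the Connect REST API
curl http://localhost:8083/connectors/postgres-connector/status

# Consume the change-event topic created by Debezium
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic postgres.public.employee --from-beginning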
Setup SQL Server 2022 Express
Enable SQL Server Authentication
1. Open SQL Server Management Studio (SSMS).
2. Right-click Server Name → Properties → Security.
3. Change Server Authentication to SQL Server and Windows Authentication mode.
4. Restart SQL Server.
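One detail that often blocks remote connections: SQL Server Express ships with the TCP/IP protocol disabled. Enable it in SQL Server Configuration Manager (SQL Server Network Configuration → Protocols), set the TCP port to 1433, and restart the service. From the Linux VM, you can then verify the port is reachable (192.168.0.102 is the Windows machine's address used in the sink configuration below):

nc -vz 192.168.0.102 1433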
Enable sa Login
Enable the sa login to provide full access:
ALTER LOGIN sa ENABLE;
ALTER LOGIN sa WITH PASSWORD = 'YourNewStrongPassword';
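The sink's connection URL below targets a database named kafka_db, so create it if it does not already exist:

CREATE DATABASE kafka_db;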
Install and Configure Kafka JDBC Sink Connector for SQL Server
Download JDBC Connector
mkdir -p kafka_2.13-3.9.0/plugins/jdbc-sink
cd kafka_2.13-3.9.0/plugins/jdbc-sink
wget https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.8.0/kafka-connect-jdbc-10.8.0.jar
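The kafka-connect-jdbc jar does not bundle Microsoft's SQL Server driver, so drop the mssql-jdbc jar into the same plugin directory. The version below is only an example; pick one that matches your JVM:

wget https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/12.6.1.jre11/mssql-jdbc-12.6.1.jre11.jar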
Register Kafka JDBC Sink Connector
Create a table called staff in the destination SQL Server database and set "table.name.format" to match. This replicates data from the employee table in PostgreSQL into the staff table in SQL Server; a sketch of the target table is shown below.
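A minimal sketch of the target table, assuming the source employee table carries just an integer id primary key and a name column; adjust the columns to your actual schema:

CREATE TABLE staff (
    id   INT           NOT NULL PRIMARY KEY,
    name NVARCHAR(255) NULL
);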
Create register-mssql-sink.json. Two details are easy to get wrong here: the JDBC sink requires pk.mode to be record_key whenever delete.enabled is true, and Debezium 3.x uses delete.tombstone.handling.mode in place of the older drop.tombstones/delete.handling.mode pair:

{
  "name": "mssql-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "postgres.public.employee",
    "connection.url": "jdbc:sqlserver://192.168.0.102:1433;databaseName=kafka_db;encrypt=false",
    "connection.user": "sa",
    "connection.password": "YourNewStrongPassword",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "table.name.format": "staff",
    "dialect.name": "SqlServerDatabaseDialect",
    "batch.size": "1000",
    "delete.enabled": "true",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.delete.tombstone.handling.mode": "tombstone"
  }
}
Register the Kafka JDBC sink connector, mapping the data flow into SQL Server:
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @register-mssql-sink.json
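As with the source connector, confirm the sink and its task are in the RUNNING state:

curl http://localhost:8083/connectors/mssql-sink-connector/status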
Start Zookeeper, Kafka Broker, and Kafka Connect:
With everything configured, launch all three components so the whole pipeline runs end to end:
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
bin/connect-distributed.sh config/connect-distributed.properties &
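For an end-to-end smoke test, insert a row on the PostgreSQL side and query the target table on SQL Server a moment later (the columns follow the hypothetical staff schema sketched earlier):

-- On PostgreSQL
INSERT INTO public.employee (id, name) VALUES (1, 'Alice');

-- On SQL Server, a few seconds later
SELECT * FROM staff;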
What we achieved was a real-time replication system that seamlessly integrated PostgreSQL and SQL Server. The once-tedious manual operations were automated and streamlined. By connecting these technologies, we delivered a solution that not only met but exceeded the client's expectations.