What is the need for a custom converter in Debezium?

Debezium is a powerful CDC (Change Data Capture) tool that streams database changes as events. While it provides default converters for common data types, there are several scenarios where custom converters become necessary:

Key Reasons for Custom Converters

  1. Handling Specialised Data Types
  • Databases often have proprietary or complex data types that Debezium's default converters either can't handle or render in a form that is hard for consumers to work with.
  • Examples: spatial/GIS data, custom enumerations, domain-specific types, date and time types, and large numeric types.

2. Data Transformation Requirements

  • When you need to transform data during capture (e.g., masking sensitive fields)
  • Converting between different formats (e.g., binary to hex string)

How to develop a Debezium custom converter?

  1. Implement the io.debezium.spi.converter.CustomConverter interface provided by Debezium:
@Slf4j
public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

2. Override the converterFor method, which lets the converter inspect each column and register a conversion only for the columns it should handle (a fuller sketch is shown after step 3):

@Override
    public void converterFor(RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {

3. Package your converter into a jar file (refer to the git repository: debezium-converter-extension).
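
For reference, here is a minimal, illustrative sketch of such a converter. It is not the exact code in the debezium-extension jar used later in this post; it simply assumes the goal is to render MySQL DATE columns as formatted strings instead of the default epoch-day integers:

package com.av.debezium.converter;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;

public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_LOCAL_DATE;

    @Override
    public void configure(Properties props) {
        // Properties arrive with the converter prefix stripped: "datetime.format.date"
        // in the connector config is seen here as "format.date".
        String dateFormat = props.getProperty("format.date");
        if (dateFormat != null) {
            dateFormatter = DateTimeFormatter.ofPattern(dateFormat);
        }
    }

    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        // Only take over DATE columns; every other column keeps Debezium's default mapping.
        if (!"DATE".equalsIgnoreCase(column.typeName())) {
            return;
        }
        registration.register(SchemaBuilder.string().optional(), value -> {
            if (value == null) {
                return null;
            }
            if (value instanceof LocalDate) {
                return dateFormatter.format((LocalDate) value);
            }
            // Depending on snapshot vs. streaming, the raw value may be another temporal
            // type (e.g. java.sql.Date); fall back to its string form in that case.
            return value.toString();
        });
    }
}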

Host your jar file on a Nexus server

  1. Make sure that your OpenShift cluster has network connectivity to the Nexus server.
  2. Upload your jar with the appropriate coordinates (group, artifact, version) to Nexus, for example:
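
A hedged example of uploading the artifact with Maven (this assumes the coordinates used later in this post, com.av.debezium:debezium-extension:0.0.12, and a "nexus" server entry with credentials in your Maven settings.xml; adjust the file path and URL to your environment):

mvn deploy:deploy-file \
  -Dfile=target/debezium-extension-0.0.12.jar \
  -DgroupId=com.av.debezium \
  -DartifactId=debezium-extension \
  -Dversion=0.0.12 \
  -Dpackaging=jar \
  -DrepositoryId=nexus \
  -Durl=https://<your-nexus-host>/repository/maven-releases/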

Deploy a MySQL server on OpenShift

  1. Create the MySQL server using the following YAML:
apiVersion: apps/v1 
kind: Deployment 
metadata: 
 name: mysql 
spec: 
 selector: 
   matchLabels: 
     app: mysql 
 strategy: 
   type: Recreate 
 template: 
   metadata: 
     labels: 
       app: mysql 
   spec: 
     containers: 
       - image: quay.io/debezium/example-mysql:2.4 
         name: mysql 
         env: 
           - name: MYSQL_ROOT_PASSWORD 
             value: debezium 
           - name: MYSQL_USER 
             value: mysqluser 
           - name: MYSQL_PASSWORD 
             value: mysqlpw 
         ports: 
           - containerPort: 3306 
             name: mysql
             
---
apiVersion: v1 
kind: Service 
metadata: 
 name: mysql 
spec: 
 ports: 
   - port: 3306 
 selector: 
   app: mysql 
 clusterIP: None

2. Use the following statements to add a new table (a command for opening a MySQL shell in the pod is shown after step 3):

CREATE TABLE accounts (
    account_id VARCHAR(20) PRIMARY KEY,
    customer_id INT NOT NULL,
    account_type ENUM('Checking', 'Savings', 'Loan', 'Credit') NOT NULL,
    balance DECIMAL(15,2) NOT NULL DEFAULT 0.00,
    interest_rate DECIMAL(5,2),
    opened_date DATE NOT NULL,
    last_activity_date DATETIME,
    status ENUM('Active', 'Dormant', 'Closed') DEFAULT 'Active'
);

3. Add some data to the new table:

INSERT INTO accounts VALUES
    ('CHK10002345', 1001, 'Checking', 4520.75, NULL, '2020-05-15', '2023-08-10 14:30:22', 'Active'),
    ('SAV10005678', 1001, 'Savings', 12500.00, 1.25, '2020-05-15', '2023-08-01 09:15:43', 'Active'),
    ('CHK10006789', 1002, 'Checking', 780.50, NULL, '2021-02-20', '2023-08-12 16:22:18', 'Active'),
    ('LOAN10003456', 1003, 'Loan', -25000.00, 3.75, '2022-07-10', '2023-08-05 11:05:37', 'Active'),
    ('SAV10007890', 1004, 'Savings', 3400.25, 1.25, '2023-01-05', '2023-08-11 13:45:09', 'Active');
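
To run the statements in steps 2 and 3, you can open a MySQL shell inside the pod, for example (this assumes the table is created in the pre-loaded inventory database, which is the database the Debezium connector below captures; run the command in the namespace where the mysql Deployment lives):

oc exec -it deployment/mysql -- mysql -u root -pdebezium inventory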

Create a Kafka Cluster

We will use Strimzi to deploy a Kafka cluster in KRaft mode.

  1. Create a namespace:
oc new-project debezium-demo

2. Create the Kafka node pools using the Strimzi operator:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  labels:
    strimzi.io/cluster: my-cluster
  name: broker
  namespace: debezium-demo
spec:
  replicas: 3
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - deleteClaim: false
        id: 0
        kraftMetadata: shared
        size: 100Gi
        type: persistent-claim
      - deleteClaim: false
        id: 1
        size: 100Gi
        type: persistent-claim
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  labels:
    strimzi.io/cluster: my-cluster
  name: controller
  namespace: debezium-demo
spec:
  replicas: 3
  roles:
    - controller
  storage:
    type: jbod
    volumes:
      - deleteClaim: false
        id: 0
        kraftMetadata: shared
        size: 100Gi
        type: persistent-claim

3. Create a Kafka Cluster using Strimzi

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  annotations:
    strimzi.io/kraft: enabled
    strimzi.io/node-pools: enabled
  name: my-cluster
  namespace: debezium-demo
spec:
  entityOperator:
    topicOperator: {}
    userOperator: {}
  kafka:
    config:
      default.replication.factor: 3
      min.insync.replicas: 2
      offsets.topic.replication.factor: 3
      transaction.state.log.min.isr: 2
      transaction.state.log.replication.factor: 3
    listeners:
      - name: plain
        port: 9092
        tls: false
        type: internal
      - name: tls
        port: 9093
        tls: true
        type: internal
    metadataVersion: 3.9-IV0
    version: 3.9.0

Verify the pods running in the namespace

bash-5.1 ~ $ 
bash-5.1 ~ $ oc get pods -n debezium-demo | grep my-cluster
my-cluster-broker-0                                      1/1     Running     1              21h
my-cluster-broker-1                                      1/1     Running     1              21h
my-cluster-broker-2                                      1/1     Running     1              21h
my-cluster-controller-3                                  1/1     Running     1              21h
my-cluster-controller-4                                  1/1     Running     1              21h
my-cluster-controller-5                                  1/1     Running     1              21h
my-cluster-entity-operator-6474c76d6f-m66fp              2/2     Running     7 (4h ago)     21h
bash-5.1 ~ $

Create Kafka Connect and Debezium Connector

  1. Create a KafkaConnect resource and use the Nexus URL for the debezium-extension jar.
  2. During the image build, the jar is downloaded from the Nexus repository into the plugins directory inside the Kafka Connect container.
apiVersion: kafka.strimzi.io/v1beta2 
kind: KafkaConnect 
metadata: 
 name: debezium-connect-cluster 
 annotations: 
   strimzi.io/use-connector-resources: "true" 
spec: 
 version: 4.0.0 
 replicas: 1 
 bootstrapServers: my-cluster-kafka-bootstrap:9092 
 config: 
   config.providers: secrets 
   config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider 
   group.id: connect-cluster 
   offset.storage.topic: connect-cluster-offsets 
   config.storage.topic: connect-cluster-configs 
   status.storage.topic: connect-cluster-status 
   # -1 means it will use the default replication factor configured in the broker 
   config.storage.replication.factor: -1 
   offset.storage.replication.factor: -1 
   status.storage.replication.factor: -1 
 build: 
   output: 
     type: docker 
     image: quay.io/abvishno/debezium-connect-mysql:latest 
     pushSecret: my-quay-secret 
   plugins: 
     - name: debezium-mysql-connector 
       artifacts: 
         - type: tgz 
           url: 'https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.4.0.Final/debezium-connector-mysql-2.4.0.Final-plugin.tar.gz'
          - type: jar
            url: 'https://nexus-nexus.apps.cluster-ws4fg.ws4fg.sandbox2838.opentlc.com/repository/maven-releases/com/av/debezium/debezium-extension/0.0.12/debezium-extension-0.0.12.jar'
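
Once the build finishes and the Connect pod is running, you can check that the Debezium MySQL connector plugin was loaded through the Kafka Connect REST API. A sketch, assuming Strimzi's default name for the Connect REST service (<cluster-name>-connect-api) and reusing a broker pod to issue the request:

oc exec -n debezium-demo my-cluster-broker-0 -- \
  curl -s http://debezium-connect-cluster-connect-api:8083/connector-plugins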

3. Create a basic Debezium MySQL connector:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: debezium-connect-cluster
  name: debezium-connector-mysql
  namespace: debezium-demo
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    tasks.max: 1
    database.hostname: mysql
    database.server.id: 184054
    database.port: 3306
    database.include.list: inventory
    schema.history.internal.kafka.topic: schema-changes.inventory
    database.user: '${secrets:debezium-demo/mysql-credentials:username}'
    database.password: '${secrets:debezium-demo/mysql-credentials:password}'
    schema.history.internal.kafka.bootstrap.servers: 'my-cluster-kafka-bootstrap:9092'
    topic.prefix: mysql
  tasksMax: 1

4. After you create the Debezium connector, look for data in the Kafka topics.

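For example, you can read the change events for the accounts table with the console consumer shipped in the broker image (the topic name follows <topic.prefix>.<database>.<table>, so mysql.inventory.accounts here, assuming the accounts table lives in the inventory database):

oc exec -n debezium-demo my-cluster-broker-0 -- \
  /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 \
  --topic mysql.inventory.accounts \
  --from-beginning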

5. Add your converter details to the connector configuration:

    converters: 'datetime,decimal'
    datetime.type: com.av.debezium.converter.MySqlDateTimeConverter
    datetime.format.time: 'HH:mm:ss'
    datetime.format.date: 'yyyy-MM-dd'
    datetime.format.timestamp: 'yyyy-MM-dd HH:mm:ss'
    datetime.format.timestamp.zone: UTC+8
    decimal.type: com.av.debezium.converter.MySqlDecimalConverter
    decimal.databaseType: mysql
    datetime.format.datetime: 'yyyy-MM-dd HH:mm:ss'

6. The updated connector YAML should look like the following:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: debezium-connect-cluster
  name: debezium-connector-mysql
  namespace: debezium-demo
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    tasks.max: 1
    database.hostname: mysql
    database.server.id: 184054
    database.port: 3306
    database.include.list: inventory
    schema.history.internal.kafka.topic: schema-changes.inventory
    database.user: '${secrets:debezium-demo/mysql-credentials:username}'
    database.password: '${secrets:debezium-demo/mysql-credentials:password}'
    schema.history.internal.kafka.bootstrap.servers: 'my-cluster-kafka-bootstrap:9092'
    topic.prefix: mysql
    converters: 'datetime,decimal'
    datetime.type: com.av.debezium.converter.MySqlDateTimeConverter
    datetime.format.time: 'HH:mm:ss'
    datetime.format.date: 'yyyy-MM-dd'
    datetime.format.timestamp: 'yyyy-MM-dd HH:mm:ss'
    datetime.format.timestamp.zone: UTC+8
    decimal.type: com.av.debezium.converter.MySqlDecimalConverter
    decimal.databaseType: mysql
    datetime.format.datetime: 'yyyy-MM-dd HH:mm:ss'
  tasksMax: 1

7. Now add some more data to the accounts table:

INSERT INTO accounts VALUES ('CHK10009244', 1008, 'Checking', 1501.00, NULL, '2023-08-15', CURRENT_TIMESTAMP(), 'Active');
INSERT INTO accounts VALUES ('CHK10009245', 1009, 'Checking', 1501.00, NULL, '2023-08-15', CURRENT_TIMESTAMP(), 'Active');
INSERT INTO accounts VALUES ('CHK10009247', 1020, 'Checking', 1501.00, NULL, '2023-08-15', CURRENT_TIMESTAMP(), 'Active');

8. You should now see new messages in the Kafka topic. Compare the payloads before and after the converter was deployed.

8.1. Payload before the converter was deployed:

"payload": {
  "before": null,
  "after": {
   "account_id": "SAV10005678",
   "customer_id": 1001,
   "account_type": "Savings",
   "balance": "ExLQ",
   "interest_rate": "fQ==",
   "opened_date": 18397,
   "last_activity_date": 1690881343000,
   "status": "Active"
  },

8.2. Payload after the converter was deployed:

"payload": {
  "before": null,
  "after": {
   "account_id": "CHK10009245",
   "customer_id": 1009,
   "account_type": "Checking",
   "balance": {
    "type": "java.lang.Number",
    "value": "1501.00"
   },
   "interest_rate": null,
   "opened_date": "2023-08-15",
   "last_activity_date": {
    "date": "2025-07-01 04:50:05",
    "type": "java.time.LocalDateTime"
   },
   "status": "Active"
  },

You can see that the balance (DECIMAL), last_activity_date (DATETIME), and opened_date (DATE) fields are now transformed into more readable formats, as implemented in the custom converters.