What is the need for a custom converter in Debezium?
Debezium is a powerful CDC (Change Data Capture) tool that streams database changes as events. While it provides default converters for common data types, there are several scenarios where custom converters become necessary:
Key Reasons for Custom Converters
1. Handling Specialised Data Types
- Databases often have proprietary or complex data types that Debezium's default converters can't handle.
- Examples: spatial/GIS data, custom enumerations or other domain-specific types, date and time types, large number types.
2. Data Transformation Requirements
- When you need to transform data during capture (e.g., masking sensitive fields)
- Converting between different formats (e.g., binary to hex string)
How to develop a Debezium custom converter?
1. Implement the io.debezium.spi.converter.CustomConverter interface provided by Debezium.

@Slf4j
public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

2. Override the implementation of the converterFor method.

@Override
public void converterFor(RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {

3. Package your converter into a jar file (refer to the git repository: debezium-converter-extension). A minimal end-to-end sketch follows.
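For reference, here is a minimal, self-contained sketch of such a converter. It is an illustration rather than the exact code from the repository; the format.date property name and the DATE-only handling are assumptions. It registers a string schema for MySQL DATE columns and formats their values instead of emitting epoch days:

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;

public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    // Configurable via the connector property "<converter-name>.format.date";
    // Debezium hands it to configure() with the converter-name prefix stripped.
    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_LOCAL_DATE;

    @Override
    public void configure(Properties props) {
        String dateFormat = props.getProperty("format.date");
        if (dateFormat != null) {
            dateFormatter = DateTimeFormatter.ofPattern(dateFormat);
        }
    }

    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        // Only take over DATE columns; every other type keeps Debezium's default conversion.
        if ("DATE".equalsIgnoreCase(column.typeName())) {
            registration.register(SchemaBuilder.string().optional(), value -> {
                if (value == null) {
                    return null;
                }
                if (value instanceof LocalDate) {
                    return dateFormatter.format((LocalDate) value);
                }
                // Snapshot and streaming can hand over different raw types;
                // fall back to toString() for anything unexpected.
                return value.toString();
            });
        }
    }
}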
Host your jar file on a Nexus server
- Make sure that your OpenShift cluster has network connectivity with the Nexus server.
- Upload your jar with the appropriate Maven coordinates to Nexus, for example as sketched below.
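A minimal sketch of the upload with the Maven deploy plugin, assuming the com.av.debezium:debezium-extension:0.0.12 coordinates used later in this post and a hosted maven-releases repository; <your-nexus-host> is a placeholder, and the nexus repositoryId must match a server entry with credentials in your settings.xml:

mvn deploy:deploy-file \
  -Dfile=target/debezium-extension-0.0.12.jar \
  -DgroupId=com.av.debezium \
  -DartifactId=debezium-extension \
  -Dversion=0.0.12 \
  -Dpackaging=jar \
  -DrepositoryId=nexus \
  -Durl=https://<your-nexus-host>/repository/maven-releases/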

Deploy a MySQL server on OpenShift
1. Create the MySQL server using the following YAML manifests:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql
spec:
  selector:
    matchLabels:
      app: mysql
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
        - image: quay.io/debezium/example-mysql:2.4
          name: mysql
          env:
            - name: MYSQL_ROOT_PASSWORD
              value: debezium
            - name: MYSQL_USER
              value: mysqluser
            - name: MYSQL_PASSWORD
              value: mysqlpw
          ports:
            - containerPort: 3306
              name: mysql
---
apiVersion: v1
kind: Service
metadata:
  name: mysql
spec:
  ports:
    - port: 3306
  selector:
    app: mysql
  clusterIP: None

2. Use the following commands to add a new table. First open a MySQL shell inside the pod, as sketched below.
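A minimal sketch of getting a MySQL shell, assuming the Deployment above runs in the debezium-demo namespace and using the inventory database that ships with the example image (the root password comes from the Deployment's env):

oc exec -it deployment/mysql -n debezium-demo -- \
  mysql -u root -pdebezium inventory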
CREATE TABLE accounts (
account_id VARCHAR(20) PRIMARY KEY,
customer_id INT NOT NULL,
account_type ENUM('Checking', 'Savings', 'Loan', 'Credit') NOT NULL,
balance DECIMAL(15,2) NOT NULL DEFAULT 0.00,
interest_rate DECIMAL(5,2),
opened_date DATE NOT NULL,
last_activity_date DATETIME,
status ENUM('Active', 'Dormant', 'Closed') DEFAULT 'Active'
);

3. Add some data to the new table:
INSERT INTO accounts VALUES
('CHK10002345', 1001, 'Checking', 4520.75, NULL, '2020-05-15', '2023-08-10 14:30:22', 'Active'),
('SAV10005678', 1001, 'Savings', 12500.00, 1.25, '2020-05-15', '2023-08-01 09:15:43', 'Active'),
('CHK10006789', 1002, 'Checking', 780.50, NULL, '2021-02-20', '2023-08-12 16:22:18', 'Active'),
('LOAN10003456', 1003, 'Loan', -25000.00, 3.75, '2022-07-10', '2023-08-05 11:05:37', 'Active'),
('SAV10007890', 1004, 'Savings', 3400.25, 1.25, '2023-01-05', '2023-08-11 13:45:09', 'Active');

Create a Kafka Cluster
We will use Strimzi to deploy a Kafka cluster in KRaft mode.
1. Create a namespace:
oc new-project debezium-demo

2. Create the Kafka node pools using the Strimzi operator.
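The node pool manifests below assume the Strimzi cluster operator is already running and watching this namespace. If it is not, one way to install it is the upstream install bundle (a sketch; on OpenShift you can also install Strimzi from OperatorHub):

oc create -f 'https://strimzi.io/install/latest?namespace=debezium-demo' -n debezium-demo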
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  labels:
    strimzi.io/cluster: my-cluster
  name: broker
  namespace: debezium-demo
spec:
  replicas: 3
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - deleteClaim: false
        id: 0
        kraftMetadata: shared
        size: 100Gi
        type: persistent-claim
      - deleteClaim: false
        id: 1
        size: 100Gi
        type: persistent-claim
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  labels:
    strimzi.io/cluster: my-cluster
  name: controller
  namespace: debezium-demo
spec:
  replicas: 3
  roles:
    - controller
  storage:
    type: jbod
    volumes:
      - deleteClaim: false
        id: 0
        kraftMetadata: shared
        size: 100Gi
        type: persistent-claim

3. Create a Kafka cluster using Strimzi:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  annotations:
    strimzi.io/kraft: enabled
    strimzi.io/node-pools: enabled
  name: my-cluster
  namespace: debezium-demo
spec:
  entityOperator:
    topicOperator: {}
    userOperator: {}
  kafka:
    config:
      default.replication.factor: 3
      min.insync.replicas: 2
      offsets.topic.replication.factor: 3
      transaction.state.log.min.isr: 2
      transaction.state.log.replication.factor: 3
    listeners:
      - name: plain
        port: 9092
        tls: false
        type: internal
      - name: tls
        port: 9093
        tls: true
        type: internal
    metadataVersion: 3.9-IV0
    version: 3.9.0

Verify the pods running in the namespace:
bash-5.1 ~ $
bash-5.1 ~ $ oc get pods -n debezium-demo | grep my-cluster
my-cluster-broker-0 1/1 Running 1 21h
my-cluster-broker-1 1/1 Running 1 21h
my-cluster-broker-2 1/1 Running 1 21h
my-cluster-controller-3 1/1 Running 1 21h
my-cluster-controller-4 1/1 Running 1 21h
my-cluster-controller-5 1/1 Running 1 21h
my-cluster-entity-operator-6474c76d6f-m66fp 2/2 Running 7 (4h ago) 21h
bash-5.1 ~ $

Create Kafka Connect and Debezium Connector
1. Create a KafkaConnect resource, and use the Nexus URL for the debezium-extension jar.
2. The jar should be downloaded from the Nexus repository to the /plugins directory inside the kafka-connect container.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 4.0.0
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      type: docker
      image: quay.io/abvishno/debezium-connect-mysql:latest
      pushSecret: my-quay-secret
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: 'https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.4.0.Final/debezium-connector-mysql-2.4.0.Final-plugin.tar.gz'
          - type: jar
            url: 'https://nexus-nexus.apps.cluster-ws4fg.ws4fg.sandbox2838.opentlc.com/repository/maven-releases/com/av/debezium/debezium-extension/0.0.12/debezium-extension-0.0.12.jar'

3. Create a basic Debezium MySQL connector. The connector below reads the database credentials from a Kubernetes Secret through the config provider, so create that Secret first, as sketched below.
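A minimal sketch of that Secret, assuming the debezium/dbz user that the quay.io/debezium/example-mysql image pre-creates (substitute your own replication user if it differs). Note that the Connect service account also needs RBAC permission (a Role and RoleBinding) to read Secrets in the namespace for the KubernetesSecretConfigProvider to work.

oc create secret generic mysql-credentials \
  -n debezium-demo \
  --from-literal=username=debezium \
  --from-literal=password=dbz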
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: debezium-connect-cluster
  name: debezium-connector-mysql
  namespace: debezium-demo
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    tasks.max: 1
    database.hostname: mysql
    database.server.id: 184054
    database.port: 3306
    database.include.list: inventory
    schema.history.internal.kafka.topic: schema-changes.inventory
    database.user: '${secrets:debezium-demo/mysql-credentials:username}'
    database.password: '${secrets:debezium-demo/mysql-credentials:password}'
    schema.history.internal.kafka.bootstrap.servers: 'my-cluster-kafka-bootstrap:9092'
    topic.prefix: mysql
  tasksMax: 1

4. After you create the Debezium connector, look for data in the Kafka topics, for example with the commands sketched below.
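A minimal sketch for inspecting the events, assuming Debezium's default topic naming (<topic.prefix>.<database>.<table>) and the broker pod name from the earlier output:

# List the topics managed by the Strimzi topic operator
oc get kafkatopics -n debezium-demo

# Consume the change events for the accounts table
oc exec -n debezium-demo my-cluster-broker-0 -- \
  /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 \
  --topic mysql.inventory.accounts \
  --from-beginning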

5. Add your converter details to the connector configuration:
converters: 'datetime,decimal'
datetime.type: com.av.debezium.converter.MySqlDateTimeConverter
datetime.format.time: 'HH:mm:ss'
datetime.format.date: 'yyyy-MM-dd'
datetime.format.timestamp: 'yyyy-MM-dd HH:mm:ss'
datetime.format.timestamp.zone: UTC+8
decimal.type: com.av.debezium.converter.MySqlDecimalConverter
decimal.databaseType: mysql
datetime.format.datetime: 'yyyy-MM-dd HH:mm:ss'

6. The updated connector YAML should look like the following:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: debezium-connect-cluster
  name: debezium-connector-mysql
  namespace: debezium-demo
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    tasks.max: 1
    database.hostname: mysql
    database.server.id: 184054
    database.port: 3306
    database.include.list: inventory
    schema.history.internal.kafka.topic: schema-changes.inventory
    database.user: '${secrets:debezium-demo/mysql-credentials:username}'
    database.password: '${secrets:debezium-demo/mysql-credentials:password}'
    schema.history.internal.kafka.bootstrap.servers: 'my-cluster-kafka-bootstrap:9092'
    topic.prefix: mysql
    converters: 'datetime,decimal'
    datetime.type: com.av.debezium.converter.MySqlDateTimeConverter
    datetime.format.time: 'HH:mm:ss'
    datetime.format.date: 'yyyy-MM-dd'
    datetime.format.timestamp: 'yyyy-MM-dd HH:mm:ss'
    datetime.format.timestamp.zone: UTC+8
    decimal.type: com.av.debezium.converter.MySqlDecimalConverter
    decimal.databaseType: mysql
    datetime.format.datetime: 'yyyy-MM-dd HH:mm:ss'
  tasksMax: 1

7. Now add some more data to the accounts table:
INSERT INTO accounts VALUES ('CHK10009244', 1008, 'Checking', 1501.00, NULL, '2023-08-15', CURRENT_TIMESTAMP(), 'Active');
INSERT INTO accounts VALUES ('CHK10009245', 1009, 'Checking', 1501.00, NULL, '2023-08-15', CURRENT_TIMESTAMP(), 'Active');
INSERT INTO accounts VALUES ('CHK10009247', 1020, 'Checking', 1501.00, NULL, '2023-08-15', CURRENT_TIMESTAMP(), 'Active');

8. You should see more data in the Kafka topics; now compare the messages from before and after the converter was deployed.
8.1. Payload before the converter was deployed:
"payload": {
"before": null,
"after": {
"account_id": "SAV10005678",
"customer_id": 1001,
"account_type": "Savings",
"balance": "ExLQ",
"interest_rate": "fQ==",
"opened_date": 18397,
"last_activity_date": 1690881343000,
"status": "Active"
},7.2. payload after the converter was deployed .
"payload": {
"before": null,
"after": {
"account_id": "CHK10009245",
"customer_id": 1009,
"account_type": "Checking",
"balance": {
"type": "java.lang.Number",
"value": "1501.00"
},
"interest_rate": null,
"opened_date": "2023-08-15",
"last_activity_date": {
"date": "2025-07-01 04:50:05",
"type": "java.time.LocalDateTime"
},
"status": "Active"
},you can see that the fields balance [Number] and last_activity_date [DateTime] and opened_date [Date] are now transformed to more readable formats as we have implmented inthe Converter