Real-time data processing is crucial for data-driven organizations. Because relational database engines such as MySQL do not offer an event-based interface, one way to consume data changes is to leverage a Change Data Capture (CDC) tool such as Debezium. However, this requires manually setting up and maintaining infrastructure, such as a Virtual Machine (VM), on which Debezium and Kafka Connect must run.
In this blog post, I’ll leverage Azure HDInsight on Azure Kubernetes Service (Preview) with Apache Flink to perform CDC of a MySQL table on Azure Database for MySQL.
Before starting, be sure to have an Azure HDInsight on AKS cluster with Apache Flink and an Azure Database for MySQL server up and running.
mysql -h mysqlserver.mysql.database.azure.com -u myadmin -p --ssl-mode=REQUIRED --ssl-ca=DigiCertGlobalRootCA.crt.pem
mysql> CREATE DATABASE `sampledb`;
mysql> USE `sampledb`;
mysql> CREATE TABLE `shipments` (
`shipment_id` int NOT NULL,
`order_id` int NOT NULL,
`origin` varchar(100) NOT NULL,
`destination` varchar(100) NOT NULL,
`is_arrived` tinyint(1) NOT NULL,
PRIMARY KEY (`shipment_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
mysql> INSERT INTO shipments VALUES (1, 1001, 'Memphis', 'Atlanta', false),
(2, 1002, 'Baltimore', 'Minneapolis', false),
(3, 1003, 'Denver', 'Salt Lake City', false),
(4, 1004, 'Seattle', 'Las Vegas', false),
(5, 1005, 'San Francisco', 'Grand Rapids', false),
(6, 1006, 'Phoenix', 'Dallas', false);
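The MySQL CDC connector reads the server's binary log, which must use row-based logging. Azure Database for MySQL typically defaults to ROW format, but it is worth a quick sanity check from the same session (this check is an addition, not part of the original walkthrough):

```sql
-- The Flink mysql-cdc connector requires row-based binary logging.
-- The value should come back as ROW; if not, update the server parameter first.
SHOW VARIABLES LIKE 'binlog_format';
```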
$ wget https://dlcdn.apache.org/maven/maven-3/3.9.5/binaries/apache-maven-3.9.5-bin.tar.gz
$ tar xzvf apache-maven-3.9.5-bin.tar.gz
$ export PATH=$PATH:/opt/flink-webssh/apache-maven-3.9.5/bin
$ mvn -v

The output should confirm that Maven 3.9.5 is on the PATH. Next, create a pom.xml file with the following content:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dep.download</groupId>
<artifactId>dep-download</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
<dependencies>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
</dependencies>
</project>
Here we are leveraging the MySQL CDC connector, which first reads an initial snapshot of the MySQL table and then streams incremental changes from the binary log.
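By default the connector performs exactly this snapshot-then-binlog sequence. If you only need changes going forward, the connector exposes a scan.startup.mode table option (shown here as documented for the 2.3.x connector; verify the option against the docs for your version):

```sql
-- Optional table option for flink-sql-connector-mysql-cdc 2.3.x:
-- 'initial' (default) takes a snapshot first, then tails the binlog;
-- 'latest-offset' skips the snapshot and streams only new changes.
'scan.startup.mode' = 'initial'
```

This goes alongside the other connection options in the WITH clause of the Flink table definition shown below.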
$ mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
$ wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
$ /opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-mysql-cdc-2.3.0.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-30.1.1-jre-16.0.jar -j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
SET 'sql-client.execution.result-mode' = 'tableau';
CREATE TABLE shipments (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysqlserver.mysql.database.azure.com',
'port' = '3306',
'username' = 'myadmin',
'password' = 'xxxxxxxx',
'database-name' = 'sampledb',
'table-name' = 'shipments');
SELECT * FROM shipments;
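With this continuous query running, any change applied on the MySQL side is picked up from the binlog. For example, you could mark a shipment as arrived in the MySQL session (an illustrative change, not part of the original walkthrough):

```sql
-- Run in the MySQL session while the Flink SQL query above is active.
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1;
```

In tableau result mode the Flink SQL client should then emit an update-before/update-after pair for that row, flagged -U and +U in the op column (exact formatting varies by Flink version).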
Overall, Azure HDInsight on AKS with Apache Flink integrates seamlessly with Azure Database for MySQL to provide a fully managed, scalable, and secure Change Data Capture (CDC) solution.
To learn more about the services used in this post, check out the following resources:
If you have any feedback or questions about the information provided above, please leave a comment below or email us at AskAzureDBforMySQL@service.microsoft.com or fill out the Get in touch – HDInsight on AKS form at aka.ms/askhdinsight. Thank you!