Performing Change Data Capture (CDC) of MySQL tables using FlinkSQL
Published Dec 06 2023 02:32 PM
Microsoft

Real-time data processing is crucial for data-driven organizations. Because relational database engines such as MySQL do not offer event-based interfaces, one way to read and consume data changes is to use a Change Data Capture (CDC) tool such as Debezium. However, this approach requires you to set up and maintain the infrastructure, such as a Virtual Machine (VM), on which Debezium and Kafka Connect run.

In this blog post, I’ll leverage Azure HDInsight on Azure Kubernetes Service (Preview) with Apache Flink to perform CDC of a MySQL table on Azure Database for MySQL.

  • HDInsight on Azure Kubernetes Service (AKS) is a modern, reliable, secure, and fully managed Platform as a Service (PaaS) offering that runs on AKS. It lets you deploy popular open-source analytics workloads such as Apache Spark, Apache Flink, and Trino without the overhead of managing and monitoring containers.
  • Apache Flink enables enterprises to harness the value of data immediately, which makes it a valuable tool for time-sensitive applications and scenarios that require up-to-the-minute insights. Flink has no single point of failure and scales to deliver high throughput at low latency, powering some of the world's most demanding stream processing applications. Stream processing systems excel at handling high-velocity, unbounded data streams such as click streams, log streams, live sensor data, social media feeds, event streams, transactional data, and IoT device data.
  • Azure Database for MySQL - Flexible Server is a fully managed MySQL database service built on Azure's infrastructure, designed to give you more granular control over database management functions and configuration settings. MySQL uses the binary log (binlog) to record data changes, including changes to table schemas and changes to the rows in tables, in the order in which transactions are committed. MySQL uses the binlog mainly for replication and recovery, and it is also the source that the MySQL CDC connector used in this post reads to capture row changes.
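
To make the binlog's role more concrete, here is a minimal Python sketch that models a binlog as an ordered list of committed row events and replays them to rebuild a table, which is the essence of binlog-based replication. This is a simplified model, not MySQL's actual binlog format; the event tuples and the `replay` function are hypothetical and chosen only for illustration.

```python
from typing import Dict, List, Optional, Tuple

Row = Dict[str, object]
# Hypothetical, simplified row event: (operation, primary key, row after the change).
# Real binlog events are binary and carry much more metadata.
Event = Tuple[str, int, Optional[Row]]


def replay(events: List[Event]) -> Dict[int, Row]:
    """Rebuild a table by applying row events in commit order, as a replica would."""
    table: Dict[int, Row] = {}
    for op, key, row in events:
        if op in ("insert", "update"):
            assert row is not None, "inserts and updates carry the new row"
            table[key] = dict(row)
        elif op == "delete":
            table.pop(key, None)
        else:
            raise ValueError(f"unknown operation: {op}")
    return table


binlog: List[Event] = [
    ("insert", 1, {"origin": "Memphis", "destination": "Atlanta", "is_arrived": False}),
    ("insert", 2, {"origin": "Baltimore", "destination": "Minneapolis", "is_arrived": False}),
    ("update", 1, {"origin": "Memphis", "destination": "Atlanta", "is_arrived": True}),
    ("delete", 2, None),
]

print(replay(binlog))
# {1: {'origin': 'Memphis', 'destination': 'Atlanta', 'is_arrived': True}}
```

Because the events are applied strictly in commit order, replaying the same log always produces the same table; applying the update before the first insert would instead leave shipment 1 marked as not arrived.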

Prerequisites

Before starting, be sure to:

Preparing a MySQL table

  1. Using either MySQL Workbench or the MySQL command-line client, connect to the Azure Database for MySQL Flexible Server.
    mysql -h mysqlserver.mysql.database.azure.com -u myadmin -p --ssl-mode=REQUIRED --ssl-ca=DigiCertGlobalRootCA.crt.pem
  2. After connecting to the database server, create a database and switch to it.
    mysql> CREATE DATABASE sampledb;
    mysql> USE sampledb;
  3. Now create a table in this database.
    mysql> CREATE TABLE `shipments` (
    `shipment_id` int NOT NULL,
    `order_id` int NOT NULL,
    `origin` varchar(100) NOT NULL,
    `destination` varchar(100) NOT NULL,
    `is_arrived` tinyint(1) NOT NULL,
    PRIMARY KEY (`shipment_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
  4. Insert a few records into the shipments table.
    mysql> INSERT INTO shipments VALUES (1, 1001, 'Memphis', 'Atlanta', false),
    (2, 1002, 'Baltimore', 'Minneapolis', false),
    (3, 1003, 'Denver', 'Salt Lake City', false),
    (4, 1004, 'Seattle', 'Las Vegas', false),
    (5, 1005, 'San Francisco', 'Grand Rapids', false),
    (6, 1006, 'Phoenix', 'Dallas', false);
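
The Flink MySQL CDC connector reads row changes from the binlog, and its documentation asks for binary logging to be enabled with binlog_format set to ROW and binlog_row_image set to FULL. You can inspect these server variables with statements such as SHOW VARIABLES LIKE 'binlog_format'; on the MySQL side. The Python sketch below assumes you have collected those results into a dictionary; the `check_cdc_prerequisites` helper is hypothetical and not part of any library.

```python
from typing import Dict, List

# Settings the Flink MySQL CDC connector documentation asks for. Values are
# compared case-insensitively because servers may report e.g. "ON" or "on".
REQUIRED_SETTINGS: Dict[str, str] = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def check_cdc_prerequisites(variables: Dict[str, str]) -> List[str]:
    """Return a list of problems; an empty list means the settings look CDC-ready.

    `variables` maps variable names to values as reported by SHOW VARIABLES
    (an assumed input format for this sketch).
    """
    problems: List[str] = []
    for name, expected in REQUIRED_SETTINGS.items():
        actual = variables.get(name)
        if actual is None:
            problems.append(f"{name}: not reported")
        elif actual.upper() != expected:
            problems.append(f"{name}: expected {expected}, found {actual}")
    return problems


# Illustrative inputs only, not values read from a real server.
print(check_cdc_prerequisites({"log_bin": "ON", "binlog_format": "ROW", "binlog_row_image": "FULL"}))
# []
print(check_cdc_prerequisites({"log_bin": "ON", "binlog_format": "MIXED"}))
# ['binlog_format: expected ROW, found MIXED', 'binlog_row_image: not reported']
```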

Set up an HDInsight on AKS Flink CLI client using SSH on Azure Portal

  1. Connect to your Azure HDInsight on AKS cluster using SSH. In the Azure portal, go to your HDInsight on AKS cluster, select the Secure Shell (SSH) blade, and open the endpoint highlighted in the figure below.

    [Figure: Secure Shell (SSH) blade of the HDInsight on AKS cluster in the Azure portal, with the endpoint highlighted]
  2. After connecting to your webssh pod, download Maven. This post uses version 3.9.5, the latest stable release at the time of writing; older releases are moved from dlcdn.apache.org to archive.apache.org.
    $ wget https://dlcdn.apache.org/maven/maven-3/3.9.5/binaries/apache-maven-3.9.5-bin.tar.gz
  3. Extract the downloaded Maven archive.
    $ tar xzvf apache-maven-3.9.5-bin.tar.gz
  4. Add the Maven bin directory to your PATH.
    $ export PATH=$PATH:/opt/flink-webssh/apache-maven-3.9.5/bin
  5. Check the Maven version.
    $ mvn -v
    The output should look similar to the following:
    [Figure: output of mvn -v]
  6. After confirming that Maven is installed, create a file named pom.xml with the following content:
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.dep.download</groupId>
        <artifactId>dep-download</artifactId>
        <version>1.0-SNAPSHOT</version>
        <dependencies>
            <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
            <dependency>
                <groupId>com.ververica</groupId>
                <artifactId>flink-sql-connector-mysql-cdc</artifactId>
                <version>2.3.0</version>
            </dependency>
        </dependencies>
    </project>

    This pulls in the MySQL CDC connector, which reads an initial snapshot of the MySQL table and then its incremental changes.

  7. Use Maven to download all the dependent JARs into the target directory.
    $ mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
  8. Download the jsr308-all JAR into the same target directory, where the next command expects it.
    $ wget -P target https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
  9. After all the dependent JARs are downloaded, start the Flink SQL client and add these JARs to the session:
    $ /opt/flink-webssh/bin/sql-client.sh \
        -j /opt/flink-webssh/target/flink-sql-connector-mysql-cdc-2.3.0.jar \
        -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar \
        -j /opt/flink-webssh/target/hamcrest-2.1.jar \
        -j /opt/flink-webssh/target/flink-shaded-guava-30.1.1-jre-16.0.jar \
        -j /opt/flink-webssh/target/awaitility-4.0.1.jar \
        -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
  10. Optionally, set the result mode to tableau. In this mode, the client prints each change as a row prefixed with its kind of operation (such as +I for an insert), so you can see the operations being performed on the records.
    SET 'sql-client.execution.result-mode' = 'tableau';
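  11. Now, create a table in the Flink SQL client that uses the mysql-cdc connector to read the shipments table in sampledb. Replace the hostname, username, and password with the values for your server.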
    CREATE TABLE shipments (
       shipment_id INT,
       order_id INT,
       origin STRING,
       destination STRING,
       is_arrived BOOLEAN,
       PRIMARY KEY (shipment_id) NOT ENFORCED
     ) WITH (
       'connector' = 'mysql-cdc',
       'hostname' = 'mysqlserver.mysql.database.azure.com',
       'port' = '3306',
       'username' = 'myadmin',
       'password' = 'xxxxxxxx',
       'database-name' = 'sampledb',
       'table-name' = 'shipments');
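  12. Run a SELECT statement to see all the records in the shipments table stored on the MySQL server. The query first returns the existing rows (the initial snapshot) and then keeps running, waiting for new changes.
    SELECT * FROM shipments;

    [Figure: initial query results in the Flink SQL client]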
  13. Next, make a change on the MySQL side with a DML statement, such as inserting a new row into shipments, and validate the change in real time on the Flink SQL side.
    [Figure: INSERT statement run in the MySQL client]
  14. The inserted record appears in the Flink SQL client in real time.
    [Figure: the inserted record in the Flink SQL client]

  15. Similarly, perform an update on the MySQL side and check that the change streams to Flink in real time.

    [Figure: UPDATE statement run in the MySQL client]

    [Figure: the updated record in the Flink SQL client]
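
The inserts and updates observed in the Flink SQL client arrive as a changelog. In tableau mode, each row carries its change kind in an op column: +I (insert), -U (row before an update), +U (row after an update), and -D (delete), so a single UPDATE on MySQL typically shows up as a -U/+U pair. The Python sketch below is a simplified model, not Flink's implementation, that applies such a changelog to a dictionary keyed by shipment_id to show how the current query result is maintained. The `apply_changelog` function and the sample changes are hypothetical.

```python
from typing import Dict, List, Tuple

# A row of the Flink `shipments` table:
# (shipment_id, order_id, origin, destination, is_arrived)
Row = Tuple[int, int, str, str, bool]


def apply_changelog(changes: List[Tuple[str, Row]]) -> Dict[int, Row]:
    """Materialize a changelog into the current table contents, keyed by shipment_id."""
    current: Dict[int, Row] = {}
    for op, row in changes:
        key = row[0]  # shipment_id is the primary key
        if op in ("+I", "+U"):
            current[key] = row  # an insert, or the new image of an update
        elif op in ("-U", "-D"):
            current.pop(key, None)  # the old image of an update, or a delete
        else:
            raise ValueError(f"unknown change kind: {op}")
    return current


changes = [
    # Rows from the initial snapshot arrive as inserts.
    ("+I", (1, 1001, "Memphis", "Atlanta", False)),
    ("+I", (2, 1002, "Baltimore", "Minneapolis", False)),
    # Hypothetical: UPDATE shipments SET is_arrived = true WHERE shipment_id = 1;
    ("-U", (1, 1001, "Memphis", "Atlanta", False)),
    ("+U", (1, 1001, "Memphis", "Atlanta", True)),
    # Hypothetical: DELETE FROM shipments WHERE shipment_id = 2;
    ("-D", (2, 1002, "Baltimore", "Minneapolis", False)),
]

for shipment_id, row in sorted(apply_changelog(changes).items()):
    print(shipment_id, row)
# 1 (1, 1001, 'Memphis', 'Atlanta', True)
```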

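The sql-client.sh command used earlier to start the Flink SQL client lists each JAR by hand, which is easy to get wrong if the dependency versions change. A small Python sketch can build the -j arguments from whatever JARs are in the target directory instead. The /opt/flink-webssh paths come from the steps above; the `build_sql_client_command` helper is hypothetical, and because it passes every JAR in target, the result may include more files than the hand-picked list, so review the printed command before running it.

```python
import shlex
from pathlib import Path
from typing import List

# Directory layout taken from the steps above (Maven run from /opt/flink-webssh).
FLINK_HOME = Path("/opt/flink-webssh")


def build_sql_client_command(jar_dir: Path, flink_home: Path = FLINK_HOME) -> List[str]:
    """Return the argv for sql-client.sh with one `-j <jar>` pair per JAR in jar_dir."""
    argv = [str(flink_home / "bin" / "sql-client.sh")]
    # Sorted so the generated command is stable and easy to review.
    for jar in sorted(jar_dir.glob("*.jar")):
        argv += ["-j", str(jar)]
    return argv


if __name__ == "__main__":
    # Print a shell-quoted command line; review it before pasting it into the shell.
    print(shlex.join(build_sql_client_command(FLINK_HOME / "target")))
```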

Summary

Overall, Azure HDInsight on AKS with Apache Flink integrates with Azure Database for MySQL to provide a fully managed, scalable, and secure Change Data Capture (CDC) solution.

To learn more about the services used in this post, check out the following resources:

If you have any feedback or questions about the information provided above, please leave a comment below or email us at AskAzureDBforMySQL@service.microsoft.com or fill out the Get in touch – HDInsight on AKS form at aka.ms/askhdinsight. Thank you!

Last update: Dec 06 2023 05:42 PM