The value of data is time sensitive. Real-time processing makes data-driven decisions accurate and actionable in seconds or minutes instead of hours or days. Change data capture (CDC) refers to the process of identifying and capturing changes made to data in a database and then delivering those changes in real time to a downstream system. Capturing every change from transactions in a source database and moving those changes to the target in real time keeps the systems synchronized and supports real-time analytics use cases and zero-downtime database migrations. The following are a few benefits of CDC:

  • It eliminates the need for bulk load updating and inconvenient batch windows by enabling incremental loading or real-time streaming of data changes into your target repository.
  • It ensures that data in multiple systems stays in sync. This is especially important if you’re making time-sensitive decisions in a high-velocity data environment.

Kafka Connect is an open-source component of Apache Kafka that works as a centralized data hub for simple data integration between databases, key-value stores, search indexes, and file systems. The AWS Glue Schema Registry allows you to centrally discover, control, and evolve data stream schemas. Kafka Connect and Schema Registry integrate to capture schema information from connectors. Kafka Connect provides a mechanism for converting data from the internal data types used by Kafka Connect to data types represented as Avro, Protobuf, or JSON Schema. AvroConverter, ProtobufConverter, and JsonSchemaConverter automatically register schemas generated by Kafka connectors (source) that produce data to Kafka. Connectors (sink) that consume data from Kafka receive schema information in addition to the data for each message. This allows sink connectors to know the structure of the data to provide capabilities like maintaining a database table schema in a data catalog.

This post demonstrates how to build an end-to-end CDC pipeline using Amazon MSK Connect, an AWS managed service to deploy and run Kafka Connect applications, and the AWS Glue Schema Registry, which allows you to centrally discover, control, and evolve data stream schemas.

Solution overview

On the producer side, for this example we choose a MySQL-compatible Amazon Aurora database as the data source, and we use a Debezium MySQL connector to perform CDC. The Debezium connector continuously monitors the database and pushes row-level changes to a Kafka topic. The connector fetches the schema from the database to serialize the records into a binary form. If the schema doesn’t already exist in the registry, it is registered. If the schema exists but the serializer is using a new version, the schema registry checks the schema’s compatibility mode before updating it. In this solution, we use backward compatibility mode. The schema registry returns an error if a new version of the schema is not backward compatible, and we can configure Kafka Connect to send incompatible messages to a dead-letter queue.

On the consumer side, we use an Amazon Simple Storage Service (Amazon S3) sink connector to deserialize the record and store changes to Amazon S3. We build and deploy the Debezium connector and the Amazon S3 sink using MSK Connect.
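The dead-letter queue mentioned earlier is driven by Kafka Connect’s error-handling properties, which are supported for sink connectors. The following is a minimal sketch, with an assumed topic name, of what could be added to the S3 sink connector configuration shown later in this post:

# route records that fail conversion to a dead-letter topic instead of failing the task
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
errors.deadletterqueue.topic.name=dlq.db1.sampledatabase.movies
errors.deadletterqueue.topic.replication.factor=3
errors.deadletterqueue.context.headers.enable=true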

Example schema

For this post, we use the following schema as the first version of the table:

{
  "Database Name": "sampledatabase",
  "Table Name": "movies",
  "Fields": [
    { "name": "movie_id", "type": "INTEGER" },
    { "name": "title", "type": "STRING" },
    { "name": "release_year", "type": "INTEGER" }
  ]
}

Prerequisites

Before configuring the MSK producer and consumer connectors, we need to first set up a data source, MSK cluster, and new schema registry. We provide an AWS CloudFormation template to generate the supporting resources needed for the solution:

  • A MySQL-compatible Aurora database as the data source. To perform CDC, we turn on binary logging in the DB cluster parameter group (a sketch of the relevant parameter follows this list).
  • An MSK cluster. To simplify the network connection, we use the same VPC for the Aurora database and the MSK cluster.
  • Two schema registries to handle schemas for message key and message value.
  • One S3 bucket as the data sink.
  • MSK Connect plugins and worker configuration needed for this demo.
  • One Amazon Elastic Compute Cloud (Amazon EC2) instance to run database commands.
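The CloudFormation template configures all of the above automatically. For reference, Debezium requires row-based binary logging; if you were setting this up yourself, enabling it on an Aurora MySQL DB cluster parameter group would look roughly like the following (the parameter group name is hypothetical, and the writer instance needs a reboot afterward):

# enable row-based binlog (required by Debezium) on the cluster parameter group
aws rds modify-db-cluster-parameter-group \
  --db-cluster-parameter-group-name aurora-cdc-cluster-params \
  --parameters "ParameterName=binlog_format,ParameterValue=ROW,ApplyMethod=pending-reboot"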

To set up resources in your AWS account, complete the following steps in an AWS Region that supports Amazon MSK, MSK Connect, and the AWS Glue Schema Registry:

  1. Choose Launch Stack.
  2. Choose Next.
  3. For Stack name, enter a suitable name.
  4. For Database Password, enter the password you want for the database user.
  5. Keep other values as default.
  6. Choose Next.
  7. On the next page, choose Next.
  8. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  9. Choose Create stack.

Custom plugin for the source and destination connector

A custom plugin is a set of JAR files that contain the implementation of one or more connectors, transforms, or converters. Amazon MSK installs the plugin on the workers of the MSK Connect cluster where the connector is running. As part of this demo, for the source connector we use the open-source Debezium MySQL connector JARs, and for the destination connector we use the Confluent community licensed Amazon S3 sink connector JARs. Both plugins also bundle the Avro serializer and deserializer libraries for the AWS Glue Schema Registry. These custom plugins are already created as part of the CloudFormation template deployed in the previous step.
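For reference, creating a similar custom plugin yourself generally means zipping the connector JARs (plus the Schema Registry converter JARs), uploading the archive to Amazon S3, and registering it with MSK Connect. The following is a rough sketch with hypothetical bucket, key, and plugin names:

# package the connector and converter JARs and upload the archive to S3
zip -r debezium-mysql-with-gsr.zip ./debezium-connector-mysql/ ./schema-registry-kafkaconnect-converter/
aws s3 cp debezium-mysql-with-gsr.zip s3://my-plugin-bucket/
# register the archive as an MSK Connect custom plugin
aws kafkaconnect create-custom-plugin \
  --name my-debezium-mysql-plugin \
  --content-type ZIP \
  --location '{"s3Location":{"bucketArn":"arn:aws:s3:::my-plugin-bucket","fileKey":"debezium-mysql-with-gsr.zip"}}'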

Use the AWS Glue Schema Registry with the Debezium connector on MSK Connect as the MSK producer

We first deploy the source connector using the Debezium MySQL plugin to stream data from an Amazon Aurora MySQL-Compatible Edition database to Amazon MSK. Complete the following steps:

  1. On the Amazon MSK console, in the navigation pane, under MSK Connect, choose Connectors.
  2. Choose Create connector.
  3. Choose Use existing custom plugin, then choose the custom plugin whose name starts with msk-blog-debezium-source-plugin.
  4. Choose Next.
  5. Enter a suitable name like debezium-mysql-connector and an optional description.
  6. For Apache Kafka cluster, choose MSK cluster and choose the cluster created by the CloudFormation template.
  7. In Connector configuration, delete the default values and use the following configuration key-value pairs with the appropriate values:
    • name – The name used for the connector.
    • database.hostname – The CloudFormation output for Database Endpoint.
    • database.user and database.password – The parameters passed in the CloudFormation template.
    • database.history.kafka.bootstrap.servers – The CloudFormation output for Kafka Bootstrap.
    • key.converter.region and value.converter.region – Your Region.
name=<Connector-name>
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=<DBHOST>
database.port=3306
database.user=<DBUSER>
database.password=<DBPASSWORD>
database.server.id=42
database.server.name=db1
table.whitelist=sampledatabase.movies
database.history.kafka.bootstrap.servers=<MSK-BOOTSTRAP>
database.history.kafka.topic=dbhistory.demo1
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter.region=<REGION>
value.converter.region=<REGION>
key.converter.registry.name=msk-connect-blog-keys
value.converter.registry.name=msk-connect-blog-values
key.converter.compatibility=FORWARD
value.converter.compatibility=FORWARD
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=op,source.ts_ms
tasks.max=1

Some of these settings are generic and should be specified for any connector. For example:

  • connector.class is the Java class of the connector
  • tasks.max is the maximum number of tasks that should be created for this connector

Some settings (database.*, transforms.*) are specific to the Debezium MySQL connector. Refer to Debezium MySQL Source Connector Configuration Properties for more information.

Some settings (key.converter.* and value.converter.*) are specific to the Schema Registry. We use the AWSKafkaAvroConverter from the AWS Glue Schema Registry Library as the format converter. To configure AWSKafkaAvroConverter, we set properties whose names are defined as string constants in the AWSSchemaRegistryConstants class:

  • key.converter and value.converter control the format of the data that will be written to Kafka for source connectors or read from Kafka for sink connectors. We use AWSKafkaAvroConverter for Avro format.
  • key.converter.registry.name and value.converter.registry.name define which schema registry to use.
  • key.converter.compatibility and value.converter.compatibility define the compatibility model.

Refer to Using Kafka Connect with AWS Glue Schema Registry for more information.

  8. For Connector capacity, choose Provisioned and leave the other properties at their defaults.
  9. For Worker configuration, choose the custom worker configuration whose name starts with msk-gsr-blog, created as part of the CloudFormation template.
  10. For Access permissions, use the AWS Identity and Access Management (IAM) role MSKConnectRole generated by the CloudFormation template.
  11. Choose Next.
  12. For Security, choose the defaults.
  13. Choose Next.
  14. For Log delivery, select Deliver to Amazon CloudWatch Logs and browse for the log group created by the CloudFormation template (msk-connector-logs).
  15. Choose Next.
  16. Review the settings and choose Create connector.

After a few minutes, the connector’s status changes to Running.
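You can also poll the connector state from the AWS CLI instead of waiting on the console (the name prefix below assumes the connector name suggested earlier):

aws kafkaconnect list-connectors --connector-name-prefix debezium-mysql-connector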

Use the AWS Glue Schema Registry with the Confluent S3 sink connector running on MSK Connect as the MSK consumer

We deploy the sink connector using the Confluent S3 sink plugin to stream data from Amazon MSK to Amazon S3. Complete the following steps:

    1. On the Amazon MSK console, in the navigation pane, under MSK Connect, choose Connectors.
    2. Choose Create connector.
    3. Choose Use existing custom plugin and choose the custom plugin whose name starts with msk-blog-S3sink-plugin.
    4. Choose Next.
    5. Enter a suitable name like s3-sink-connector and an optional description.
    6. For Apache Kafka cluster, choose MSK cluster and select the cluster created by the CloudFormation template.
    7. In Connector configuration, delete the default values provided and use the following configuration key-value pairs with appropriate values:
        • name – The same name used for the connector.
        • s3.bucket.name – The CloudFormation output for Bucket Name.
        • s3.region, key.converter.region, and value.converter.region – Your Region.
name=<CONNECTOR-NAME>
connector.class=io.confluent.connect.s3.S3SinkConnector
s3.bucket.name=<BUCKET-NAME>
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
s3.region=<REGION>
storage.class=io.confluent.connect.s3.storage.S3Storage
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
flush.size=10
tasks.max=1
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
key.converter.region=<REGION>
value.converter.region=<REGION>
value.converter.avroRecordType=GENERIC_RECORD
key.converter.avroRecordType=GENERIC_RECORD
value.converter.compatibility=NONE
key.converter.compatibility=NONE
store.kafka.keys=false
schema.compatibility=NONE
topics=db1.sampledatabase.movies
value.converter.registry.name=msk-connect-blog-values
key.converter.registry.name=msk-connect-blog-keys
store.kafka.headers=false

  8. For Connector capacity, choose Provisioned and leave the other properties at their defaults.
  9. For Worker configuration, choose the custom worker configuration whose name starts with msk-gsr-blog, created as part of the CloudFormation template.
  10. For Access permissions, use the IAM role MSKConnectRole generated by the CloudFormation template.
  11. Choose Next.
  12. For Security, choose the defaults.
  13. Choose Next.
  14. For Log delivery, select Deliver to Amazon CloudWatch Logs and browse for the log group created by the CloudFormation template (msk-connector-logs).
  15. Choose Next.
  16. Review the settings and choose Create connector.

After a few minutes, the connector is running.

Test the end-to-end CDC log stream

Now that both the Debezium and S3 sink connectors are up and running, complete the following steps to test the end-to-end CDC:

  1. On the Amazon EC2 console, navigate to the Security groups page.
  2. Select the security group ClientInstanceSecurityGroup and choose Edit inbound rules.
  3. Add an inbound rule allowing SSH connection from your local network.
  4. On the Instances page, select the instance ClientInstance and choose Connect.
  5. On the EC2 Instance Connect tab, choose Connect.
  6. Ensure your current working directory is /home/ec2-user and that it contains the files create_table.sql, alter_table.sql, initial_insert.sql, and insert_data_with_new_column.sql.
  7. Create a table in your MySQL database by running the following command (provide the database host name from the CloudFormation template outputs):
mysql -h <DATABASE-HOST> -u master -p < create_table.sql
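The contents of create_table.sql aren’t shown in this post; based on the example schema earlier, the file likely resembles the following sketch (the column types are assumptions):

-- create_table.sql (sketch): create the movies table in sampledatabase
CREATE DATABASE IF NOT EXISTS sampledatabase;
USE sampledatabase;
CREATE TABLE movies (
  movie_id INT PRIMARY KEY,
  title VARCHAR(255),
  release_year INT
);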

  8. When prompted for a password, enter the password from the CloudFormation template parameters.
  9. Insert some sample data into the table with the following command:
mysql -h <DATABASE-HOST> -u master -p < initial_insert.sql
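Likewise, initial_insert.sql is not listed in the post; a minimal sketch (the sample rows are made up):

-- initial_insert.sql (sketch): seed the movies table with sample rows
INSERT INTO sampledatabase.movies (movie_id, title, release_year)
VALUES (1, 'Sample Movie One', 1999),
       (2, 'Sample Movie Two', 2005);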

  10. When prompted for a password, enter the password from the CloudFormation template parameters.
  11. On the AWS Glue console, choose Schema registries in the navigation pane, then choose Schemas.
  12. Navigate to db1.sampledatabase.movies version 1 to check the new schema created for the movies table:
{ "type": "record", "name": "Value", "namespace": "db1.sampledatabase.movies", "fields": [ { "name": "movie_id", "type": "int" }, { "name": "title", "type": "string" }, { "name": "release_year", "type": "int" }, { "name": "__op", "type": [ "null", "string" ], "default": null }, { "name": "__source_ts_ms", "type": [ "null", "long" ], "default": null }, { "name": "__deleted", "type": [ "null", "string" ], "default": null } ], "connect.name": "db1.sampledatabase.movies.Value"
}
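You can also inspect the registered versions from the AWS CLI instead of the console; the registry and schema names below follow the configuration used earlier in this post:

aws glue list-schema-versions \
  --schema-id RegistryName=msk-connect-blog-values,SchemaName=db1.sampledatabase.movies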

A separate S3 folder is created for each partition of the Kafka topic, and data for the topic is written in that folder.
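With the DefaultPartitioner configured above, the objects land under a layout similar to the following; the exact prefix depends on the connector’s topics.dir setting (topics by default), and the bucket name and offsets here are illustrative:

s3://<BUCKET-NAME>/topics/db1.sampledatabase.movies/partition=0/db1.sampledatabase.movies+0+0000000000.snappy.parquet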

  13. On the Amazon S3 console, check for data written in Parquet format in the folder for your Kafka topic.

Schema evolution

After the initial schema is defined, applications may need to evolve it over time. When this happens, it’s critical for downstream consumers to be able to handle data encoded with both the old and the new schema seamlessly. Compatibility modes allow you to control how schemas can or can’t evolve over time. These modes form the contract between applications producing and consuming data. For detailed information about the different compatibility modes available in the AWS Glue Schema Registry, refer to AWS Glue Schema Registry. In our example, we use backward compatibility to ensure consumers can read both the current and previous schema versions. Complete the following steps:

  1. Add a new column to the table by running the following command:
mysql -h <DATABASE-HOST> -u master -p < alter_table.sql
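alter_table.sql is not listed in the post either; given the COUNTRY column that appears in schema version 2 below, it presumably contains something like the following (the column type is assumed):

-- alter_table.sql (sketch): add the COUNTRY column to the movies table
ALTER TABLE sampledatabase.movies ADD COLUMN COUNTRY VARCHAR(64);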

  2. Insert new data into the table by running the following command:
mysql -h <DATABASE-HOST> -u master -p < insert_data_with_new_column.sql
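insert_data_with_new_column.sql likely adds rows that populate the new column; a sketch (the values are made up):

-- insert_data_with_new_column.sql (sketch): insert a row including COUNTRY
INSERT INTO sampledatabase.movies (movie_id, title, release_year, COUNTRY)
VALUES (3, 'Sample Movie Three', 2012, 'USA');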

  3. On the AWS Glue console, choose Schema registries in the navigation pane, then choose Schemas.
  4. Navigate to the schema db1.sampledatabase.movies version 2 to check the new version of the schema created for the movies table, including the COUNTRY column that you added:
{ "type": "record", "name": "Value", "namespace": "db1.sampledatabase.movies", "fields": [ { "name": "movie_id", "type": "int" }, { "name": "title", "type": "string" }, { "name": "release_year", "type": "int" }, { "name": "COUNTRY", "type": "string" }, { "name": "__op", "type": [ "null", "string" ], "default": null }, { "name": "__source_ts_ms", "type": [ "null", "long" ], "default": null }, { "name": "__deleted", "type": [ "null", "string" ], "default": null } ], "connect.name": "db1.sampledatabase.movies.Value"
}

  5. On the Amazon S3 console, check for data written in Parquet format in the folder for the Kafka topic.

Clean up

To help prevent unwanted charges to your AWS account, delete the AWS resources that you used in this post (a CLI alternative follows these steps):

  1. On the Amazon S3 console, navigate to the S3 bucket created by the CloudFormation template.
  2. Select all files and folders and choose Delete.
  3. Enter permanently delete as directed and choose Delete objects.
  4. On the AWS CloudFormation console, delete the stack you created.
  5. Wait for the stack status to change to DELETE_COMPLETE.
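Alternatively, the same cleanup can be scripted with the AWS CLI (substitute the bucket and stack names you used):

# empty the data bucket, then delete the stack and wait for completion
aws s3 rm s3://<BUCKET-NAME> --recursive
aws cloudformation delete-stack --stack-name <STACK-NAME>
aws cloudformation wait stack-delete-complete --stack-name <STACK-NAME>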

Conclusion

This post demonstrated how to use Amazon MSK, MSK Connect, and the AWS Glue Schema Registry to build a CDC log stream and evolve schemas for data streams as business needs change. You can apply this architecture pattern to other data sources with different Kafka connectors. For more information, refer to the MSK Connect examples.


About the Author

Kalyan Janaki is Senior Big Data & Analytics Specialist with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Source: https://aws.amazon.com/blogs/big-data/build-an-end-to-end-change-data-capture-with-amazon-msk-connect-and-aws-glue-schema-registry/