前言
在现代企业中,数据处理需求的不断增长使得实时监控和响应数据库变化变得尤为重要。无论是电商平台的订单处理、库存管理,还是金融系统的交易监控,都需要对数据库中的变化进行实时捕捉和处理。这不仅能提高系统响应速度,还能在数据发生变化时及时采取相应的业务操作。
为了实现这一目标,本文将介绍如何使用 Debezium [1] 监测 MySQL 数据库的变化,并将这些变化事件发送到 AutoMQ [2] 中。AutoMQ 作为一种高效、低成本的流处理系统,具有高弹性和高可用性,是企业实时数据处理的理想选择。通过这种方式,企业可以实现订单变化、库存变化等功能,并且可以设置告警规则,实现自动化监控。下游服务可以消费这些消息,从而实时掌握数据库的变化,及时响应业务需求,优化系统性能,确保业务的连续性和稳定性。
AutoMQ 概述
AutoMQ 是一种基于云重新设计的流处理系统,它在保持与 Apache Kafka 100% 兼容的前提下,通过将存储分离至对象存储,显著提升了系统的成本效益和弹性能力。具体来说,AutoMQ 通过构建在 S3 上的流存储库 S3Stream,将存储卸载至云厂商提供的共享云存储 EBS 和 S3,提供低成本、低延时、高可用、高可靠和无限容量的流存储能力。
与传统的 Shared Nothing 架构相比,AutoMQ 采用了 Shared Storage 架构,显著降低了存储和运维的复杂性,同时提升了系统的弹性和可靠性。AutoMQ 的设计理念和技术优势使其成为替换企业现有 Kafka 集群的理想选择。通过采用 AutoMQ,企业可以显著降低存储成本,简化运维,并实现集群的自动扩缩容和流量自平衡,从而更高效地应对业务需求的变化。此外,AutoMQ 的架构支持高效的冷读操作和服务零中断,确保系统在高负载和突发流量情况下的稳定运行。它的存储结构如下:
Debezium 概述
Debezium 是一个开源项目,为捕获数据更改(Change Data Capture,CDC)提供了低延迟的流式处理平台。通过安装和配置 Debezium,您可以监控数据库的变化,并将这些变化事件转化为 Kafka 消息。Debezium 支持多种数据库作为数据源,包括 MySQL、PostgreSQL 和 MongoDB 等,确保只有已提交的更改才是可见的,因此应用程序无需担心事务或回滚的问题。另外,由于 Debezium 用持久化的、有副本备份的日志来记录数据库数据变化的历史,因此,你的应用可以随时停止再重启,而不会错过它停止运行时发生的事件,保证了所有的事件都能被正确地、完全地处理掉。
Debezium 利用 Kafka 和 Kafka Connect [3] 的持久性、可靠性和容错性,每个连接器监控一个上游数据库服务器,捕获所有的数据库更改,并将其记录到 Kafka Topics 中。这样,多个客户端可以独立消费同样的数据更改事件,而对上游数据库的影响降到最小。Debezium 的常见使用场景包括缓存失效、简化单体应用、共享数据库和数据集成等。通过 Debezium,企业可以实现对数据库变化的实时监控和处理,满足各种业务场景的需求,如实时数据同步和事件驱动架构。它的架构图如下:
前提条件
可用的 Docker 环境。
一个可用的 AutoMQ 节点,用于接收数据变更的 events 消息。
开放了 binlog 功能的 MySQL 数据库。
可用的 Kafka Connect 服务,并且能够连接到 AutoMQ 节点。
注册 Debezium MySQL 插件到 Kafka Connect 进行监听和转换数据变化的操作。
快速部署
部署 AutoMQ
可以参考 AutoMQ 官网文档进行搭建:快速开始 | AutoMQ [5]。你将得到 AutoMQ 的服务访问地址,例如:192.168.123.41:9092
,之后会通过 Kafka Connect 与 AutoMQ 进行连接。
部署 MySQL
可以通过 Docker 快速部署和配置 MySQL,镜像为 Debezium 官方的提供的镜像,该镜像中包含一定的初始数据库表,能够简化部署过程。可以通过如下命令创建名为 “mysql” 的容器:
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:2.7
MYSQL_ROOT_PASSWORD
:设置 root 用户的密码。MYSQL_USER
和MYSQL_PASSWORD
:设置普通用户的用户名和密码
通过普通用户连接到 MySQL 客户端:
docker exec -it mysql mysql -u mysqluser -pmysqlpw
通过命令行工具验证数据,查看当前所有的数据库表:
mysql> show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| inventory |
| performance_schema |
+--------------------+
mysql> use inventory;
mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses |
| customers |
| geom |
| orders |
| products |
| products_on_hand |
+---------------------+
部署 Kafka Connect
使用以下命令拉取 Kafka Connect 镜像并启动容器,注意需要指定 AutoMQ 服务的地址:
docker run -it --rm --name connect -p 8083:8083 \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e STATUS_STORAGE_TOPIC=my_connect_statuses \
-e BOOTSTRAP_SERVERS=192.168.123.41:9092 \
--link mysql:mysql quay.io/debezium/connect:2.7
参数描述:
GROUP_ID | Kafka Connect 集群的组 ID。 |
---|---|
CONFIG_STORAGE_TOPIC | 用于存储连接器配置的 AutoMQ 主题。 |
OFFSET_STORAGE_TOPIC | 用于存储连接器偏移量的 AutoMQ 主题。 |
STATUS_STORAGE_TOPIC | 用于存储连接器状态的 AutoMQ 主题。 |
--link mysql:mysql | 连接到名为 mysql 的容器。 |
-e BOOTSTRAP_SERVERS=192.168.123.41:9092 | 指定 AutoMQ 节点地址。 |
如果连接失败,请注意 AutoMQ 和 MySQL 服务是否成功启动,以及开放地址是否正确等问题。
创建并注册 Debezium MySQL 连接器
Debezium MySQL 连接器是作为 Kafka Connector 的插件来监测 MySQL 数据库中的数据变更的,因此这里可以通过 curl 的方式进行 MySQL 连接器的注册,可以执行如下命令:
# 进入某一文件夹下创建文件
cd /home
vim mysql-connector.json
Json 文件内容为:
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"topic.prefix": "dbserver1",
"database.include.list": "inventory",
"schema.history.internal.kafka.bootstrap.servers": "192.168.123.41:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory"
}
}
使用以下命令将连接器配置文件提交到 Kafka Connect:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @mysql-connector.json http://localhost:8083/connectors/
成功响应内容如下:
HTTP/1.1 201 Created
Date: Mon, 05 Aug 2024 01:51:43 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 518
Server: Jetty(9.4.53.v20231009)
{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"mysql","database.port":"3306","database.user":"debezium","database.password":"dbz","database.server.id":"184054","topic.prefix":"dbserver1","database.include.list":"inventory","schema.history.internal.kafka.bootstrap.servers":"192.168.123.41:9092","schema.history.internal.kafka.topic":"schema-changes.inventory","name":"inventory-connector-new"},"tasks":[],"type":"source"}
验证数据变更捕捉
增删改数据库数据
通过在 MySQL 控制台中执行数据增删改操作,验证 Kafka Connector 是否能捕获到变化:
-- insert
INSERT INTO customers (first_name, last_name, email) VALUES ('John', 'Doe', 'john.doe@example.com');
-- update
UPDATE customers SET email='john.doe@newdomain.com' WHERE first_name='John' AND last_name='Doe';
-- delete
DELETE FROM customers WHERE first_name='John' AND last_name='Doe';
检验 AutoMQ 消息
由于 Kafka Connector 的日志不够明显,这里可以选择更加明显的方式验证数据变更的捕获:通过检查 AutoMQ 中的 Topic 数据来验证是否捕获成功。而验证 Topic 数据可以通过脚本或者可视化的监控工具查看,具体操作可以参考下面的内容。
通过 Kafka 脚本检测
获取 AutoMQ 项目二进制包: Github Releases [6],解压之后,在项目根目录下执行脚本命令,这条命令将获取到 customers 表的数据变化情况:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.123.41:9092 --topic dbserver1.inventory.customers --from-beginning
注意 AutoMQ 服务地址的替换。
结果如下:
通过可视化工具检测
可以通过各种可视化工具查看 AutoMQ 节点状态,比如 Redpanda Console [7]、Kafdrop [8] 等。这里以 Redpanda Console 为例,可以查看当前 AutoMQ 节点的所有 Topics 数据以及详细信息等。具体的 Redpanda Console 部署教程可以参考 AutoMQ 官方文档:Redpanda Console | AutoMQ [9]。
可以看到,这里展示了连接器监测的所有的数据库表,数据库和连接器的配置文件、偏移量以及每张表都对应一个 Topic。
可以查看详细的数据变化信息,比如对 customers 的更新操作:
你可以通过使用更多自定义的数据更新 SQL 来验证数据的捕获情况。
清理环境
可以通过执行如下命令,快速清理 Docker 环境:
docker stop mysql connect
由于启动时指定了
--rm
参数,因此一旦容器停止就会被删除。
总结
通过本文的介绍,我们探讨了如何使用 Debezium 监测 MySQL 数据库的变化,并将这些变化事件发送到 AutoMQ 中进行处理。通过部署 MySQL 和 Kafka Connect,并配置 Debezium MySQL 连接器,企业能够实现对数据库变化的实时监控和处理,满足订单变化、库存管理等业务需求。AutoMQ 的高效性和弹性,以及 Debezium 的低延迟和可靠性,使其成为企业实时数据处理的理想选择。针对监听数据变化获取事件消息,有更多可扩展性可以去探索,可以参考:Debezium [10]。
引用
[1] Debezium: https://debezium.io/
[2] AutoMQ: https://www.automq.com/
[3] Kafka Connect: https://docs.confluent.io/platform/current/connect/index.html
[4] Debezium Structure: https://docs.redhat.com/zh_hans/documentation/red_hat_integration/2023.q2/html/debezium_user_guide/description-of-debezium-architecture
[5] 快速开始 AutoMQ: https://docs.automq.com/zh/automq/getting-started
[6] Github Release: https://github.com/AutoMQ/automq/releases
[7] Redpanda Console: https://redpanda.com/redpanda-console-kafka-ui
[8] Kafdrop: https://github.com/obsidiandynamics/kafdrop
[9] Redpanda Console | AutoMQ: https://docs.automq.com/zh/automq/integrations/kafka-ui/redpanda-console
[10] Debezium: https://debezium.io/