AutoMQ 集成 Debezium

前言

在现代企业中,数据处理需求的不断增长使得实时监控和响应数据库变化变得尤为重要。无论是电商平台的订单处理、库存管理,还是金融系统的交易监控,都需要对数据库中的变化进行实时捕捉和处理。这不仅能提高系统响应速度,还能在数据发生变化时及时采取相应的业务操作。

为了实现这一目标,本文将介绍如何使用 Debezium [1] 监测 MySQL 数据库的变化,并将这些变化事件发送到 AutoMQ [2] 中。AutoMQ 作为一种高效、低成本的流处理系统,具有高弹性和高可用性,是企业实时数据处理的理想选择。通过这种方式,企业可以实现订单变化、库存变化等功能,并且可以设置告警规则,实现自动化监控。下游服务可以消费这些消息,从而实时掌握数据库的变化,及时响应业务需求,优化系统性能,确保业务的连续性和稳定性。

AutoMQ 概述

AutoMQ 是一种基于云重新设计的流处理系统,它在保持与 Apache Kafka 100% 兼容的前提下,通过将存储分离至对象存储,显著提升了系统的成本效益和弹性能力。具体来说,AutoMQ 通过构建在 S3 上的流存储库 S3Stream,将存储卸载至云厂商提供的共享云存储 EBS 和 S3,提供低成本、低延时、高可用、高可靠和无限容量的流存储能力。

与传统的 Shared Nothing 架构相比,AutoMQ 采用了 Shared Storage 架构,显著降低了存储和运维的复杂性,同时提升了系统的弹性和可靠性。AutoMQ 的设计理念和技术优势使其成为替换企业现有 Kafka 集群的理想选择。通过采用 AutoMQ,企业可以显著降低存储成本,简化运维,并实现集群的自动扩缩容和流量自平衡,从而更高效地应对业务需求的变化。此外,AutoMQ 的架构支持高效的冷读操作和服务零中断,确保系统在高负载和突发流量情况下的稳定运行。它的存储结构如下:

Debezium 概述

Debezium 是一个开源项目,为捕获数据更改(Change Data Capture,CDC)提供了低延迟的流式处理平台。通过安装和配置 Debezium,您可以监控数据库的变化,并将这些变化事件转化为 Kafka 消息。Debezium 支持多种数据库作为数据源,包括 MySQL、PostgreSQL 和 MongoDB 等,确保只有已提交的更改才是可见的,因此应用程序无需担心事务或回滚的问题。另外,由于 Debezium 用持久化的、有副本备份的日志来记录数据库数据变化的历史,因此,你的应用可以随时停止再重启,而不会错过它停止运行时发生的事件,保证了所有的事件都能被正确地、完全地处理掉。

Debezium 利用 Kafka 和 Kafka Connect [3] 的持久性、可靠性和容错性,每个连接器监控一个上游数据库服务器,捕获所有的数据库更改,并将其记录到 Kafka Topics 中。这样,多个客户端可以独立消费同样的数据更改事件,而对上游数据库的影响降到最小。Debezium 的常见使用场景包括缓存失效、简化单体应用、共享数据库和数据集成等。通过 Debezium,企业可以实现对数据库变化的实时监控和处理,满足各种业务场景的需求,如实时数据同步和事件驱动架构。它的架构图如下:

debezium structure [4]

前提条件

  • 可用的 Docker 环境。

  • 一个可用的 AutoMQ 节点,用于接收数据变更的 events 消息。

  • 开放了 binlog 功能的 MySQL 数据库。

  • 可用的 Kafka Connect 服务,并且能够连接到 AutoMQ 节点。

  • 注册 Debezium MySQL 插件到 Kafka Connect 进行监听和转换数据变化的操作。

快速部署

部署 AutoMQ

可以参考 AutoMQ 官网文档进行搭建:快速开始 | AutoMQ [5]。你将得到 AutoMQ 的服务访问地址,例如:192.168.123.41:9092,之后会通过 Kafka Connect 与 AutoMQ 进行连接。

部署 MySQL

可以通过 Docker 快速部署和配置 MySQL,镜像为 Debezium 官方的提供的镜像,该镜像中包含一定的初始数据库表,能够简化部署过程。可以通过如下命令创建名为 “mysql” 的容器:

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:2.7
  • MYSQL_ROOT_PASSWORD:设置 root 用户的密码。

  • MYSQL_USERMYSQL_PASSWORD:设置普通用户的用户名和密码

通过普通用户连接到 MySQL 客户端:

docker exec -it mysql mysql -u mysqluser -pmysqlpw

通过命令行工具验证数据,查看当前所有的数据库表:

mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| inventory          |
| performance_schema |
+--------------------+

mysql> use inventory;

mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+

部署 Kafka Connect

使用以下命令拉取 Kafka Connect 镜像并启动容器,注意需要指定 AutoMQ 服务的地址:

docker run -it --rm --name connect -p 8083:8083 \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  -e BOOTSTRAP_SERVERS=192.168.123.41:9092 \
  --link mysql:mysql quay.io/debezium/connect:2.7

参数描述:

GROUP_ID Kafka Connect 集群的组 ID。
CONFIG_STORAGE_TOPIC 用于存储连接器配置的 AutoMQ 主题。
OFFSET_STORAGE_TOPIC 用于存储连接器偏移量的 AutoMQ 主题。
STATUS_STORAGE_TOPIC 用于存储连接器状态的 AutoMQ 主题。
--link mysql:mysql 连接到名为 mysql 的容器。
-e BOOTSTRAP_SERVERS=192.168.123.41:9092 指定 AutoMQ 节点地址。

如果连接失败,请注意 AutoMQ 和 MySQL 服务是否成功启动,以及开放地址是否正确等问题。

创建并注册 Debezium MySQL 连接器

Debezium MySQL 连接器是作为 Kafka Connector 的插件来监测 MySQL 数据库中的数据变更的,因此这里可以通过 curl 的方式进行 MySQL 连接器的注册,可以执行如下命令:

# 进入某一文件夹下创建文件
cd /home
vim mysql-connector.json

Json 文件内容为:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "192.168.123.41:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory"
  }
}

使用以下命令将连接器配置文件提交到 Kafka Connect:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @mysql-connector.json http://localhost:8083/connectors/

成功响应内容如下:

HTTP/1.1 201 Created
Date: Mon, 05 Aug 2024 01:51:43 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 518
Server: Jetty(9.4.53.v20231009)

{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"mysql","database.port":"3306","database.user":"debezium","database.password":"dbz","database.server.id":"184054","topic.prefix":"dbserver1","database.include.list":"inventory","schema.history.internal.kafka.bootstrap.servers":"192.168.123.41:9092","schema.history.internal.kafka.topic":"schema-changes.inventory","name":"inventory-connector-new"},"tasks":[],"type":"source"}

验证数据变更捕捉

增删改数据库数据

通过在 MySQL 控制台中执行数据增删改操作,验证 Kafka Connector 是否能捕获到变化:

-- insert
INSERT INTO customers (first_name, last_name, email) VALUES ('John', 'Doe', 'john.doe@example.com');

-- update
UPDATE customers SET email='john.doe@newdomain.com' WHERE first_name='John' AND last_name='Doe';

-- delete
DELETE FROM customers WHERE first_name='John' AND last_name='Doe';

检验 AutoMQ 消息

由于 Kafka Connector 的日志不够明显,这里可以选择更加明显的方式验证数据变更的捕获:通过检查 AutoMQ 中的 Topic 数据来验证是否捕获成功。而验证 Topic 数据可以通过脚本或者可视化的监控工具查看,具体操作可以参考下面的内容。

通过 Kafka 脚本检测

获取 AutoMQ 项目二进制包: Github Releases [6],解压之后,在项目根目录下执行脚本命令,这条命令将获取到 customers 表的数据变化情况:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.123.41:9092 --topic dbserver1.inventory.customers --from-beginning

注意 AutoMQ 服务地址的替换。

结果如下:

通过可视化工具检测

可以通过各种可视化工具查看 AutoMQ 节点状态,比如 Redpanda Console [7]、Kafdrop [8] 等。这里以 Redpanda Console 为例,可以查看当前 AutoMQ 节点的所有 Topics 数据以及详细信息等。具体的 Redpanda Console 部署教程可以参考 AutoMQ 官方文档:Redpanda Console | AutoMQ [9]。

可以看到,这里展示了连接器监测的所有的数据库表,数据库和连接器的配置文件、偏移量以及每张表都对应一个 Topic。

可以查看详细的数据变化信息,比如对 customers 的更新操作:

你可以通过使用更多自定义的数据更新 SQL 来验证数据的捕获情况。

清理环境

可以通过执行如下命令,快速清理 Docker 环境:

docker stop mysql connect

由于启动时指定了--rm 参数,因此一旦容器停止就会被删除。

总结

通过本文的介绍,我们探讨了如何使用 Debezium 监测 MySQL 数据库的变化,并将这些变化事件发送到 AutoMQ 中进行处理。通过部署 MySQL 和 Kafka Connect,并配置 Debezium MySQL 连接器,企业能够实现对数据库变化的实时监控和处理,满足订单变化、库存管理等业务需求。AutoMQ 的高效性和弹性,以及 Debezium 的低延迟和可靠性,使其成为企业实时数据处理的理想选择。针对监听数据变化获取事件消息,有更多可扩展性可以去探索,可以参考:Debezium [10]。

引用

[1] Debezium: https://debezium.io/

[2] AutoMQ: https://www.automq.com/

[3] Kafka Connect: https://docs.confluent.io/platform/current/connect/index.html

[4] Debezium Structure: https://docs.redhat.com/zh_hans/documentation/red_hat_integration/2023.q2/html/debezium_user_guide/description-of-debezium-architecture

[5] 快速开始 AutoMQ: https://docs.automq.com/zh/automq/getting-started

[6] Github Release: https://github.com/AutoMQ/automq/releases

[7] Redpanda Console: https://redpanda.com/redpanda-console-kafka-ui

[8] Kafdrop: https://github.com/obsidiandynamics/kafdrop

[9] Redpanda Console | AutoMQ: https://docs.automq.com/zh/automq/integrations/kafka-ui/redpanda-console

[10] Debezium: https://debezium.io/

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,492评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,048评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,927评论 0 358
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,293评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,309评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,024评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,638评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,546评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,073评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,188评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,321评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,998评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,678评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,186评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,303评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,663评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,330评论 2 358

推荐阅读更多精彩内容