评价系统(6)

MySQL -> Canal -> Kafka 架构解析->elasticsearch

一句话概括为什么这么设计

这个设计的核心目的是:以对业务数据库零侵入的方式,实现高性能、低延迟、稳定可靠的实时数据流捕获与分发,为下游各种异构系统提供一个统一格式的数据源。


1. 为什么要这么设计?为了解决什么问题?

这种架构主要为了解决以下几个核心问题:

1.1 解耦与零侵入 (Decoupling & Zero Intrusion)

  • 问题:如果让MySQL直接连接Kafka,通常需要在MySQL中编写触发器(Trigger)或调用Kafka的生产者API。这会:
    • 增加数据库负担:触发器消耗宝贵的数据库CPU和IO资源,可能影响核心线上业务的性能。
    • 造成业务耦合:业务代码或数据库结构需要感知并处理数据同步的逻辑,使系统变得复杂难以维护。
    • 难以维护:Kafka地址、Topic等配置硬编码在数据库端,变更起来非常麻烦。
  • 解决方案:Canal伪装成MySQL的从库,通过读取binlog来获取数据变更。这个过程是MySQL主从复制协议内置的,对主库(业务库)的性能影响极小,完全无需修改业务代码,实现了彻底的解耦。

1.2 高吞吐量与缓冲 (High Throughput & Buffering)

  • 问题:MySQL的写入速度可能瞬间爆发(例如,业务高峰或批量操作),而下游消费者(如ES、Hive、Flink作业)的消费速度可能跟不上,或者偶尔会故障、重启。
  • 解决方案:Kafka作为一个高吞吐量的分布式消息队列,起到了缓冲区和削峰填谷的作用。它接收Canal发送来的数据,并可以积压起来,允许下游消费者按照自己的能力来消费,避免了将压力直接传导回MySQL,保证了整个链条的稳定性。

1.3 数据分发给多个消费者 (Multiple Consumers)

  • 问题:一份数据变更通常需要被多个下游系统使用(例如:更新搜索索引、刷新缓存、大数据分析、审计等)。如果让MySQL直连每一个下游,需要建立无数个连接和同步链路,难以管理。
  • 解决方案:Canal将数据写入Kafka后,Kafka的发布-订阅模型完美解决了这个问题。任何下游系统只需要订阅自己感兴趣的Kafka Topic,就可以独立地获取同一份数据。新增一个消费者对MySQL和Canal毫无影响,极大地提升了系统的扩展性。

1.4 格式统一与标准化 (Format Standardization)

  • 问题:下游系统多种多样,它们期望接收的数据格式可能不同(JSON, Avro, Protobuf等)。让每个下游自己解析原始的binlog会非常痛苦。
  • 解决方案:Canal扮演了协议转换器的角色。它将二进制的binlog解析成结构化的、易于理解的数据(例如JSON对象),并发送到Kafka。下游系统从Kafka拿到就是“开箱即用”的数据,无需关心复杂的MySQL复制协议和binlog格式,大大降低了开发复杂度。

2. 为什么要翻译成二进制文件(Binlog)呢?

这个问题需要纠正一个理解:不是Canal“翻译成”二进制,而是MySQL本身就把数据变更“记录成”了二进制日志(Binlog)

  • Binlog是MySQL的核心机制:它用于主从复制和数据恢复。所有对数据库的增删改操作(DML)以及表结构变更(DDL)都会按顺序记录在Binlog文件中。
  • 二进制格式的优势
    • 高性能:二进制格式对计算机友好,写入和读取的效率极高,体积更小。
    • 完整性:它包含了变更前后的完整数据行、执行的SQL语句、时间戳、服务器ID等丰富的元信息。
    • 顺序性:Binlog是严格有序的,保证了数据变更的顺序与实际执行顺序一致,这对于数据同步至关重要。

所以,Canal的工作不是“创造”二进制,而是“解析”MySQL已经生成好的二进制Binlog。


3. 为什么不直接MySQL连Kafka呢?

结合上面的解释,现在我们可以直接回答这个最终问题:

  1. 资源与性能:在MySQL内直连Kafka(通过触发器或UDF)会严重消耗数据库资源,威胁核心业务的稳定性和性能。而Canal基于Binlog的解析是在数据库外部进行的,影响极小。
  2. 耦合与复杂度:直连方案将同步逻辑与业务逻辑强耦合,使数据库架构变得臃肿且难以维护。Canal+Kafka的方案是标准的解耦架构,职责清晰。
  3. 可靠性与回溯:直连方案如果Kafka挂掉,可能会拖垮MySQL或导致数据丢失。而Canal可以灵活地控制解析位点(binlog position/GTID),在Kafka恢复后从断点继续同步,可靠性更高。
  4. 扩展性:直连方案是为一个特定的下游服务的。每增加一个下游消费者,就需要在MySQL端增加一套逻辑,无法支持“一对多”的分发场景。而通过Kafka,可以轻松支持任意多个消费者。
  5. 功能单一性:数据库的核心职责是“安全地存储和处理数据”,而不是“将数据变更消息发布到消息队列”。让专业的组件做专业的事,是分布式系统设计的重要原则。


在配置canal的时候需要先配置mysql,使他支持binary log

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

紧接着添加名为canal账号,为账号赋予权限更新数据库。

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

接下来就是创建canal,下载的canal有以下几个文件夹

bin
conf
lib
logs

一般要修改的文件夹为conf,里面有canal-server/conf/example/instance.properties 该文件默认配置了一个实例,canal/conf/canal.properties 该文件定义了整个Canal的Canal 服务端 定义怎么跑:服务端口、如何加载实例、全局参数。
修改配置文件:canal-server/conf/example/instance.properties

canal.instance.master.address修改为你的MySQL地址。

canal.instance.tsdb.dbUsername修改为你上面授权的账号。

canal.instance.tsdb.dbPassword修改为你上面授权账号的密码。
之后可以启动canal sever使用canal client消费数据。
简要例子来自七米老师
Canal 1.1.1版本之后,默认支持将Canal Server接收到的binlog数据直接投递到MQ,目前默认支持的MQ系统有Kafka、RocketMQ、RabbitMQ、PulsarMQ。
instance.properties 中修改实例配置

# mq config
# 设置默认的topic
canal.mq.topic=example
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################

紧接着修改canal.properties

# ...
# 可选项: tcp(默认), kafka,RocketMQ,rabbitmq,pulsarmq
canal.serverMode = kafka
# ...

# 是否为flat json格式对象
canal.mq.flatMessage = true
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
canal.mq.accessChannel = local

...

##################################################
#########                    Kafka                   #############
##################################################
# 此处配置修改为你的Kafka环境地址
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = ../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file = ../conf/kerberos/jaas.conf

# sasl demo
# kafka.sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \\n username=\"alice\" \\npassword="alice-secret\";
# kafka.sasl.mechanism = SCRAM-SHA-512
# kafka.security.protocol = SASL_PLAINTEXT

接着来学习kafka
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统。
Kafka 的基本术语
Messages And BatchesKafka 的基本数据单元被称为 message(消息),为减少网络开销,提⾼效率,多个消息会被放⼊同⼀批次 (Batch) 中后再写⼊。
Topic:⽤来对消息进⾏分类,每个进⼊到Kafka的信息都会被放到⼀个Topic
Broker:⽤来实现数据存储的主机服务器,kafka节点
Partition:每个Topic中的消息会被分为若干个Partition,以提高消息的处理效率
Producer:消息的⽣产者
Consumer:消息的消费者
Consumer Group:消息的消费群组


Kafka 的消息通过 Topics(主题) 进⾏分类,⼀个主题可以被分为若⼲个 Partitions(分区),⼀个分区就是⼀个提交⽇志 (commit log)。消息以追加的⽅式写⼊分区,然后以先⼊先出的顺序读取。Kafka 通过分区来实现数据的冗余和伸缩性,分区可以分布在不同的服务器上,这意味着⼀个 Topic 可以横跨多个服务器,以提供⽐单个服务器更强⼤的性能。

生产者负责创建消息。⼀般情况下,⽣产者在把消息均衡地分布到在主题的所有分区上,⽽并不关⼼消息会被写到哪个分区。如果我们想要把消息写到指定的分区,可以通过⾃定义分区器来实现。
消费者是消费者群组的⼀部分,消费者负责消费消息。消费者可以订阅⼀个或者多个主题,并按照消息⽣成的顺序来读取它们。消费者通过检查消息的偏移量 (offset) 来区分读取过的消息。偏移量是⼀个不断递增的数值,在创建消息时,Kafka 会把它添加到其中,在给定的分区⾥,每个消息的偏移量都是唯⼀的。消费者把每个分区最后读取的偏移量保存在 Zookeeper 或 Kafka 上,如果消费者关闭或者重启,它还可以重新获取该偏移量,以保证读取状态不会丢失。

⼀个分区只能被同⼀个消费者群组⾥⾯的⼀个消费者读取,但可以被不同消费者群组中所组成的多个消费者共同读取。多个消费者群组中消费者共同读取同⼀个主题时,彼此之间互不影响。

⼀个独⽴的 Kafka 服务器被称为 Broker。Broker 接收来⾃⽣产者的消息,为消息设置偏移量,并提交消息到磁盘保存。Broker 为消费者提供服务,对读取分区的请求做出响应,返回已经提交到磁盘的消息。Broker 是集群 (Cluster) 的组成部分。每⼀个集群都会选举出⼀个 Broker 作为集群控制器 (Controller),集群控制器负责管理⼯作,包括将分区分配给 Broker 和监控 Broker。在集群中,⼀个分区 (Partition) 从属⼀个 Broker,该 Broker 被称为分区的⾸领 (Leader)。⼀个分区可以分配给多个 Brokers,这个时候会发⽣分区复制。这种复制机制为分区提供了消息冗余,如果有⼀个 Broker 失效,其他Broker 可以接管领导权。


elasticsearch

Elasticsearch 是一个高度可扩展的开源实时搜索和分析引擎,Elasticsearch 的分布式特性能够横向扩展至数以百计的服务器存储以及处理PB级的数据,同时可以在极短的时间内索引、搜索和分析大量的数据。
因此可以使用mysql进行数据增删改,elasticsearch进行查询。
CQRS模式

正向索引

CQRS 的核心是将读和写分开,读用一个模型,写用另一个模型。这样做的好处是,读和写可以各自优化:写模型保证业务逻辑和数据一致性,读模型可以针对性能和展示需求做专门的优化。
它特别适合在读写比例悬殊、查询复杂或者对高并发有要求的场景,比如电商的商品浏览和下单分离、报表系统等。但缺点是架构复杂度提高,需要处理最终一致性的问题。
elasticsearch是基于mysql的正向索引提出的倒排索引
正向索引

倒排索引

正向索引是最传统的,根据id索引的方式。但根据词条查询时,必须先逐条获取每个文档,然后判断文档中是否包含所需要的词条,是根据文档找词条的过程。

而倒排索引则相反,是先找到用户要搜索的词条,根据词条得到保护词条的文档的id,然后根据id获取文档。是根据词条找文档的过程。
ES数据库基本概念

MySQL Elasticsearch 说明
Table Index 索引(index),就是文档的集合,类似数据库的表(table)
Row Document 文档(Document),就是一条条的数据,类似数据库中的行(Row),文档是 JSON 格式
Column Field 字段(Field),就是 JSON 文档中的字段,类似数据库中的列(Column)
Schema Mapping Mapping(映射),定义索引中文档的约束,例如字段类型约束,类似数据库的表结构(Schema)
SQL DSL DSL 是 Elasticsearch 提供的 JSON 风格的请求语句,用来操作 Elasticsearch,实现 CRUD
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容