上一篇 <<<Redis与MySQL的数据同步解决方案
下一篇 >>>阿里云的Canal框架配置
git地址:https://github.com/alibaba/canal
Canal目前支持三种监听通讯模式 TCP、KAFKA/Rocketmq
核心原理
a.canal会启动一个Server端作为一个mysql从节点拉取mysql主的节点最新的bin文件
b.只要mysql主节点的bin文件发送变化都会增量的形式通知给我们的canalServer端
c.canalServer在通知给canalServerClient,canalServerClient自己手动配置刷新Redis、kafka等各种客户端。
Canal通过TCP实现同步
Mysql配置:
server_id=177 ###服务器id
log-bin=mysql-bin ###开启日志文件
binlog-format=ROW #选择row模式
Canal服务端配置: canal.properties
#通讯端口
canal.port = 11111
#目标支持:client、rocketmq、kafka
#可以使用单独的账号信息
drop user 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
grant all privileges on *.* to 'canal'@'%' identified by 'canal';
flush privileges;
主数据源配置:example/ instance.properties
canal.instance.master.address=10.211.55.26:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=root
public class CanalClient {
public static void main(String args[]) {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.211.55.16",
11111), "example", "", "");
int batchSize = 100;
try {
connector.connect();
connector.subscribe("test.*");
connector.rollback();
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
printEntry(message.getEntries());
}
// 提交确认
connector.ack(batchId);
// connector.rollback(batchId); // 处理失败, 回滚数据
}
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
redisDelete(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
redisInsert(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
redisUpdate(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
private static void redisInsert(List<Column> columns) {
JSONObject json = new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {
RedisUtil.stringSet(columns.get(0).getValue(), json.toJSONString());
}
}
private static void redisUpdate(List<Column> columns) {
JSONObject json = new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {
RedisUtil.stringSet(columns.get(0).getValue(), json.toJSONString());
}
}
private static void redisDelete(List<Column> columns) {
JSONObject json = new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {
RedisUtil.delKey(columns.get(0).getValue());
}
}
}
结果打印:
================> binlog[mysql-bin.000015:457] , name[test,my_tb] , eventType : CREATE
================> binlog[mysql-bin.000015:324] , name[test,ttt] , eventType : INSERT
================> binlog[mysql-bin.000015:757] , name[test,ttt] , eventType : UPDATE
================> binlog[mysql-bin.000015:1025] , name[test,ttt] , eventType : DELETE
Canal通过kafka实现同步
配置信息修改
1.修改 example/instance.properties
canal.mq.topic=maikt-topic
2.修改 canal.properties
# tcp, kafka, RocketMQ
canal.serverMode = kafka
canal.mq.servers = 127.0.0.1:9092
<!-- springBoot集成kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
# kafka
spring:
kafka:
# kafka服务器地址(可以多个)
bootstrap-servers: 127.0.0.1:9092
consumer:
# 指定一个默认的组名
group-id: kafka2
# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
auto-offset-reset: earliest
# key/value的反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# key/value的序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 批量抓取
batch-size: 65536
# 缓存容量
buffer-memory: 524288
redis:
host: 47.96.105.42
# password:
port: 6369
database: 10
@KafkaListener(topics = "jarye-topic")
public void receive(ConsumerRecord<?, ?> consumer) {
System.out.println("topic名称:" + consumer.topic() + ",key:" +
consumer.key() + "," +
"分区位置:" + consumer.partition()
+ ", 下标" + consumer.offset()+","+consumer.value());
}
推荐阅读:
<<<分布式缓存与本地缓存的区别
<<<Ehcache基础知识
<<<SpringBoot整合Ehcache
<<<Redis的5种数据类型
<<<Redis存放实体对象的方式及区别
<<<Redis的应用场景汇总
<<<Redis高效及线程安全的真正原因
<<<Redis为啥要分为16个库
<<<RDB和AOF持久化方式的区别
<<<Redis与数据库的一致性解决方案
<<<SpringBoot整合Redis的注解版本完成数据缓存
<<<Redis的淘汰策略
<<<Redis的事务操作(Mult和Watch)知识点
<<<Redis的过期机制使用场景示例
<<<Redis实现分布式锁的原理分析
<<<Redis分布式锁的实现代码示例
<<<使用Redisson工具实现分布式锁
<<<Redis集群模式之主从复制原理及存在的缺陷
<<<Redis集群模式之哨兵模式
<<<Redis集群模式之Cluster去中心化分片集群
<<<Linux环境下安装单机Redis
<<<Redis Cluster集群环境搭建
<<<Redis Cluster如何动态扩容与缩容
<<<Redis Cluster主从节点自动切换
<<<Redis集群模式的类型和缺陷汇总
<<<Redis缓存的穿透、击穿和雪崩效应
<<<Redis解决穿透击穿问题时使用的布隆过滤器知识点
<<<Redis与MySQL的数据同步解决方案
<<<阿里云的Canal框架配置
<<<Redis官方提出的redlock分布式锁
<<<Redis的调优设置
<<<Redis常见问题汇总