一、导包:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.6</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.common</artifactId>
<version>1.1.6</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.6</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.9.6</version>
</dependency>
二、配置文件
server:
port: 7070 #服务端口
#spring相关配置
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver #数据库驱动包
url: jdbc:mysql://192.168.72.128:3306/test?autoReconnect=true&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false
username: root
password: 123456
# redis配置
redis:
database: 0
host: 172.27.3.74
jedis:
pool:
#最大连接数据库连接数,设 0 为没有限制
max-active: 8
#最大等待连接中的数量,设 0 为没有限制
max-idle: 8
#最大建立连接等待时间。如果超过此时间将接到异常。设为-1表示无限制。
max-wait: -1ms
#最小等待连接中的数量,设 0 为没有限制
min-idle: 0
lettuce:
pool:
max-active: 8
max-idle: 8
max-wait: -1ms
min-idle: 0
shutdown-timeout: 100ms
password:
port: 6379
elasticsearch:
rest:
uris: 172.27.4.151:9200
#mybatis:配置
mybatis-plus:
mapper-locations: classpath:dao/*.xml
typeAliasesPackage: com.ikingtec.eastern.mongolia.api.common.pojo # 所有pojo别名类所在包
global-config:
#sql-injector: com.baomidou.mybatisplus.mapper.LogicSqlInjector
db-config:
id-type: auto
field-strategy: not_null
logic-delete-value: -1
logic-not-delete-value: 0
refresh: true
configuration:
map-underscore-to-camel-case: true
cache-enabled: false
call-setters-on-nulls: true
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
三、canal实例化类
package com.example.demo;
import com.alibaba.google.common.collect.Lists;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
@Component
public class CanalClient implements DisposableBean{
private CanalConnector canalConnector;
@Bean
public CanalConnector getCanalConnector(){
canalConnector = CanalConnectors.newClusterConnector(Lists.newArrayList(
new InetSocketAddress("192.168.72.128", 11111)),
"example",
"root",
"123456"
);
canalConnector.connect();
// 指定filter,格式{database}.{table},不传参数就是 subscribe 所有的内容
canalConnector.subscribe();
// 回滚寻找上次中断的位置
canalConnector.rollback();
return canalConnector;
}
/**
* 容器销毁时调用
* @throws Exception
*/
@Override
public void destroy() throws Exception {
if(canalConnector != null){
canalConnector.disconnect();
}
}
}
四、canal调度类
package com.example.demo;
import com.alibaba.fastjson2.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
public class CanalScheduling implements Runnable, ApplicationContextAware {
private ApplicationContext applicationContext;
@Resource
private CanalConnector canalConnector;
@Override
@Scheduled(fixedDelay = 100)
public void run() {
// System.out.println("run");
long batchId = -1;
try{
int batchSize = 1000;
Message message = canalConnector.getWithoutAck(batchSize);
batchId = message.getId();
List<CanalEntry.Entry> entries = message.getEntries();
if(batchId != -1 && entries.size() > 0){
entries.forEach(entry -> {
if(entry.getEntryType() == CanalEntry.EntryType.ROWDATA){
// 解析处理
publishCanalEvent(entry);
}
});
}
canalConnector.ack(batchId);
}catch(Exception e){
e.printStackTrace();
canalConnector.rollback(batchId);
}
}
/**
* 将 binlog 中的一条(entry),
* 解析成受影响的记录(change),再逐条解析受影响的记录(change),
* 将记录(rowData)的数据结构从 List 转成 Map,
* 完了交给 indexES 方式索引进 ElasticSearch;
* @param entry binlog 中的一条;
*/
private void publishCanalEvent(CanalEntry.Entry entry){
CanalEntry.EventType eventType = entry.getHeader().getEventType();
String database = entry.getHeader().getSchemaName();
String table = entry.getHeader().getTableName();
CanalEntry.RowChange change = null;
try {
change = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
return;
}
change.getRowDatasList().forEach(rowData -> {
List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
String primaryKey = "id";
CanalEntry.Column idColumn = columns.stream().filter(column -> column.getIsKey()
&& primaryKey.equals(column.getName())).findFirst().orElse(null);
Map<String,Object> dataMap = parseColumnsToMap(columns);
System.out.println(JSON.toJSONString(dataMap));
});
}
Map<String,Object> parseColumnsToMap(List<CanalEntry.Column> columns){
Map<String,Object> jsonMap = new HashMap<>();
columns.forEach(column -> {
if(column == null){
return;
}
jsonMap.put(column.getName(), column.getValue());
});
return jsonMap;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
五、启动类
package com.example.demo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication(scanBasePackages = {"com.example.demo"})
@EnableAspectJAutoProxy(proxyTargetClass = true)
@EnableScheduling
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}