github地址:
https://github.com/osheroff/mysql-binlog-connector-java
前提是mysql需要开启binlog
1.引入依赖
<dependency>
<groupId>com.github.shyiko</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.21.0</version>
</dependency>
2.JAVA代码
package com.mf.core.config;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import com.mf.core.utils.JsonUtils;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @Description
* @Author ligeng
* @Date 2021-08-25
* OA binlong 同步
*/
@Component
public class BinlogAutoSyn implements CommandLineRunner {
@Override
public void run(String... args) throws Exception {
try {
BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306,"test", "", "");
client.setServerId(2);
client.registerEventListener(event -> {
EventData data = event.getData();
if (data instanceof TableMapEventData) {
System.out.println("Table:");
TableMapEventData tableMapEventData = (TableMapEventData) data;
System.out.println(tableMapEventData.getTableId()+": ["+tableMapEventData.getDatabase() + "-" + tableMapEventData.getTable()+"]");
System.out.println(tableMapEventData);
}
if (data instanceof UpdateRowsEventData) {
System.out.println("Update:");
System.out.println(data.toString());
UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData ) data;
for (Map.Entry<Serializable[], Serializable[]> row : updateRowsEventData.getRows()) {
List<Serializable> entries = Arrays.asList(row.getValue());
System.out.println(entries);
String dataObject = getDataObject(entries);
System.out.println(dataObject);
}
} else if (data instanceof WriteRowsEventData) {
System.out.println("Insert:");
System.out.println(data.toString());
System.out.println("Update:");
WriteRowsEventData writeRowsEventData =(WriteRowsEventData) data;
List<Serializable[]> rows = writeRowsEventData.getRows();
for (Serializable[] row : rows) {
List<Serializable> entries = Arrays.asList(row);
String dataObject = getDataObject(entries);
System.out.println(dataObject);
}
} else if (data instanceof DeleteRowsEventData) {
System.out.println("Delete:");
System.out.println(data.toString());
DeleteRowsEventData writeRowsEventData =(DeleteRowsEventData) data;
List<Serializable[]> rows = writeRowsEventData.getRows();
for (Serializable[] row : rows) {
List<Serializable> entries = Arrays.asList(row);
String dataObject = getDataObject(entries);
System.out.println(dataObject);
}
}
});
client.connect();
} catch (IOException e) {
e.printStackTrace();
}finally {
}
}
private static String getDataObject(List message) {
Map<Object,Object> resultObject = new HashMap<>();
String format = "{\"id\":\"0\",\"name\":\"1\",\"ip\":\"2\"}";
//JSONObject json = JSON.parseObject(format);
Map json = JsonUtils.jsonToPojo(format, Map.class);
System.out.println("message");
System.out.println(message);
for (Object key : json.keySet()) {
int i = Integer.parseInt(json.get(key).toString());
resultObject.put(key, message.get(i));
}
return JsonUtils.objectToJson(resultObject);
}
}
3.注意
这种方式的表名需要自定义,代码中记得改:
String format = "{\"id\":\"0\",\"name\":\"1\",\"ip\":\"2\"}";
4.其他方式
可以使用阿里的canal