java实现mysql的binlog监听

github地址:

https://github.com/osheroff/mysql-binlog-connector-java

前提:MySQL 需要开启 binlog(log_bin=ON),并且 binlog_format 需设置为 ROW,否则监听不到行级的增删改事件。

1.引入依赖

        <dependency>
            <groupId>com.github.shyiko</groupId>
            <artifactId>mysql-binlog-connector-java</artifactId>
            <version>0.21.0</version>
        </dependency>

2.JAVA代码

package com.mf.core.config;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import com.mf.core.utils.JsonUtils;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Listens to a MySQL binlog stream and prints every row change as JSON.
 *
 * <p>The class is a Spring {@link Component} implementing {@link CommandLineRunner}, so
 * {@link #run(String...)} is expected to be invoked once at application start-up. The
 * connection settings below are hard-coded sample values and must be adapted before use.
 *
 * @author ligeng
 * @since 2021-08-25
 */
@Component
public class BinlogAutoSyn implements CommandLineRunner {

    /**
     * JSON object mapping an output field name to the index of the column (within a binlog
     * row) that supplies its value. Must be adapted to the layout of the monitored table.
     */
    private static final String COLUMN_FORMAT = "{\"id\":\"0\",\"name\":\"1\",\"ip\":\"2\"}";

    /**
     * Creates the binlog client, registers the event handler and connects.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared by {@link CommandLineRunner}; I/O failures while connecting
     *     are caught and printed instead of being propagated
     */
    @Override
    public void run(String... args) throws Exception {
        // Sample connection settings: host, port, schema, username, password.
        BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306, "test", "", "");
        client.setServerId(2);
        client.registerEventListener(BinlogAutoSyn::handleEvent);

        try {
            // NOTE(review): connect() appears to block the calling thread for as long as the
            // client stays connected, which would stall the rest of start-up -- confirm, and
            // consider connect(timeout), which is documented to run on a separate thread.
            client.connect();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Dispatches a single binlog event to the matching printer; other event types are ignored.
     *
     * @param event the event delivered by the binlog client
     */
    private static void handleEvent(Event event) {
        EventData data = event.getData();
        if (data instanceof TableMapEventData) {
            TableMapEventData tableMap = (TableMapEventData) data;
            System.out.println("Table:");
            System.out.println(tableMap.getTableId() + ": [" + tableMap.getDatabase() + "-" + tableMap.getTable() + "]");
            System.out.println(tableMap);
        } else if (data instanceof UpdateRowsEventData) {
            System.out.println("Update:");
            System.out.println(data);
            for (Map.Entry<Serializable[], Serializable[]> row : ((UpdateRowsEventData) data).getRows()) {
                // Only the entry value is converted; presumably the row image after the
                // update -- verify against the connector documentation.
                List<Serializable> after = Arrays.asList(row.getValue());
                System.out.println(after);
                System.out.println(getDataObject(after));
            }
        } else if (data instanceof WriteRowsEventData) {
            // The original also printed a stray "Update:" label here (copy-paste slip).
            System.out.println("Insert:");
            System.out.println(data);
            printRows(((WriteRowsEventData) data).getRows());
        } else if (data instanceof DeleteRowsEventData) {
            System.out.println("Delete:");
            System.out.println(data);
            printRows(((DeleteRowsEventData) data).getRows());
        }
    }

    /**
     * Prints the JSON form of each row.
     *
     * @param rows the rows carried by an insert or delete event
     */
    private static void printRows(List<Serializable[]> rows) {
        for (Serializable[] row : rows) {
            System.out.println(getDataObject(Arrays.asList(row)));
        }
    }

    /**
     * Converts one row into a JSON object using the name-to-index mapping in
     * {@link #COLUMN_FORMAT}.
     *
     * @param message the column values of a single row, in column order
     * @return the JSON produced by {@code JsonUtils.objectToJson}; fields whose configured
     *     index lies outside the row are omitted
     */
    private static String getDataObject(List<Serializable> message) {
        Map<Object, Object> resultObject = new HashMap<>();
        Map<?, ?> columnIndexes = JsonUtils.jsonToPojo(COLUMN_FORMAT, Map.class);
        System.out.println("message");
        System.out.println(message);

        for (Map.Entry<?, ?> column : columnIndexes.entrySet()) {
            int index = Integer.parseInt(column.getValue().toString());
            // Rows with fewer columns than the mapping expects are reachable here, so skip
            // missing indexes instead of throwing IndexOutOfBoundsException in the listener.
            if (index >= 0 && index < message.size()) {
                resultObject.put(column.getKey(), message.get(index));
            }
        }
        return JsonUtils.objectToJson(resultObject);
    }
}

3.注意

这种方式下,字段名与列下标的对应关系需要自行定义(key 为输出的字段名,value 为该字段在表中的列序号,从 0 开始),代码中记得按实际表结构修改:

        String format = "{\"id\":\"0\",\"name\":\"1\",\"ip\":\"2\"}";

4.其他方式

可以使用阿里的canal

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容