Mysql和Elasticsearch的数据同步

Elasticsearch的数据来自Mysql数据库中,所以当我们的MySQL发生改变时,Elasticsearch也要跟着改变,这时候我们的es的数据就要和mysql同步了

同步实现思路

常见的数据同步方案有三种:

  • 同步调用
  • 异步通知
  • 监听binlog

方案一:

  • hotel-demo对外提供接口,用来修改elasticsearch中的数据
  • 酒店管理服务在完成数据库操作后,直接调用hotel-demo提供的接口,

也就是说MySQL修改完去修改es的数据

  • 优点:实现简单,粗暴
  • 缺点:业务耦合度高

方案二

  • hotel-admin对mysql数据库数据完成增、删、改后,发送MQ消息

  • hotel-demo监听MQ,接收到消息后完成elasticsearch数据修改

  • 优点:低耦合,实现难度一般

  • 缺点:依赖mq的可靠性

  • 这个实现方式也就是使用mq进行操纵,当我们修改MySQL的服务器修改完以后会将信息发送给MQ,然后修改ES的会进行监听,当监听到了以后就进行修改es的操作

方式三:

  • 给mysql开启binlog功能
  • mysql完成增、删、改操作都会记录在binlog中
  • hotel-demo基于canal监听binlog变化,实时更新elasticsearch中的内容
  • 也就是监听mysql,如果MySQL的数据有变化那么就直接去改变es的数据
  • 优点:完全解除服务间耦合
  • 缺点:开启binlog增加数据库负担、实现复杂度高

在这里使用的是第二种实现方案:使用MQ来写

同步案例代码

用来操控ES的代码(负责监听MQ队列)

/**     * 监听增加和修改的队列     * 因为我们的ES中可以进行全量修改,当有这个id的数据的时候那么就先删除再新增,没有这个数据那么就直接新增     * 所以队列过来的id不管是新增还是修改es都可以判断如果有这个数据id那么就先删除再新增,如果没有这个数据就直接新增,所以新增和修改他俩用一个方法就行了     *     * @param id 队列中需要进行操作的id     */    @RabbitListener(bindings = @QueueBinding(            value = @Queue(name = MqConstants.HOTEL_INSERT_QUEUE),            exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.DIRECT),            key = MqConstants.HOTEL_INSERT_KEY    ))    public void insertAndUpdate(Long id) {        if (id == null) {            return;        }        log.info("入参:{}", id);        //监听到以后拿到id去数据库查询整个数据        Hotel hotel = iHotelService.getById(id);        //因为查的mysql数据和es的数据有些不一样所以需要做转换        HotelDoc hotelDoc = new HotelDoc(hotel);        //转换为json        String hotelDocJson = JSON.toJSONString(hotelDoc);        System.out.println("hotelDocJson = " + hotelDocJson);        //发送到ES中,因为我们的ES中可以进行全量修改,当有这个id的数据的时候那么就先删除再新增,没有这个数据那么就直接新增        //创建请求语义对象 添加文档数据        IndexRequest request = new IndexRequest("hotel");        //这个新增就是PUT在es中        request.id(hotel.getId().toString()).source(hotelDocJson, XContentType.JSON);        //发送请求        try {            IndexResponse response = client.index(request, RequestOptions.DEFAULT);            RestStatus status = response.status();            log.info("响应结果为:{}", status);        } catch (IOException e) {            e.printStackTrace();        }    }    /**     * 监听删除队列     *     * @param id 队列中需要进行操作的id     */    @RabbitListener(bindings = @QueueBinding(            value = @Queue(name = MqConstants.HOTEL_DELETE_QUEUE),            exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.DIRECT),            key = MqConstants.HOTEL_DELETE_KEY    ))    public void deleteByMqId(Long id) {        if (id == null) {            return;        }        log.info("入参:{}", id);        //先创建语义对象,直接就可以给里面写id的字段        DeleteRequest request = new DeleteRequest("hotel", id.toString());        //发送请求        try {            DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);            RestStatus status = response.status();            log.info("响应结果为:{}", status);        } catch (IOException e) {            e.printStackTrace();        }

用来操作MySQL的代码:

@RestController@RequestMapping("hotel")public class HotelController {    //注入和RabbitMQ链接    @Autowired    private RabbitTemplate rabbitTemplate;    @Autowired    private IHotelService hotelService;    @GetMapping("/{id}")    public Hotel queryById(@PathVariable("id") Long id) {        return hotelService.getById(id);    }    @GetMapping("/list")    public PageResult hotelList(            @RequestParam(value = "page", defaultValue = "1") Integer page,            @RequestParam(value = "size", defaultValue = "1") Integer size    ) {        Page<Hotel> result = hotelService.page(new Page<>(page, size));        return new PageResult(result.getTotal(), result.getRecords());    }    @PostMapping    public void saveHotel(@RequestBody Hotel hotel) {        hotelService.save(hotel);        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());    }    @PutMapping()    public void updateById(@RequestBody Hotel hotel) {        if (hotel.getId() == null) {            throw new InvalidParameterException("id不能为空");        }        hotelService.updateById(hotel);        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());    }    @DeleteMapping("/{id}")    public void deleteById(@PathVariable("id") Long id) {        hotelService.removeById(id);        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);    }}

当然也是可以不使用注解来写,直接在配置文件中写队列绑定的交换机

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,240评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,328评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,182评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,121评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,135评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,093评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,013评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,854评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,295评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,513评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,398评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,989评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,636评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,657评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容