聊聊多个节点实例数据同步如何触发

前言

之前写过一篇文章聊聊在集群环境中本地缓存如何进行同步,今天聊的话题看着和那篇文章有点雷同,不过我们今天重点会放在方法论上,也不会拘泥于具体实现。在聊这个话题之前,大家可以思考一下,如果要实现多个实例数据同步触发,大家会怎么做?脑海里,是会浮现,我可以用消息队列或者定时器来实现?这种已经具象化的技术细节?还是进一步进行拆解?

假设大家已经思考好,我来说下我个人的思考逻辑。今天标题的内容,主要讲同步如何触发?内容已经圈定死,因此就不谈数据同步涉及的一致性,只谈如何触发这个动作。多节点实例触发的关键是,一旦触发,各个节点都要通知到位。那如何进行多个节点通知呢?答案就是通过广播。那如何感知是否通知到位呢?这个还真不好搞,那我们换个思路,如果通知不到位,我们的措施会是啥?正常我们的思路,会是通过补偿机制

今天我们聚焦在广播这个动作,补偿机制暂不在本文讨论。下面通过一个案例实操下

本案例核心流程图

703eca7227f4beb57f0e6052321c9b2e_4dc98ddc5a9209dc2bbeeaecde33dc4b.png

从图中,我们会发现本案例是通过一个中间件来实现。那这个中间件是啥?是rocketmq、kafka还是其他具有广播功能的组件或者服务?答案是也不是。怎么说?我们这个中间件,其实是一层高层广播抽象,而非具体实现

实现步骤

1、定义高层广播抽象接口

@FunctionalInterface
public interface DataSyncTrigger {

    void broadcast(Object data);
}

2、定义通知事件类

注: 本文会采用spring的事件监听模式实现

public class DataSyncTriggerEvent extends ApplicationEvent {
    /**
     * Create a new ApplicationEvent.
     *
     * @param source the object on which the event initially occurred (never {@code null})
     */
    public DataSyncTriggerEvent(Object source) {
        super(source);
    }
}

3、定义高层抽象广播的模板基类

@RequiredArgsConstructor
public abstract class BaseDataSyncTrigger implements DataSyncTrigger, ApplicationContextAware {
    protected ApplicationContext applicationContext;
    
    protected final DataSyncTriggerProperty dataSyncTriggerProperty;


    @Override
    public void broadcast(Object data) {
        DataSyncTriggerEvent dataSyncTriggerEvent = new DataSyncTriggerEvent(data);
        applicationContext.publishEvent(dataSyncTriggerEvent);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    private Collection<DataSyncTriggerCallBack> listDataSyncTriggerCallBacks(){
        try {
            Map<String, DataSyncTriggerCallBack> dataSyncTriggerCallBackMap = applicationContext.getBeansOfType(DataSyncTriggerCallBack.class);
            return Collections.unmodifiableList(dataSyncTriggerCallBackMap.values().stream().collect(Collectors.toList()));
        } catch (BeansException e) {

        }

        return Collections.emptyList();
    }
    
    public void callBack(Object data){
        Collection<DataSyncTriggerCallBack> dataSyncTriggerCallBacks = listDataSyncTriggerCallBacks();
        if(CollectionUtil.isNotEmpty(dataSyncTriggerCallBacks)){
            if(dataSyncTriggerProperty.isTriggerCallBackAsync()){
                callbackAsync(data, dataSyncTriggerCallBacks);
            }else{
                callbackSync(data, dataSyncTriggerCallBacks);
            }
          
        }
    }

    private  void callbackSync(Object data, Collection<DataSyncTriggerCallBack> dataSyncTriggerCallBacks) {
        for (DataSyncTriggerCallBack dataSyncTriggerCallBack : dataSyncTriggerCallBacks) {
            dataSyncTriggerCallBack.execute(data);
        }
    }

    private  void callbackAsync(Object data, Collection<DataSyncTriggerCallBack> dataSyncTriggerCallBacks) {
        for (DataSyncTriggerCallBack dataSyncTriggerCallBack : dataSyncTriggerCallBacks) {
            ThreadUtil.execAsync(()->{
                dataSyncTriggerCallBack.execute(data);
            });
        }
    }
}

4、定义抽象回调接口【扩展点】

当业务收到通知,可以通过该回调接口进行具体业务操作

@FunctionalInterface
public interface DataSyncTriggerCallBack {

    void execute(Object data);
}

5、定义具体广播实现类

注: 这个广播的具体实现方案就很多了,只要天生具备广播能力或者基于原来特性扩展出广播的组件都可以,比如rocketmq的广播机制、redis的pubsub机制、zookeeper的分布式协调能力、基于注册中心服务发现能力改造出来的广播能力等。本文就以redis的pubsub机制为例

Slf4j
public class RedisDataSyncTrigger extends BaseDataSyncTrigger implements CommandLineRunner {


    private final RedisTemplate redisTemplate;
    

    public RedisDataSyncTrigger(RedisTemplate redisTemplate, DataSyncTriggerProperty dataSyncTriggerProperty) {
        super(dataSyncTriggerProperty);
        this.redisTemplate = redisTemplate;
    }

    @EventListener
    public void listener(DataSyncTriggerEvent dataSyncTriggerEvent){
        SyncDataDTO syncDataDTO = SyncDataDTO.builder()
                .data(dataSyncTriggerEvent.getSource())
                        .timeStamp(System.currentTimeMillis())
                                .build();
        try {
            redisTemplate.convertAndSend(REDIS_CHANNEL_KEY, syncDataDTO);
        } catch (Exception e) {
           log.error("redis publish channel 【" + REDIS_CHANNEL_KEY + "】 fail,cause:" + e.getMessage(),e);
        }
    }


    @Override
    public void run(String... args) throws Exception {
        doSubscribe();
    }

    @SneakyThrows
    private void doSubscribe() {
        RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
        RedisMessageListener redisMessageListener = applicationContext.getBean(RedisMessageListener.class);
        connection.subscribe(redisMessageListener,REDIS_CHANNEL_KEY.getBytes("utf-8"));
        log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Register listen channel : 【{}】",REDIS_CHANNEL_KEY);
    }
}

具体redis订阅监听实现

@RequiredArgsConstructor
@Slf4j
public class RedisMessageListener implements MessageListener{

    private final BaseDataSyncTrigger baseDataSyncTrigger;

    private final RedisTemplate redisTemplate;


    @Override
    public void onMessage(Message message, byte[] pattern) {
        byte[] body = message.getBody();
        String dataJson = StrUtil.str(body, "utf-8");
        if(JSONUtil.isJson(dataJson)){
            try {
                SyncDataDTO dataDTO = (SyncDataDTO) redisTemplate.getHashValueSerializer().deserialize(body);
                baseDataSyncTrigger.callBack(dataDTO.getData());
            } catch (Exception e) {
                log.error(e.getMessage(),e);
            }
        }else{
            log.warn(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Data 【{}】 is not match json format !!!",dataJson);
        }

    }
}

6、测试验证

a、编写业务逻辑类

@Service
@RequiredArgsConstructor
@Slf4j
public class DataService {

    private List<Object> dataList = new CopyOnWriteArrayList<>();

    private final RedisTemplate redisTemplate;

    private final BaseDataSyncTrigger dataSyncTrigger;

    public boolean add(String data){
        try {
            Long count = redisTemplate.opsForList().leftPush(RedisConstant.REDIS_LIST_KEY, data);
            if(count > 0){
                dataSyncTrigger.broadcast(data);
                return true;
            }
        } catch (Exception e) {
           log.error("add fail:" + e.getMessage(),e);
        }

        return false;

    }

    public List<Object> getDataList(){
        return dataList;
    }
}

b、编写业务控制器

@RestController
@RequestMapping("data")
@RequiredArgsConstructor
public class DataController {
    


    private final DataService dataService;

    @GetMapping("add/{data}")
    public String syncData(@PathVariable("data") String data){
        boolean isSuccess = dataService.add(data);
        return isSuccess ? "success" : "fail";
    }

    @GetMapping("list")
    public List<Object> listData(){
        return dataService.getDataList();
    }
}

c、编写业务回调类

@Component
@RequiredArgsConstructor
@Slf4j
public class LocalListDataSyncTriggerCallBack implements DataSyncTriggerCallBack {

    private final DataService dataService;

    @Override
    public void execute(Object data) {
        dataService.getDataList().add(data);
        log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Sync data:-->{}",data);
    }
}

d、小细节

注: 当项目重启时,本地存储容器是没内容的,因此需要在项目重启时,写一个钩子,从其他缓存介质将数据刷到本地存储中

@Component
@RequiredArgsConstructor
@Slf4j
public class DataInitTask implements CommandLineRunner {

    private final RedisTemplate redisTemplate;

    private final DataService dataService;


    @Override
    public void run(String... args) throws Exception {
        List redisDataList = redisTemplate.opsForList().range(RedisConstant.REDIS_LIST_KEY, 0, -1);
        if(CollectionUtil.isNotEmpty(redisDataList)){
            dataService.getDataList().addAll(redisDataList);
            log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Loaded data from redis finished!!!");
        }

    }


}

e、测试

从一个节点(示例:54860端口)添加数据,如图

5783665f1a3eef84b3d9eabf1f73c856_3658dff9dddb0888e61e6158274518dc.png

观察其他节点(示例:59829端口)本地存储是否接收到数据

391f57e9aea4f9e7435fd38b1925f1e4_54e76766e4aaacdf8e58feff6e326cb5.png

从图可以发现已经收到数据,同时我们观察控制台


1a3e0654bc8b83edd298171005098697_09d9e70be00a18a2924e634ce61b382e.png

可以看出业务回调已经触发

总结

本文介绍了通过redis pubsub实现广播效果,示例代码中也提供基于注册中心以及配置中心apollo来实现广播的效果。基于篇幅就不再论述了,感兴趣的朋友,可以查看下方demo链接。本文除了介绍多个节点实例数据同步如何触发之外,其实还有实现一个通用组件套路原则--依赖倒置原则。高层定义抽象,程序依赖高层抽象,也不依赖具体实现,这样后续才比较好扩展

demo链接

https://github.com/lyb-geek/springboot-learning/tree/master/springboot-localdata-sync

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,525评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,203评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,862评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,728评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,743评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,590评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,330评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,244评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,693评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,885评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,001评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,723评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,343评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,919评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,042评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,191评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,955评论 2 355

推荐阅读更多精彩内容