RocketMQ数据同步源码分析

数据同步包括两部分数据

  • 元数据信息
  • 真实的消息数据
    元数据信息同步是通过netty实现,消息数据是通过socket实现的,下面通过源码分析一下这两个过程。

元数据同步

可以猜想一下,数据同步是发生在broker主从之间,元数据信息应该在slave broker启动时同步master broker的数据。在BrokerController的initialize()方法中找到了同步的代码。

// 如果角色是slave
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
        this.updateMasterHAServerAddrPeriodically = false;
    } else {
        this.updateMasterHAServerAddrPeriodically = true;
    }

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // 定时任务进行同步元数据信息
                BrokerController.this.slaveSynchronize.syncAll();
            } catch (final Throwable e) {
                log.error("ScheduledTask syncAll slave exception", e);
            }
        }
    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}

如果是slave broker 会每隔一分钟发起一次同步请求,然后进入slaveSynchronize.syncAll()方法。

public void syncAll() {
    // 同步 topic 配置信息
    this.syncTopicConfig();
    // 同步消费者偏移量
    this.syncConsumerOffset();
    // 同步延迟偏移量
    this.syncDelayOffset();
    // 同步订阅组配置信息
    this.syncSubscriptionGroupConfig();
}

private void syncTopicConfig() {
    final String masterAddrBak = this.masterAddr;
    if (masterAddrBak != null) {
        try {
            // 获取topic 信息
            final TopicConfigSerializeWrapper topicWrapper =
                    this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
            // 如果数据的版本号不匹配,更新版本号和数据
            if (!this.brokerController.getTopicConfigManager().getDataVersion()
                    .equals(topicWrapper.getDataVersion())) {

                this.brokerController.getTopicConfigManager().getDataVersion()
                        .assignNewOne(topicWrapper.getDataVersion());
                this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
                this.brokerController.getTopicConfigManager().getTopicConfigTable()
                        .putAll(topicWrapper.getTopicConfigTable());
                this.brokerController.getTopicConfigManager().persist();

                log.info("Update slave topic config from master, {}", masterAddrBak);
            }
        } catch (final Exception e) {
            log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);
        }
    }
}

private void syncConsumerOffset() {
    final String masterAddrBak = this.masterAddr;
    if (masterAddrBak != null) {
        try {
            final ConsumerOffsetSerializeWrapper offsetWrapper =
                    this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
            this.brokerController.getConsumerOffsetManager().getOffsetTable()
                    .putAll(offsetWrapper.getOffsetTable());
            this.brokerController.getConsumerOffsetManager().persist();
            log.info("Update slave consumer offset from master, {}", masterAddrBak);
        } catch (final Exception e) {
            log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
        }
    }
}

private void syncDelayOffset() {
    final String masterAddrBak = this.masterAddr;
    if (masterAddrBak != null) {
        try {
            final String delayOffset =
                    this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
            if (delayOffset != null) {

                final String fileName =
                        StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
                                .getMessageStoreConfig().getStorePathRootDir());
                try {
                    MixAll.string2File(delayOffset, fileName);
                } catch (final IOException e) {
                    log.error("Persist file Exception, {}", fileName, e);
                }
            }
            log.info("Update slave delay offset from master, {}", masterAddrBak);
        } catch (final Exception e) {
            log.error("SyncDelayOffset Exception, {}", masterAddrBak, e);
        }
    }
}

private void syncSubscriptionGroupConfig() {
    final String masterAddrBak = this.masterAddr;
    if (masterAddrBak != null) {
        try {
            final SubscriptionGroupWrapper subscriptionWrapper =
                    this.brokerController.getBrokerOuterAPI()
                            .getAllSubscriptionGroupConfig(masterAddrBak);

            // 如果数据的版本号不匹配,更新版本号和数据
            if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
                    .equals(subscriptionWrapper.getDataVersion())) {
                final SubscriptionGroupManager subscriptionGroupManager =
                        this.brokerController.getSubscriptionGroupManager();
                subscriptionGroupManager.getDataVersion().assignNewOne(
                        subscriptionWrapper.getDataVersion());
                subscriptionGroupManager.getSubscriptionGroupTable().clear();
                subscriptionGroupManager.getSubscriptionGroupTable().putAll(
                        subscriptionWrapper.getSubscriptionGroupTable());
                subscriptionGroupManager.persist();
                log.info("Update slave Subscription Group from master, {}", masterAddrBak);
            }
        } catch (final Exception e) {
            log.error("SyncSubscriptionGroup Exception, {}", masterAddrBak, e);
        }
    }
}

通过代码,可以清晰的看到四种元数据的同步:

  • 同步 topic 配置信息
  • 同步消费者偏移量
  • 同步延迟偏移量
  • 同步订阅组配置信息
    继续跟进代码,进入BrokerOuterAPI类
public TopicConfigSerializeWrapper getAllTopicConfig(
        final String addr) throws RemotingConnectException, RemotingSendRequestException,
        RemotingTimeoutException, InterruptedException, MQBrokerException {
    // 封装cmd request
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);

    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class);
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}

public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
    final String addr) throws InterruptedException, RemotingTimeoutException,
    RemotingSendRequestException, RemotingConnectException, MQBrokerException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class);
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}

public String getAllDelayOffset(
    final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
    RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return new String(response.getBody(), MixAll.DEFAULT_CHARSET);
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}

public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(
    final String addr) throws InterruptedException, RemotingTimeoutException,
    RemotingSendRequestException, RemotingConnectException, MQBrokerException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return SubscriptionGroupWrapper.decode(response.getBody(), SubscriptionGroupWrapper.class);
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}

四种元数据的请求是几乎一致,调用NettyRemotingClient的invokeSync()方法,然后再调用NettyRemotingClient的invokeSyncImpl()方法

public RemotingCommand invokeSync(final String addr, final RemotingCommand request, final long timeoutMillis)
            throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    final long beginStartTime = System.currentTimeMillis();
    // 获取channel
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            if (this.rpcHook != null) {
                this.rpcHook.doBeforeRequest(addr, request);
            }

            // 计算耗时时间
            final long costTime = System.currentTimeMillis() - beginStartTime;
            // 如果超时时间 < 耗时时间 抛出异常
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            // 调用invokeSyncImpl
            final RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            if (this.rpcHook != null) {
                this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            }
            return response;
        } catch (final RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (final RemotingTimeoutException e) {
            if (this.nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
                                          final long timeoutMillis)
            throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();

    try {
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        // netty channel send request
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(final ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                NettyRemotingAbstract.this.responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });

        final RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }

        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}

元数据同步分析完毕,然后分析一下消息的同步。

消息同步

核心类:HAService、HAConnection
Master:
HAService.AcceptSocketService:接收Slave节点的连接
HAConnection.ReadSocketService:读来自Slave节点的数据
HAConnection.WriteSocketService:往Slave节点写数据
Slave:
HAService.HAClient:对Master连接、读写数据

Slave => Master 上报CommitLog已经同步到的offset
Master => Slave 传输新的CommitLog数据

消息同步是基于NIO 完成的,下面简单分析一下源码

public void start() throws Exception {
    // 注册accept事件
    this.acceptSocketService.beginAccept();
    // 启动AcceptSocketService线程
    this.acceptSocketService.start();
    // 启动GroupTransferService线程
    this.groupTransferService.start();
    // 启动HAClient
    this.haClient.start();
}

首先分析一下acceptSocketService.beginAccept()

public void beginAccept() throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true);
    this.serverSocketChannel.socket().bind(this.socketAddressListen);
    this.serverSocketChannel.configureBlocking(false);
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

可以看到这是一个标准NIO编程写法,注册accept事件。
接下来分析一下HAClient的run() 方法

public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // 连接master
            if (this.connectMaster()) {

                if (this.isTimeToReportOffset()) {
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    if (!result) {
                        this.closeMaster();
                    }
                }

                this.selector.select(1000);

                // 处理读时间
                boolean ok = this.processReadEvent();
                if (!ok) {
                    this.closeMaster();
                }

                if (!reportSlaveMaxOffsetPlus()) {
                    continue;
                }

                // 判断上次的时间间隔,如果大于配置值,关闭master
                long interval =
                    HAService.this.getDefaultMessageStore().getSystemClock().now()
                        - this.lastWriteTimestamp;
                if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaHousekeepingInterval()) {
                    log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                        + "] expired, " + interval);
                    this.closeMaster();
                    log.warn("HAClient, master not response some time, so close connection");
                }
            } else {
                this.waitForRunning(1000 * 5);
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.waitForRunning(1000 * 5);
        }
    }

    log.info(this.getServiceName() + " service end");
}

进入connectMaster()方法

private boolean connectMaster() throws ClosedChannelException {
    if (null == this.socketChannel) {
        final String addr = this.masterAddress.get();
        if (addr != null) {
            // 获取地址
            final SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                // 连接
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    //注册读事件
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }

        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

        this.lastWriteTimestamp = System.currentTimeMillis();
    }

    return this.socketChannel != null;
}

RemotingUtil.connect()中会完成客户端socket 初始化,连接工作。

public static SocketChannel connect(SocketAddress remote){
    return connect(remote, 1000 * 5);
}

public static SocketChannel connect(SocketAddress remote, final int timeoutMillis) {
    SocketChannel sc = null;
    try {
        sc = SocketChannel.open();
        sc.configureBlocking(true);
        sc.socket().setSoLinger(false, -1);
        sc.socket().setTcpNoDelay(true);
        sc.socket().setReceiveBufferSize(1024 * 64);
        sc.socket().setSendBufferSize(1024 * 64);
        sc.socket().connect(remote, timeoutMillis);
        sc.configureBlocking(false);
        return sc;
    } catch (Exception e) {
        if (sc != null) {
            try {
                sc.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }

    return null;
}

我们继续分析run()里的processReadEvent()方法

private boolean processReadEvent() {
            int readSizeZeroTimes = 0;
    while (this.byteBufferRead.hasRemaining()) {
        try {
            // readSize > 0 有数据读取,进行数据读取
            final int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                this.lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
                readSizeZeroTimes = 0;
                final boolean result = this.dispatchReadRequest();
                if (!result) {
                    log.error("HAClient, dispatchReadRequest error");
                    return false;
                }
            } else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                log.info("HAClient, processReadEvent read socket < 0");
                return false;
            }
        } catch (final IOException e) {
            log.info("HAClient, processReadEvent read socket exception", e);
            return false;
        }
    }

    return true;
}

主要逻辑是通过dispatchReadRequest()方法完成

private boolean dispatchReadRequest() {
    final int msgHeaderSize = 8 + 4; // phyoffset + size
    final int readSocketPos = this.byteBufferRead.position();

    while (true) {
        // begin -> 读取到请求数据
        final int diff = this.byteBufferRead.position() - this.dispatchPostion;
        if (diff >= msgHeaderSize) {
            final long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
            final int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);

            final long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

            if (slavePhyOffset != 0) {
                // 如果slave的同步offset != master的同步offset 返回false
                if (slavePhyOffset != masterPhyOffset) {
                    log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                            + slavePhyOffset + " MASTER: " + masterPhyOffset);
                    return false;
                }
            }

            if (diff >= (msgHeaderSize + bodySize)) {
                final byte[] bodyData = new byte[bodySize];
                this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
                this.byteBufferRead.get(bodyData);

                // 追加数据
                HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

                this.byteBufferRead.position(readSocketPos);
                this.dispatchPostion += msgHeaderSize + bodySize;

                if (!reportSlaveMaxOffsetPlus()) {
                    return false;
                }

                continue;
            }
        }

        // 如果写满了 重新分配空间
        if (!this.byteBufferRead.hasRemaining()) {
            this.reallocateByteBuffer();
        }

        break;
    }

    return true;
}

在这关键点是比较 slave和master的位置,如果一致,同步消息数据,不一致直接返回false。

消息同步的简单分析就到此结束了。

结尾语

本次主要分析了RocketMQ的数据同步,包括元数据同步和消息同步,希望各位读者有所收获。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354

推荐阅读更多精彩内容