MsgBroker源码学习

基本功能

发送普通消息

  1. 发布者在xml中配置group和EVENTCODE
<sofa:publisher id="uniformEventPublisher" group="P_tutorial_geling">
    <sofa:channels>
        <sofa:channel value="TP_DEFAULT_UNIFORM_EVENT"/>
    </sofa:channels>
    <sofa:binding.msg_broker/>
</sofa:publisher>
  1. 发布消息服务实现三步走
  • 创建消息对象UniformEvent:由uniformEventBuilder负责创建,设置topic和eventcode两个属性值
  • 设置消息对象属性值:setEventPayload
  • 发送消息:通过uniformEventPublisher的publisherUniformEvent 方法完成
    // 发布消息,消息类型由topic和eventcode指定,业务对象作为消息负载。
    public boolean publicUniformEvent(String topic, String eventcode, Object payload) {
        // 创建消息,第一个参数是topic, 第二个参数是eventcode
        final UniformEvent uniformEvent = uniformEventBuilder.buildUniformEvent(topic, eventcode);
        // 设置消息负载,一般为业务对象
        uniformEvent.setEventPayload(payload);
        // 设置发消息失败抛出运行时异常
        uniformEvent.setThrowExceptionOnFailed(true);
        try {
            // 发布消息
            uniformEventPublisher.publishUniformEvent(uniformEvent);
            logger.info("[Public an uniformEvent, success] topic {} eventcode {} eventId {}", new Object[] { topic, eventcode, uniformEvent.getId() });
        } catch (Exception e) {
            logger.error("[Public an uniformEvent, failure] topic {} eventcode {} eventId {} error {}", new Object[] { topic, eventcode, uniformEvent.getId(), e.getMessage() });
            return false;
        }
        return true;
    }

发送事务消息

  1. 发布者在xml中配置,需设置tx-callback属性txCallbackListener作为事务型消息回查接口。
<sofa:publisher id="uniformEventPublisher" group="P_appname_service">
  <sofa:channels tx-callback="txCallbackListener">
    <sofa:channel value="TP_DEFAULT"/> 
  </sofa:channels>
  <sofa:binding.msg_broker/>
</sofa:publisher>

<bean id="txCallbackListener" class="com.alipay.example.TxCallbackListener" />
  1. 当消息代理组件(Msgbroker)无法确定事务型消息是处于提交状态还是回滚状态时会主动调用消息发布者实现的事务型消息回查接口,这种回查属于异常场景,一般不会触发,TxCallbackListener实现如下:
public class TxCallbackListener implements UniformEventTxSynMessageListener {
    @Override
    public void onUniformEventTxSynchronized(UniformEvent message, UniformEventContext uContext) {
        try {
            if (!txMessageCheck(message)) {
                uContext.setRollbackOnly(); // 设置状态为回滚
            }
        } catch (Exception e) {
            throw e; // 抛出异常
        }
    }
    private boolean txMessageCheck(UniformEvent message) {
        return false;
    }
}
  1. 发送事务型消息服务实现和普通消息服务有两点不同:
  • 第一点是必须设置 uniformEvent.setTransactional(true)
  • 第二点是必须在 Spring 事务模板中发送消息
// 发布事务型消息,消息类型由topic/eventcode指定,业务对象作为消息payload。
public boolean publicTransactionUniformEvent(String topic, String eventcode, Object payload) {
    // 创建消息,第一个参数是topic, 第二个参数是eventcode
    final UniformEvent uniformEvent = uniformEventBuilder.buildUniformEvent(topic, eventcode);
    // 设置消息负载,一般为业务对象
    uniformEvent.setEventPayload(payload);
    // 第一点:transactional为true代表事务型消息
    uniformEvent.setTransactional(true);
    // 第二点:在spring事务模板中发送消息
    transactionTemplate.execute(new TransactionCallback() {
        @Override
        public Object doInTransaction(TransactionStatus status) {
            try {
                // 发布消息,与发布普通消息一样是通过publishUniformEvent方法发送
                uniformEventPublisher.publishUniformEvent(uniformEvent);
            } catch (Exception e) {
                // 事务型消息状态与本地事务一同回滚
                status.setRollbackOnly();
            }
            return null;
        }
    });
    return true;
}

订阅消息

  1. 消费者配置xml文件,sofa:consumer元素group属性值是订阅者group, sofa:listener元素配置了消息接收Handler,sofa:channel元素value属性值是需要订阅的消息类型TOPIC值。
<sofa:consumer id="uniformEventSubscriber" group="S_tutorial_geling">
    <sofa:listener ref="uniformEventMessageListener"/>
    <sofa:channels>
        <sofa:channel value="TP_DEFAULT_UNIFORM_EVENT">
            <sofa:event eventType="direct" eventCode="EC_TUTORIAL_TOM" persistence="false"/>
        </sofa:channel>
    </sofa:channels>
    <sofa:binding.msg_broker/>
</sofa:consumer>
  1. 消息监听器UniformEventMessageListenerImpl实现,需要实现UniformEventMessageListener接口。消费者受到消息之后有三种处理手段。
  • 第一种:消息消费正常。
  • 第二种:消息无法消费,主动回滚并设置回滚原因。消息会被重新投递。
  • 第三种:消息消费异常,抛出未捕获异常。消息会被重新投递。
public class UniformEventMessageListenerImpl implements UniformEventMessageListener {
    @Override
    public void onUniformEvent(UniformEvent message, UniformEventContext context) throws Exception {
        // 获取topic,eventcode和payload
        final String topic = message.getTopic();
        final String eventcode = message.getEventCode();
        final Object payload = message.getEventPayload();
        try {
            boolean processSuccess = processMessage(message);
            if (!processSuccess) {
                // 处理消息失败,设置回滚原因并主动回滚,消息会被重新投递。
                context.setContextDesc("process error cause");
                context.setRollbackOnly();
            }
        } catch (Exception e) {
            logger.error("[Process a message, failure] topic {} eventcode {} eventId {} error {}",
                new Object[] { topic, eventcode, message.getId(), e.getMessage() });
            // 处理消息异常,抛出异常,消息会被重新投递。
            throw e;
        }
    }
    // 处理接收到的消息
    private boolean processMessage(UniformEvent message) {
        return true;
    }
}

源码分析

发送消息

image

从以上发送消息的case中可以看到发送消息的入口是UniformEventPublisherAdapter的publishUniformEvent方法

package com.alipay.common.event.tbnotify.adapter;
public class UniformEventPublisherAdapter extends EventSendFacadeTBNotifyImpl implements UniformEventPublisher {
    public void publishUniformEvent(UniformEvent event) {
        UniformEventHelper.setLocalTxMode(event, false);
        super.sendEventWithoutAdditionalInfo(event);
    }
}

可以看到它调用了父类EventSendFacadeTBNotifyImpl的sendEventWithoutAdditionalInfo方法,首先会通过isTransactional来判断是否是事务型消息,事务型消息调用父类NotifyManagerBeanEx的sendMessageTx发送,普通消息调用sendMessage发送

package com.alipay.common.event.tbnotify.adapter;
public class EventSendFacadeTBNotifyImpl extends NotifyManagerBeanEx implements EventSendFacade, BeanFactoryAware {
    protected void sendEventWithoutAdditionalInfo(UniformEvent event) {
        MsgTracer msgTracer = TracerFactory.getMsgTracer();
        SendResult sendResult = null;
        MsgBrokerContext context = null;
        try {
            if (event.isTransactional() && checkMessageListener == null)
                throw new IllegalArgumentException("CheckMessageListener cannot be null when sending the transaction uniform event, [" + event + "]");
            //把传递的event(topic+eventcode)封装成消息
            Message message = convertToMessage(event);
            if (logger.isDebugEnabled()) {
                logger.debug("Send an uniform event, topic [" + event.getTopic() + "] eventcode [" + event.getEventCode() + "]");
            }
            //把发布事件记录进tracer组件
            handleTraceContextOnPublisherMessageSend(event, message);
            context = constructMsgContext(event, message);
            msgTracer.publisherSend();
            handleMsgContextOnPublisherMessageSend(context);
            int sendTimes = 0;
            do {
                try {
                    if (sendTimes > 0) {
                        // 日志处理
                    }
                    //通过isTransactional判断是否是事务型消息,是的话调用父类的sendMessageTx,不是就调用sendMessage
                    sendResult = event.isTransactional() ? super.sendMessageTx(message) : super
                        .sendMessage(message);
                } finally {
                    sendTimes++;
                }
            } while (sendTimes <= this.tryTimes && !sendResult.isSuccess());
            if (!sendResult.isSuccess()) {
                throw new RuntimeException(sendResult.getErrorMessage(),
                    sendResult.getRuntimeException());
            }
            handleTraceContextOnPublisherReceive(MsgTracer.MSG_RESULT_SUCCESS, sendResult);
            handleMsgContextOnPublisherReceive(context);
        } catch (RuntimeException e) {
            ...
        }
    }
}

普通消息

首先来看发送普通消息的NotifyManagerBeanEx.sendMessage方法,首先会判断当前线程是否存在事务,存在的话注册到事务同步管理器TransactionSynchronizationManager中,不存在的话就调用sendMessageNoTx发送普通消息

package com.alipay.common.event.tbnotify.client;
public class NotifyManagerBeanEx extends NotifyManagerBean implements InitializingBean {
    @Override
    public SendResult sendMessage(Message message) {
        // 判断当前线程中是否含有事务
        if (TransactionSynchronizationManager.isActualTransactionActive()) {
            //如果存在事务,把消息message封装成事务注册到事务同步器中,并将事务状态发送给broker
            TransactionSynchronizationManager
                .registerSynchronization(new TBNotifyTransactionSynchronization(message, null));
            SendResult result = new SendResult();
            result.setSuccess(true);
            return result;
        } else {
            //如果没有事务发送普通消息
            return sendMessageNoTx(message);
        }
    }
}

再来看sendMessageNoTx方法,调用了父类NotifyManagerBean的sendMessage方法发送普通消息。

package com.alipay.common.event.tbnotify.client;
public class NotifyManagerBeanEx extends NotifyManagerBean implements InitializingBean {
    public SendResult sendMessageNoTx(Message message) {
        SendResult result = null;
        try {
            return result = super.sendMessage(message);
        } finally {
            ...
        }
    }
}

普通消息的发送,共有一下几种结果

不在本地事务的情况下:

  1. 发送成功,消息中心开始投递消息
  2. 发送失败,publishUniformEvent方法抛出异常,消息中心没有收到消息
  3. 发送超时,publishUniformEvent方法抛出异常,消息中心没有收到消息
  4. 发送超时,publishUniformEvent方法抛出异常,消息中心收到消息,开始投递消息

在本地事务的情况下,共有以下几种结果

  1. 本地事务提交失败,事务回滚,业务系统放弃消息的发送.(业务系统从头到尾都没和消息中心产生通讯)
  2. 本地事务提交成功,发送消息成功,消息中心收到消息,开始投递消息
  3. 本地事务提交成功,发送消息失败(被try-catch住).消息中心没有收到消息
  4. 本地事务提交成功,发送消息超时(被try-catch住).消息中心没有收到消息
  5. 本地事务提交成功,发送消息超时(被try-catch住).消息中心收到消息,开始投递消息

事务消息

发送事务型消息可以分为两个阶段:

  • 第一个阶段:当发送端运行到消息发送这行代码的时候,还是和普通消息一样,将消息发送给Broker,Broker会将该条记录保存在数据库中,并将其事务状态设置为未知状态。 入库操作完成后,Broker会向发送端返回一个消息确认的信息,此时一阶段结束。
  • 第二个阶段:发送端的代码包在事务模板中,当这个事务完结的时候,发送端会将本地事务的执行结果(提交/回滚)发送给Broker,Broker对结果做出判断。

第一阶段

首先来看发送事务消息的第一阶段,NotifyManagerBeanEx.sendMessageTx方法首先会检查事务是否存在,存在就发送事务型消息,不存在就调用sendMessageNoTx发送普通消息。
通过taobao Notify的sendMessage()方法将该条消息发送给Broker并同步等待发送是否成功。如果发送成功,则会在Spring 事务模板中注册一个事务同步器,第二阶段的逻辑将会交给这个事务同步器,它会将本地事务的执行结果发送给Broker。

public class NotifyManagerBeanEx extends NotifyManagerBean implements InitializingBean {
    /**
     * Send the transaction message.
     * 
     * If the transaction context is not found, send the normal message.
     * 
     * @param message - the message to send.
     * @return sendResult
     */
    public SendResult sendMessageTx(Message message) {
        // 检查确认该方法包在一个spring事务模板中
        if (TransactionSynchronizationManager.isActualTransactionActive()) {
            InnerSendResult inResult = null;
            try {
                // MsgBroker首先把消息发送到broker,状态是”未提交“,起到数据落地
                inResult = notifyManager.sendMessage(message, false);
                if (inResult.isSuccess()) {
                    // 在Spring事务模板中注册事务同步器,当该Spring事务完结后会将结果发送给Broker
                    // 第一阶段在此完结,第二阶段的任务将交给这个同步器处理
                    TransactionSynchronizationManager
                        .registerSynchronization(new TBNotifyTransactionSynchronization(message,
                            inResult));
                } 
                // 直接返回结果
                return genInnerSendResult(inResult);
            } finally {
                // 日志处理
            }
        } else {
            // 发送普通消息
            return sendMessageNoTx(message);
        }
    }
}

第二阶段

再来看发送事务消息的第二阶段,主要是在事务同步器中,将本地事务的执行结果发送给Broker。该实现主要在类NotifyManagerBeanEx中,由afterCompletion方法负责执行。同步器会将本地事务的执行结果记录在commited字段当中,并通过sendOneWay调用发送给Broker。

public class NotifyManagerBeanEx extends NotifyManagerBean implements InitializingBean {
    private final class TBNotifyTransactionSynchronization extends
                                                          TransactionSynchronizationAdapter {
        public void afterCompletion(int status) {
            try {
                    // 日志处理
                }
                if (inResult == null) {
                    // here is not a tow-phase message
                } else {
                    byte[] serverData = inResult.getServerData(0);
                    boolean committed = false;
                    if (status == STATUS_COMMITTED && inResult.isSuccess()) {
                        // 如果本地事务为 提交
                        // 则将committed设为true
                        committed = true;
                    } else if (status == STATUS_ROLLED_BACK) {
                        logger.warn("MsgBroker Client makes the message rollback, msgId [" + UniqId.getInstance().bytes2string(message.getMessageId()) + "]");
                        // 如果本地事务为 回滚
                        // 则将committed设为false
                        committed = false;
                        inResult.setErrorMessage("MsgBroker Client makes the message rollback");
                        inResult.setSuccess(false);
                    } else {
                        // STATUS_UNKNOWN
                    }
                    // 将本地事务结果通过oneway调用发送给Broker
                    inResult.getRemotingClient().sendOneWay(message.getMessageId(), serverData, committed, -1L);
                }
            } catch (Exception e) {
                // 
            }
        }
    }

回查阶段

在回查阶段,Broker会调用UniformEventTxSynMessageListener接口的onUniformEventTxSynchronized()方法。

public interface UniformEventTxSynMessageListener {
    void onUniformEventTxSynchronized(UniformEvent uniformEvent,
                                      UniformEventContext uniformEventContext);
}

事务型消息发送结果工有以下两种

  1. 本地事务失败,消费者不会收到消息
  2. 本地事务成功,消费者收到消息

订阅消息

从以上case中可以看到订阅消息是通过实现UniformEventMessageListener接口的onUniformEvent方法中实现。

知识点回顾

事务同步器

TransactionSynchronizationManager是spring中事务管理同步器
使用ThreadLocal把当前线程和一个事务绑定起来,用于应用扩展同步器的使用,在事务开启,挂起,提交等点上回调应用

  1. TransactionSynchronizationManager.isActualTransactionActive
    可以判断当前线程(当前执行的代码段)是否在一个事务,原理是通过ThreadLocal变量。我们在发起一个事务型消息的时候会将isActualTransactionActive设为true
private static final ThreadLocal<Boolean> actualTransactionActive =
            new NamedThreadLocal<Boolean>("Actual transaction active");

public static boolean isActualTransactionActive() {
   return (actualTransactionActive.get() != null);
}
  1. TransactionSynchronizationManager.registerSynchronization。为当前线程注册一个事务回调。
public static void registerSynchronization(TransactionSynchronization synchronization)
    throws IllegalStateException {
   Assert.notNull(synchronization, "TransactionSynchronization must not be null");
   if (!isSynchronizationActive()) {
      throw new IllegalStateException("Transaction synchronization is not active");
   }
   synchronizations.get().add(synchronization);
}

总结

  • MsgBroker整体上采用二阶段的思路实现了事务消息的发送。
  • MsgBroker的事务由Spring事务模板来支持,通过在Spring事务中注册的同步器将本地事务结果发送给Broker。
  • MsgBroker通过回查方法来应对Broker无法确定本地事务执行结果的异常情况,如果没有该机制,将会导致大量的数据积压情况的出现。
  • MsgBroker有较多的功能需要依赖taobao Notify来完成。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,029评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,395评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,570评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,535评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,650评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,850评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,006评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,747评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,207评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,536评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,683评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,342评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,964评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,772评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,004评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,401评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,566评论 2 349

推荐阅读更多精彩内容

  • 消息队列设计精要 消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终...
    meng_philip123阅读 1,510评论 1 25
  • “ 消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列...
    落羽成霜丶阅读 3,982评论 1 41
  • 中国式设计与现代设计管理模式 摘要:现代设计管理越来越受到重视,强调在设计部门的管理活动,重在提升设计活动效率,以...
    冬江花月夜阅读 1,461评论 0 9
  • 关于知乎,我接触的并不是特别久,但我知道就我目前所了解到的不过是它的冰山一角而已,创立至今不过六七年,被定位为一个...
    楚囚998阅读 156评论 0 2
  • 1 从未想过自己的生命里会有你 天真无邪 对什么都充满善意 2 我一直不敢爱人 所以也从未得到过爱意 但我却那般大...
    君小白阅读 457评论 6 8