基本功能
发送普通消息
- 发布者在xml中配置group和EVENTCODE
<sofa:publisher id="uniformEventPublisher" group="P_tutorial_geling">
<sofa:channels>
<sofa:channel value="TP_DEFAULT_UNIFORM_EVENT"/>
</sofa:channels>
<sofa:binding.msg_broker/>
</sofa:publisher>
- 发布消息服务实现三步走
- 创建消息对象UniformEvent:由uniformEventBuilder负责创建,设置topic和eventcode两个属性值
- 设置消息对象属性值:setEventPayload
- 发送消息:通过uniformEventPublisher的publisherUniformEvent 方法完成
// 发布消息,消息类型由topic和eventcode指定,业务对象作为消息负载。
public boolean publicUniformEvent(String topic, String eventcode, Object payload) {
// 创建消息,第一个参数是topic, 第二个参数是eventcode
final UniformEvent uniformEvent = uniformEventBuilder.buildUniformEvent(topic, eventcode);
// 设置消息负载,一般为业务对象
uniformEvent.setEventPayload(payload);
// 设置发消息失败抛出运行时异常
uniformEvent.setThrowExceptionOnFailed(true);
try {
// 发布消息
uniformEventPublisher.publishUniformEvent(uniformEvent);
logger.info("[Public an uniformEvent, success] topic {} eventcode {} eventId {}", new Object[] { topic, eventcode, uniformEvent.getId() });
} catch (Exception e) {
logger.error("[Public an uniformEvent, failure] topic {} eventcode {} eventId {} error {}", new Object[] { topic, eventcode, uniformEvent.getId(), e.getMessage() });
return false;
}
return true;
}
发送事务消息
- 发布者在xml中配置,需设置tx-callback属性txCallbackListener作为事务型消息回查接口。
<sofa:publisher id="uniformEventPublisher" group="P_appname_service">
<sofa:channels tx-callback="txCallbackListener">
<sofa:channel value="TP_DEFAULT"/>
</sofa:channels>
<sofa:binding.msg_broker/>
</sofa:publisher>
<bean id="txCallbackListener" class="com.alipay.example.TxCallbackListener" />
- 当消息代理组件(Msgbroker)无法确定事务型消息是处于提交状态还是回滚状态时会主动调用消息发布者实现的事务型消息回查接口,这种回查属于异常场景,一般不会触发,TxCallbackListener实现如下:
public class TxCallbackListener implements UniformEventTxSynMessageListener {
@Override
public void onUniformEventTxSynchronized(UniformEvent message, UniformEventContext uContext) {
try {
if (!txMessageCheck(message)) {
uContext.setRollbackOnly(); // 设置状态为回滚
}
} catch (Exception e) {
throw e; // 抛出异常
}
}
private boolean txMessageCheck(UniformEvent message) {
return false;
}
}
- 发送事务型消息服务实现和普通消息服务有两点不同:
- 第一点是必须设置 uniformEvent.setTransactional(true)
- 第二点是必须在 Spring 事务模板中发送消息
// 发布事务型消息,消息类型由topic/eventcode指定,业务对象作为消息payload。
public boolean publicTransactionUniformEvent(String topic, String eventcode, Object payload) {
// 创建消息,第一个参数是topic, 第二个参数是eventcode
final UniformEvent uniformEvent = uniformEventBuilder.buildUniformEvent(topic, eventcode);
// 设置消息负载,一般为业务对象
uniformEvent.setEventPayload(payload);
// 第一点:transactional为true代表事务型消息
uniformEvent.setTransactional(true);
// 第二点:在spring事务模板中发送消息
transactionTemplate.execute(new TransactionCallback() {
@Override
public Object doInTransaction(TransactionStatus status) {
try {
// 发布消息,与发布普通消息一样是通过publishUniformEvent方法发送
uniformEventPublisher.publishUniformEvent(uniformEvent);
} catch (Exception e) {
// 事务型消息状态与本地事务一同回滚
status.setRollbackOnly();
}
return null;
}
});
return true;
}
订阅消息
- 消费者配置xml文件,sofa:consumer元素group属性值是订阅者group, sofa:listener元素配置了消息接收Handler,sofa:channel元素value属性值是需要订阅的消息类型TOPIC值。
<sofa:consumer id="uniformEventSubscriber" group="S_tutorial_geling">
<sofa:listener ref="uniformEventMessageListener"/>
<sofa:channels>
<sofa:channel value="TP_DEFAULT_UNIFORM_EVENT">
<sofa:event eventType="direct" eventCode="EC_TUTORIAL_TOM" persistence="false"/>
</sofa:channel>
</sofa:channels>
<sofa:binding.msg_broker/>
</sofa:consumer>
- 消息监听器UniformEventMessageListenerImpl实现,需要实现UniformEventMessageListener接口。消费者受到消息之后有三种处理手段。
- 第一种:消息消费正常。
- 第二种:消息无法消费,主动回滚并设置回滚原因。消息会被重新投递。
- 第三种:消息消费异常,抛出未捕获异常。消息会被重新投递。
public class UniformEventMessageListenerImpl implements UniformEventMessageListener {
@Override
public void onUniformEvent(UniformEvent message, UniformEventContext context) throws Exception {
// 获取topic,eventcode和payload
final String topic = message.getTopic();
final String eventcode = message.getEventCode();
final Object payload = message.getEventPayload();
try {
boolean processSuccess = processMessage(message);
if (!processSuccess) {
// 处理消息失败,设置回滚原因并主动回滚,消息会被重新投递。
context.setContextDesc("process error cause");
context.setRollbackOnly();
}
} catch (Exception e) {
logger.error("[Process a message, failure] topic {} eventcode {} eventId {} error {}",
new Object[] { topic, eventcode, message.getId(), e.getMessage() });
// 处理消息异常,抛出异常,消息会被重新投递。
throw e;
}
}
// 处理接收到的消息
private boolean processMessage(UniformEvent message) {
return true;
}
}
源码分析
发送消息
从以上发送消息的case中可以看到发送消息的入口是UniformEventPublisherAdapter的publishUniformEvent方法
package com.alipay.common.event.tbnotify.adapter;
public class UniformEventPublisherAdapter extends EventSendFacadeTBNotifyImpl implements UniformEventPublisher {
public void publishUniformEvent(UniformEvent event) {
UniformEventHelper.setLocalTxMode(event, false);
super.sendEventWithoutAdditionalInfo(event);
}
}
可以看到它调用了父类EventSendFacadeTBNotifyImpl的sendEventWithoutAdditionalInfo方法,首先会通过isTransactional来判断是否是事务型消息,事务型消息调用父类NotifyManagerBeanEx的sendMessageTx发送,普通消息调用sendMessage发送
package com.alipay.common.event.tbnotify.adapter;
public class EventSendFacadeTBNotifyImpl extends NotifyManagerBeanEx implements EventSendFacade, BeanFactoryAware {
protected void sendEventWithoutAdditionalInfo(UniformEvent event) {
MsgTracer msgTracer = TracerFactory.getMsgTracer();
SendResult sendResult = null;
MsgBrokerContext context = null;
try {
if (event.isTransactional() && checkMessageListener == null)
throw new IllegalArgumentException("CheckMessageListener cannot be null when sending the transaction uniform event, [" + event + "]");
//把传递的event(topic+eventcode)封装成消息
Message message = convertToMessage(event);
if (logger.isDebugEnabled()) {
logger.debug("Send an uniform event, topic [" + event.getTopic() + "] eventcode [" + event.getEventCode() + "]");
}
//把发布事件记录进tracer组件
handleTraceContextOnPublisherMessageSend(event, message);
context = constructMsgContext(event, message);
msgTracer.publisherSend();
handleMsgContextOnPublisherMessageSend(context);
int sendTimes = 0;
do {
try {
if (sendTimes > 0) {
// 日志处理
}
//通过isTransactional判断是否是事务型消息,是的话调用父类的sendMessageTx,不是就调用sendMessage
sendResult = event.isTransactional() ? super.sendMessageTx(message) : super
.sendMessage(message);
} finally {
sendTimes++;
}
} while (sendTimes <= this.tryTimes && !sendResult.isSuccess());
if (!sendResult.isSuccess()) {
throw new RuntimeException(sendResult.getErrorMessage(),
sendResult.getRuntimeException());
}
handleTraceContextOnPublisherReceive(MsgTracer.MSG_RESULT_SUCCESS, sendResult);
handleMsgContextOnPublisherReceive(context);
} catch (RuntimeException e) {
...
}
}
}
普通消息
首先来看发送普通消息的NotifyManagerBeanEx.sendMessage方法,首先会判断当前线程是否存在事务,存在的话注册到事务同步管理器TransactionSynchronizationManager中,不存在的话就调用sendMessageNoTx发送普通消息
package com.alipay.common.event.tbnotify.client;
public class NotifyManagerBeanEx extends NotifyManagerBean implements InitializingBean {
@Override
public SendResult sendMessage(Message message) {
// 判断当前线程中是否含有事务
if (TransactionSynchronizationManager.isActualTransactionActive()) {
//如果存在事务,把消息message封装成事务注册到事务同步器中,并将事务状态发送给broker
TransactionSynchronizationManager
.registerSynchronization(new TBNotifyTransactionSynchronization(message, null));
SendResult result = new SendResult();
result.setSuccess(true);
return result;
} else {
//如果没有事务发送普通消息
return sendMessageNoTx(message);
}
}
}
再来看sendMessageNoTx方法,调用了父类NotifyManagerBean的sendMessage方法发送普通消息。
package com.alipay.common.event.tbnotify.client;
public class NotifyManagerBeanEx extends NotifyManagerBean implements InitializingBean {
public SendResult sendMessageNoTx(Message message) {
SendResult result = null;
try {
return result = super.sendMessage(message);
} finally {
...
}
}
}
普通消息的发送,共有一下几种结果
不在本地事务的情况下:
- 发送成功,消息中心开始投递消息
- 发送失败,publishUniformEvent方法抛出异常,消息中心没有收到消息
- 发送超时,publishUniformEvent方法抛出异常,消息中心没有收到消息
- 发送超时,publishUniformEvent方法抛出异常,消息中心收到消息,开始投递消息
在本地事务的情况下,共有以下几种结果
- 本地事务提交失败,事务回滚,业务系统放弃消息的发送.(业务系统从头到尾都没和消息中心产生通讯)
- 本地事务提交成功,发送消息成功,消息中心收到消息,开始投递消息
- 本地事务提交成功,发送消息失败(被try-catch住).消息中心没有收到消息
- 本地事务提交成功,发送消息超时(被try-catch住).消息中心没有收到消息
- 本地事务提交成功,发送消息超时(被try-catch住).消息中心收到消息,开始投递消息
事务消息
发送事务型消息可以分为两个阶段:
- 第一个阶段:当发送端运行到消息发送这行代码的时候,还是和普通消息一样,将消息发送给Broker,Broker会将该条记录保存在数据库中,并将其事务状态设置为未知状态。 入库操作完成后,Broker会向发送端返回一个消息确认的信息,此时一阶段结束。
- 第二个阶段:发送端的代码包在事务模板中,当这个事务完结的时候,发送端会将本地事务的执行结果(提交/回滚)发送给Broker,Broker对结果做出判断。
第一阶段
首先来看发送事务消息的第一阶段,NotifyManagerBeanEx.sendMessageTx方法首先会检查事务是否存在,存在就发送事务型消息,不存在就调用sendMessageNoTx发送普通消息。
通过taobao Notify的sendMessage()方法将该条消息发送给Broker并同步等待发送是否成功。如果发送成功,则会在Spring 事务模板中注册一个事务同步器,第二阶段的逻辑将会交给这个事务同步器,它会将本地事务的执行结果发送给Broker。
public class NotifyManagerBeanEx extends NotifyManagerBean implements InitializingBean {
/**
* Send the transaction message.
*
* If the transaction context is not found, send the normal message.
*
* @param message - the message to send.
* @return sendResult
*/
public SendResult sendMessageTx(Message message) {
// 检查确认该方法包在一个spring事务模板中
if (TransactionSynchronizationManager.isActualTransactionActive()) {
InnerSendResult inResult = null;
try {
// MsgBroker首先把消息发送到broker,状态是”未提交“,起到数据落地
inResult = notifyManager.sendMessage(message, false);
if (inResult.isSuccess()) {
// 在Spring事务模板中注册事务同步器,当该Spring事务完结后会将结果发送给Broker
// 第一阶段在此完结,第二阶段的任务将交给这个同步器处理
TransactionSynchronizationManager
.registerSynchronization(new TBNotifyTransactionSynchronization(message,
inResult));
}
// 直接返回结果
return genInnerSendResult(inResult);
} finally {
// 日志处理
}
} else {
// 发送普通消息
return sendMessageNoTx(message);
}
}
}
第二阶段
再来看发送事务消息的第二阶段,主要是在事务同步器中,将本地事务的执行结果发送给Broker。该实现主要在类NotifyManagerBeanEx中,由afterCompletion方法负责执行。同步器会将本地事务的执行结果记录在commited字段当中,并通过sendOneWay调用发送给Broker。
public class NotifyManagerBeanEx extends NotifyManagerBean implements InitializingBean {
private final class TBNotifyTransactionSynchronization extends
TransactionSynchronizationAdapter {
public void afterCompletion(int status) {
try {
// 日志处理
}
if (inResult == null) {
// here is not a tow-phase message
} else {
byte[] serverData = inResult.getServerData(0);
boolean committed = false;
if (status == STATUS_COMMITTED && inResult.isSuccess()) {
// 如果本地事务为 提交
// 则将committed设为true
committed = true;
} else if (status == STATUS_ROLLED_BACK) {
logger.warn("MsgBroker Client makes the message rollback, msgId [" + UniqId.getInstance().bytes2string(message.getMessageId()) + "]");
// 如果本地事务为 回滚
// 则将committed设为false
committed = false;
inResult.setErrorMessage("MsgBroker Client makes the message rollback");
inResult.setSuccess(false);
} else {
// STATUS_UNKNOWN
}
// 将本地事务结果通过oneway调用发送给Broker
inResult.getRemotingClient().sendOneWay(message.getMessageId(), serverData, committed, -1L);
}
} catch (Exception e) {
//
}
}
}
回查阶段
在回查阶段,Broker会调用UniformEventTxSynMessageListener接口的onUniformEventTxSynchronized()方法。
public interface UniformEventTxSynMessageListener {
void onUniformEventTxSynchronized(UniformEvent uniformEvent,
UniformEventContext uniformEventContext);
}
事务型消息发送结果工有以下两种
- 本地事务失败,消费者不会收到消息
- 本地事务成功,消费者收到消息
订阅消息
从以上case中可以看到订阅消息是通过实现UniformEventMessageListener接口的onUniformEvent方法中实现。
知识点回顾
事务同步器
TransactionSynchronizationManager是spring中事务管理同步器
使用ThreadLocal把当前线程和一个事务绑定起来,用于应用扩展同步器的使用,在事务开启,挂起,提交等点上回调应用
- TransactionSynchronizationManager.isActualTransactionActive
可以判断当前线程(当前执行的代码段)是否在一个事务,原理是通过ThreadLocal变量。我们在发起一个事务型消息的时候会将isActualTransactionActive设为true
private static final ThreadLocal<Boolean> actualTransactionActive =
new NamedThreadLocal<Boolean>("Actual transaction active");
public static boolean isActualTransactionActive() {
return (actualTransactionActive.get() != null);
}
- TransactionSynchronizationManager.registerSynchronization。为当前线程注册一个事务回调。
public static void registerSynchronization(TransactionSynchronization synchronization)
throws IllegalStateException {
Assert.notNull(synchronization, "TransactionSynchronization must not be null");
if (!isSynchronizationActive()) {
throw new IllegalStateException("Transaction synchronization is not active");
}
synchronizations.get().add(synchronization);
}
总结
- MsgBroker整体上采用二阶段的思路实现了事务消息的发送。
- MsgBroker的事务由Spring事务模板来支持,通过在Spring事务中注册的同步器将本地事务结果发送给Broker。
- MsgBroker通过回查方法来应对Broker无法确定本地事务执行结果的异常情况,如果没有该机制,将会导致大量的数据积压情况的出现。
- MsgBroker有较多的功能需要依赖taobao Notify来完成。