Source version: 4.6.0
Let's start with a simple message-sending example:
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
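Before a message can be sent with producer.send(msg), the Producer has to be started. A reasonable guess is that starting the Producer initializes the sending side, and this article analyzes that initialization.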
org.apache.rocketmq.client.producer.DefaultMQProducer#start
public void start() throws MQClientException {
// Set the producer group
this.setProducerGroup(withNamespace(this.producerGroup));
// Core initialization method
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
// Message trace related
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// Check that the producer group name is valid: not empty and not the default group name DEFAULT_PRODUCER
this.checkConfig();
// Change the producer instance name; see note ① below
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// Get the MQ client factory; note that MQClientManager is a singleton, see note ② below
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// Register this producer; see note ③ below
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// On successful registration, add the built-in topic used for automatic topic creation
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// Start the client; see note ④ below
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// Send the first heartbeat; see note ⑤ below
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// Scan for expired requests; see note ⑥ below
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {
log.error("scan RequestFutureTable exception", e);
}
}
}, 1000 * 3, 1000);
}
Note ①:
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = String.valueOf(UtilAll.getPid());
}
}
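When the system property rocketmq.client.name is not set, instanceName defaults to DEFAULT. If it is still DEFAULT at startup, it is replaced with the process ID, i.e. the ID of the running JVM process.
Why do it this way? I see two main reasons:
- It guarantees that a single JVM gets only one mQClientFactory. The factory is looked up by a key that contains the instance name, so as long as the instance name stays the same, only one client factory is created in the JVM. The benefit is that mQClientFactory holds the networking component, the scheduled-task component, the message-pulling component and so on. If every new producer or consumer instance got its own factory, the JVM could end up with several copies of the same components, which wastes resources and may also cause internal tasks to interfere with one another.
- Producer instances in different JVMs on the same host can be told apart.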
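The renaming rule above can be tried out in isolation. The following is a minimal sketch, not RocketMQ code: the class InstanceNameSketch and its methods are hypothetical, and the PID is read from the RuntimeMXBean name, which is conventionally "pid@hostname" but is not guaranteed by the JVM specification.

```java
import java.lang.management.ManagementFactory;

public class InstanceNameSketch {
    // Hypothetical stand-in for the instanceName field of the client config.
    private String instanceName;

    public InstanceNameSketch(String instanceName) {
        this.instanceName = instanceName;
    }

    /** Reads the PID from the RuntimeMXBean name, conventionally "pid@hostname". */
    public static int currentPid() {
        String name = ManagementFactory.getRuntimeMXBean().getName();
        try {
            return Integer.parseInt(name.substring(0, name.indexOf('@')));
        } catch (RuntimeException e) {
            return -1; // the "pid@hostname" format is a convention, not a guarantee
        }
    }

    /** Only the default name is replaced; an explicitly chosen name is kept. */
    public void changeInstanceNameToPid() {
        if ("DEFAULT".equals(instanceName)) {
            instanceName = String.valueOf(currentPid());
        }
    }

    public String getInstanceName() {
        return instanceName;
    }

    public static void main(String[] args) {
        InstanceNameSketch byDefault = new InstanceNameSketch("DEFAULT");
        byDefault.changeInstanceNameToPid();
        InstanceNameSketch explicit = new InstanceNameSketch("orders-client");
        explicit.changeInstanceNameToPid();
        System.out.println("default name became: " + byDefault.getInstanceName());
        System.out.println("explicit name stays: " + explicit.getInstanceName());
    }
}
```

Two producers in the same JVM that both keep the default name end up with the same instance name, which is what lets them share one client factory.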
Note ②:
MQClientManager is a singleton, so only one instance exists per JVM. Now look at getOrCreateMQClientInstance, which first builds the client ID:
String clientId = clientConfig.buildMQClientId();
|
|
v
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
// Get the client IP
sb.append(this.getClientIP());
sb.append("@");
// Append the instanceName, normally the process ID
sb.append(this.getInstanceName());
// Append unitName; usually empty, can be set on the Producer
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
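In short, clientId = IP + "@" + instanceName, followed by "@" + unitName when unitName is set. This clientId is then looked up in the cache, and a new instance is created if none is found.
The main components instantiated are:
- mQClientAPIImpl (Netty communication component)
- pullMessageService (message-pulling component)
- rebalanceService (rebalance component)
- consumerStatsManager (consumption statistics component)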
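The clientId construction and the cache lookup described above can be sketched with a plain ConcurrentHashMap. This is an illustration under assumptions: ClientFactoryCacheSketch and ClientInstance are hypothetical names, and the real instance holds the components listed above rather than just an ID.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ClientFactoryCacheSketch {
    /** Hypothetical stand-in for the heavyweight client instance. */
    public static final class ClientInstance {
        public final String clientId;

        ClientInstance(String clientId) {
            this.clientId = clientId;
        }
    }

    // One cache per JVM, keyed by clientId.
    private static final ConcurrentMap<String, ClientInstance> FACTORY_TABLE =
        new ConcurrentHashMap<>();

    /** ip@instanceName, plus @unitName only when a unit name is set. */
    public static String buildClientId(String ip, String instanceName, String unitName) {
        StringBuilder sb = new StringBuilder(ip).append('@').append(instanceName);
        if (unitName != null && !unitName.trim().isEmpty()) {
            sb.append('@').append(unitName);
        }
        return sb.toString();
    }

    /** Returns the cached instance for this clientId, creating it on first use. */
    public static ClientInstance getOrCreate(String clientId) {
        ClientInstance instance = FACTORY_TABLE.get(clientId);
        if (instance == null) {
            ClientInstance created = new ClientInstance(clientId);
            // If another thread won the race, keep its instance and drop ours.
            ClientInstance prev = FACTORY_TABLE.putIfAbsent(clientId, created);
            instance = (prev != null) ? prev : created;
        }
        return instance;
    }

    public static void main(String[] args) {
        String id = buildClientId("10.0.0.1", "4242", null);
        System.out.println(id);
        System.out.println(getOrCreate(id) == getOrCreate(id)); // same cached object
    }
}
```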
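As an aside, on the relationship between DefaultMQProducer and DefaultMQProducerImpl: each holds a reference to the other. DefaultMQProducer extends ClientConfig and acts more like a per-instance configuration class, while DefaultMQProducerImpl implements MQProducerInner and carries the main message-sending logic.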
Note ③:
Registers the producer group together with its producer instance, which simply means putting the pair into a Map:
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
if (prev != null) {
log.warn("the producer group[{}] exist already.", group);
return false;
}
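Note that putIfAbsent is used here: if a producer with the same group name has already been registered on this client instance (for example, a second producer object created with the same group), registration fails, a warning is logged, and startup fails.
The exception thrown: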
new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
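The effect of putIfAbsent on duplicate group names is easy to reproduce. This is a minimal sketch: ProducerRegistrationSketch is a hypothetical class, and a plain Object stands in for the producer implementation.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ProducerRegistrationSketch {
    // group name -> producer; a plain Object stands in for the producer here.
    private static final ConcurrentMap<String, Object> PRODUCER_TABLE =
        new ConcurrentHashMap<>();

    /** Returns true only for the first registration of a group name. */
    public static boolean registerProducer(String group, Object producer) {
        if (group == null || producer == null) {
            return false;
        }
        // putIfAbsent returns the previous value, or null if the key was free.
        return PRODUCER_TABLE.putIfAbsent(group, producer) == null;
    }

    public static void main(String[] args) {
        System.out.println(registerProducer("ProducerGroupName", new Object())); // first one wins
        System.out.println(registerProducer("ProducerGroupName", new Object())); // duplicate group
    }
}
```

Because the check and the insert happen in one atomic call, two threads starting producers with the same group at the same time cannot both succeed.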
Note ④:
This starts the client instance obtained above, and it is where the worker threads are actually started.
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
// If the Producer has no NameServer address configured, fetch it remotely
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
// Start the communication component
this.mQClientAPIImpl.start();
// Start various schedule tasks
// Start the scheduled tasks
this.startScheduledTask();
// Start pull service
// Start the pull thread
this.pullMessageService.start();
// Start rebalance service
// Start the rebalance service
this.rebalanceService.start();
// Start push service
// Start the client's internal producer (startFactory is false, so the factory is not started again)
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
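4.1 Note that when the Producer has no NameServer address configured, the address is fetched remotely. This matters a great deal: it makes it possible to scale out or migrate NameServers dynamically in a production environment.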
public String fetchNameServerAddr() {
try {
String addrs = this.topAddressing.fetchNSAddr();
if (addrs != null) {
if (!addrs.equals(this.nameSrvAddr)) {
log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + addrs);
this.updateNameServerAddressList(addrs);
this.nameSrvAddr = addrs;
return nameSrvAddr;
}
}
} catch (Exception e) {
log.error("fetchNameServerAddr Exception", e);
}
return nameSrvAddr;
}
The address is fetched from the configured NameServer address lookup endpoint, whose URL is assembled as follows:
public static String getWSAddr() {
String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup;
if (wsDomainName.indexOf(":") > 0) {
wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup;
}
return wsAddr;
}
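To see which URL the rule above produces, here is a small sketch that takes the domain and subgroup as parameters instead of reading system properties. The class name and the host names are made up for illustration.

```java
public class NameServerLookupUrlSketch {
    /**
     * Same rule as getWSAddr: if the domain already carries a port, use it as is,
     * otherwise fall back to port 8080.
     */
    public static String wsAddr(String domain, String subgroup) {
        if (domain.indexOf(":") > 0) {
            return "http://" + domain + "/rocketmq/" + subgroup;
        }
        return "http://" + domain + ":8080/rocketmq/" + subgroup;
    }

    public static void main(String[] args) {
        // Hypothetical host names, used only for illustration.
        System.out.println(wsAddr("ns.example.com", "nsaddr"));
        System.out.println(wsAddr("ns.example.com:9090", "nsaddr"));
    }
}
```

In the real method the two inputs come from the system properties rocketmq.namesrv.domain and rocketmq.namesrv.domain.subgroup, so pointing clients at a different lookup endpoint only requires setting those properties at JVM startup.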
4.2 Starting the scheduled tasks
private void startScheduledTask() {
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// Fetch the NameServer address
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// Update topic route info
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// Remove offline Brokers, then send heartbeats
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// Periodically persist consumer offsets; important for broadcast mode
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// Adjust the thread pool; effectively a no-op
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
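To summarize:
- Fetch the NameServer address: initial delay 10 s, period 2 min (only scheduled when no NameServer address is configured)
- Update topic route info: initial delay 10 ms, period 30 s by default (pollNameServerInterval)
- Send heartbeats to Brokers: initial delay 1 s, period 30 s by default (heartbeatBrokerInterval)
- Persist consumer offsets: initial delay 10 s, period 5 s by default (persistConsumerOffsetInterval)
- Dynamically adjust the thread pool: initial delay 1 min, period 1 min, has no effect
Note that the scheduled-task pool is a single-threaded executor with an unbounded queue, and the tasks are scheduled in FixedRate mode, so the actual execution frequency may not be exact: while one task is running, every other task has to wait. See the ScheduledExecutorService source if you are interested.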
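The timing drift mentioned above can be reproduced with a single-threaded scheduler and a task that runs longer than its period. This is a standalone sketch using only java.util.concurrent, and the numbers are arbitrary.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedRateDriftSketch {
    /**
     * Schedules a task that takes taskMillis at a nominal period of periodMillis
     * on a single-threaded scheduler and returns how many runs started within
     * roughly windowMillis.
     */
    public static int countRuns(long periodMillis, long taskMillis, long windowMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        scheduler.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            sleepQuietly(taskMillis); // simulate slow work, e.g. a blocking network call
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        sleepQuietly(windowMillis);
        scheduler.shutdownNow(); // no further runs start after this
        return runs.get();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        long period = 10, task = 500, window = 1000;
        int runs = countRuns(period, task, window);
        System.out.println("nominal runs: " + (window / period) + ", actual runs: " + runs);
    }
}
```

With scheduleAtFixedRate, runs of the same task never overlap, so a run that overshoots its period simply pushes the following runs back. On a single thread the same delay also hits every other task queued on that executor.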
4.3 The remaining components started here are related to consumption, so I won't go deeper for now.
Note ⑤:
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
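Once startup succeeds, heartbeats start being sent. The heartbeat is sent while holding a lock. My guess is that this mainly keeps concurrent heartbeats from interleaving; I can't think of any other special purpose for now.
Main heartbeat-sending code: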
private void sendHeartbeatToAllBroker() {
// Build the heartbeat payload, mainly consumer subscription and producer information; to be discussed in detail later
final HeartbeatData heartbeatData = this.prepareHeartbeatData();
final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
if (producerEmpty && consumerEmpty) {
log.warn("sending heartbeat, but no consumer and no producer");
return;
}
// Iterate over all Broker addresses
if (!this.brokerAddrTable.isEmpty()) {
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next();
String brokerName = entry.getKey();
HashMap<Long, String> oneTable = entry.getValue();
if (oneTable != null) {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
Long id = entry1.getKey();
String addr = entry1.getValue();
if (addr != null) {
if (consumerEmpty) {
if (id != MixAll.MASTER_ID)
continue;
}
try {
// Send the heartbeat with a 3 s timeout
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
}
// Update the version
this.brokerVersionTable.get(brokerName).put(addr, version);
if (times % 20 == 0) {
log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
log.info(heartbeatData.toString());
}
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
} else {
log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
id, addr, e);
}
}
}
}
}
}
}
}
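Points worth noting here:
- If the current client has only producer instances, heartbeats are sent only to master nodes.
- If the current client also has consumers, heartbeats are sent to all nodes. This is tied to the message-sending and message-consuming logic, which will be covered later.
There is a general lesson here too: any network request should carry a timeout, for the sake of your service's stability!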
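The master-only rule above boils down to one filter over the broker address table. The sketch below isolates it. HeartbeatTargetsSketch and its sample addresses are hypothetical, and MASTER_ID is set to 0 on the assumption that it mirrors MixAll.MASTER_ID.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class HeartbeatTargetsSketch {
    // Assumption: mirrors MixAll.MASTER_ID, where broker id 0 denotes the master.
    static final long MASTER_ID = 0L;

    /** Returns the broker addresses that would receive a heartbeat. */
    public static List<String> targets(Map<String, Map<Long, String>> brokerAddrTable,
                                       boolean consumerEmpty) {
        List<String> result = new ArrayList<>();
        for (Map<Long, String> oneTable : brokerAddrTable.values()) {
            if (oneTable == null) {
                continue;
            }
            for (Map.Entry<Long, String> entry : oneTable.entrySet()) {
                if (entry.getValue() == null) {
                    continue;
                }
                // Producer-only clients skip slaves; clients with consumers reach every node.
                if (consumerEmpty && entry.getKey().longValue() != MASTER_ID) {
                    continue;
                }
                result.add(entry.getValue());
            }
        }
        return result;
    }

    /** Sample table with one master (id 0) and one slave (id 1); addresses are made up. */
    public static Map<String, Map<Long, String>> sampleTable() {
        Map<Long, String> brokerA = new TreeMap<>();
        brokerA.put(0L, "10.0.0.1:10911");
        brokerA.put(1L, "10.0.0.2:10911");
        Map<String, Map<Long, String>> table = new TreeMap<>();
        table.put("broker-a", brokerA);
        return table;
    }

    public static void main(String[] args) {
        System.out.println("producer only:  " + targets(sampleTable(), true));
        System.out.println("with consumers: " + targets(sampleTable(), false));
    }
}
```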
Note ⑥:
Removes expired requests. The requestFutureTable is populated by this API:
org.apache.rocketmq.client.producer.DefaultMQProducer#request(org.apache.rocketmq.common.message.Message, long)
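According to the method's documentation, it sends a message and returns only after that message has been consumed (a request-reply style call). Both synchronous and asynchronous variants are provided. I have not actually used this API in production, so I won't go into more detail.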
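The RequestFutureTable source is not shown in this article, so the following is only a sketch of the general idea behind a periodic expired-request scan: pending requests sit in a concurrent map keyed by a correlation ID, and a timer removes and fails the ones past their deadline. All names here are hypothetical.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ExpiredRequestScanSketch {
    /** Hypothetical pending request: a deadline plus a slot for the failure cause. */
    public static final class PendingRequest {
        final long deadlineMillis;
        volatile String failure;

        PendingRequest(long deadlineMillis) {
            this.deadlineMillis = deadlineMillis;
        }
    }

    private final Map<String, PendingRequest> table = new ConcurrentHashMap<>();

    public void put(String correlationId, long deadlineMillis) {
        table.put(correlationId, new PendingRequest(deadlineMillis));
    }

    /** Removes every request whose deadline has passed and marks it as timed out. */
    public int scanExpired(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, PendingRequest>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            PendingRequest request = it.next().getValue();
            if (nowMillis > request.deadlineMillis) {
                it.remove(); // ConcurrentHashMap iterators support removal
                request.failure = "request timeout, no reply received";
                removed++;
            }
        }
        return removed;
    }

    public int size() {
        return table.size();
    }

    public static void main(String[] args) {
        ExpiredRequestScanSketch requests = new ExpiredRequestScanSketch();
        requests.put("req-1", 100);
        requests.put("req-2", 500);
        System.out.println("removed at t=200: " + requests.scanExpired(200));
        System.out.println("still pending:    " + requests.size());
    }
}
```

Without such a scan, a request whose reply never arrives would stay in the table indefinitely, which is presumably why the producer schedules the scan every second after a 3 s initial delay.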
Finally, here is an overall diagram: