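Q: Why do the master and slave report different statistics for yesterday?
A: Yesterday's statistics are captured by a scheduled task that runs at midnight every day. The two machines never run it at exactly the same moment, so their numbers always differ slightly.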
org/apache/rocketmq/broker/BrokerController.java:257
// Wait until the next midnight, then repeat every 24 hours.
final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            // Snapshot the cumulative counters for the daily statistics.
            BrokerController.this.getBrokerStats().record();
        } catch (Throwable e) {
            log.error("schedule record error.", e);
        }
    }
}, initialDelay, period, TimeUnit.MILLISECONDS);
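As a rough illustration of why the two machines drift apart, the sketch below computes the delay until the next midnight for two brokers. The class and method names are hypothetical, and the 350 ms difference between the two clocks is an assumed value, not a measurement; this is not the code behind UtilAll.computNextMorningTimeMillis.

```java
import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class MidnightDelay {
    // Milliseconds from `now` until the next midnight in the zone of `now`.
    public static long millisUntilNextMidnight(ZonedDateTime now) {
        ZonedDateTime nextMidnight = now.toLocalDate().plusDays(1).atStartOfDay(now.getZone());
        return Duration.between(now, nextMidnight).toMillis();
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("UTC");
        // Assumption: the slave reads its clock 350 ms later than the master.
        ZonedDateTime master = ZonedDateTime.of(2017, 12, 5, 23, 59, 50, 0, zone);
        ZonedDateTime slave = master.plusNanos(350_000_000L);
        System.out.println(millisUntilNextMidnight(master)); // 10000
        System.out.println(millisUntilNextMidnight(slave));  // 9650
    }
}
```

Each broker schedules the task against its own clock, so any clock skew or scheduling jitter means the two snapshots are taken at slightly different moments.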
org/apache/rocketmq/store/stats/BrokerStats.java:40
public void record() {
    // The previous midnight snapshot becomes "yesterday morning".
    this.msgPutTotalYesterdayMorning = this.msgPutTotalTodayMorning;
    this.msgGetTotalYesterdayMorning = this.msgGetTotalTodayMorning;
    // Take a fresh midnight snapshot of the cumulative counters.
    this.msgPutTotalTodayMorning =
        this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal();
    this.msgGetTotalTodayMorning =
        this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get();
    // Yesterday's totals are the difference between the two snapshots.
    log.info("yesterday put message total: {}", msgPutTotalTodayMorning - msgPutTotalYesterdayMorning);
    log.info("yesterday get message total: {}", msgGetTotalTodayMorning - msgGetTotalYesterdayMorning);
}
Q: What does the TPS in this chart represent? Why doesn't it match the TPS on the cluster page?
./mqadmin brokerStatus -b ip:port
[rocketmq@hrmq01 bin]$ ./mqadmin brokerStatus -b 10.138.225.31:10922
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
bootTimestamp : 1512464004666
brokerVersion : 232
brokerVersionDesc : V4_1_0_SNAPSHOT
commitLogDirCapacity : Total : 777.7 GiB, Free : 654.3 GiB.
commitLogDiskRatio : 0.15872792096573887
commitLogMaxOffset : 497108611246
commitLogMinOffset : 367219703808
consumeQueueDiskRatio : 0.15872792096573887
dispatchBehindBytes : 0
dispatchMaxBuffer : 0
earliestMessageTimeStamp : 1512159654174
getFoundTps : 416.35836416358364 413.23223064744604 401.3675276346905
getMessageEntireTimeMax : 62
getMissTps : 158.7841215878412 152.12065661194902 143.54373971979516
getTotalTps : 575.1424857514248 565.3528872593951 544.9112673544856
getTransferedTps : 2447.4552544745525 2435.2970585784515 2427.7681769020146
msgGetTotalTodayMorning : 19869118
msgGetTotalTodayNow : 64490048
msgGetTotalYesterdayMorning : 0
msgPutTotalTodayMorning : 19665538
msgPutTotalTodayNow : 63615258
msgPutTotalYesterdayMorning : 0
pageCacheLockTimeMills : 0
pullThreadPoolQueueCapacity : 100000
pullThreadPoolQueueHeadWaitTimeMills: 0
pullThreadPoolQueueSize : 0
putMessageAverageSize : 1723.2784922761768
putMessageDistributeTime : [<=0ms]:21325 [0~10ms]:11891 [10~50ms]:100 [50~100ms]:0 [100~200ms]:0 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
putMessageEntireTimeMax : 4136
putMessageSizeTotal : 109626805892
putMessageTimesTotal : 63615258
putTps : 1120.18798120188 1151.6040329972504 1381.3062338662237
remainHowManyDataToFlush : 5.5 KiB
remainTransientStoreBufferNumbs : 2147483647
runtime : [ 0 days, 21 hours, 51 minutes, 38 seconds ]
sendThreadPoolQueueCapacity : 10000
sendThreadPoolQueueHeadWaitTimeMills: 0
sendThreadPoolQueueSize : 0
startAcceptSendRequestTimeStamp : 0
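In this output, the trend chart plots getTotalTps.
The three values after each TPS key are the rates over the last 10 seconds, 1 minute, and 10 minutes.
TotalTps = FoundTps + MissTps (in the output above, 416.36 + 158.78 = 575.14 for the 10-second window).
FoundTps and MissTps are derived from getMessageTimesTotalFound and getMessageTimesTotalMiss.
getMessageTimesTotalFound and getMessageTimesTotalMiss are atomic counters.
In com.alibaba.rocketmq.store.DefaultMessageStore#getMessage, a pull that finds messages increments Found by 1;
otherwise Miss is incremented by 1.
StoreStatsService is the internal statistics service class; all of these values are collected there.
It runs as a thread: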
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStoped()) {
        try {
            this.waitForRunning(FrequencyOfSampling);
            this.sampling();
            this.printTps();
        }
        catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
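FrequencyOfSampling is 1000 ms, so the loop behaves like a scheduled task that samples once per second.
The implementation of waitForRunning is left to readers who want to dig into the source.
To stay on topic, here is the sampling method: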
this.getTimesFoundList.add(new CallSnapshot(System.currentTimeMillis(),
    this.getMessageTimesTotalFound.get()));
if (this.getTimesFoundList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
    this.getTimesFoundList.removeFirst();
}
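It wraps the current count and timestamp in an object and appends it to the list. The list keeps roughly 600 entries (10 minutes of one-per-second samples); once it grows beyond MAX_RECORDS_OF_SAMPLING + 1, the oldest entry is removed from the head.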
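The sliding window described above can be sketched in isolation as follows. SamplingWindow and its members are hypothetical names, and MAX_RECORDS = 600 is assumed from the 10-minute figure in the text; this is a stand-in, not the StoreStatsService code.

```java
import java.util.LinkedList;

public class SamplingWindow {
    // Assumption: 600 one-second intervals, i.e. 10 minutes of history.
    public static final int MAX_RECORDS = 600;

    // One sample: when it was taken and the cumulative counter value at that time.
    public static final class Snapshot {
        public final long timestampMillis;
        public final long callTimesTotal;

        public Snapshot(long timestampMillis, long callTimesTotal) {
            this.timestampMillis = timestampMillis;
            this.callTimesTotal = callTimesTotal;
        }
    }

    private final LinkedList<Snapshot> samples = new LinkedList<>();

    // Append a sample and drop the oldest one once the window is full.
    public void sample(long nowMillis, long counterValue) {
        samples.add(new Snapshot(nowMillis, counterValue));
        if (samples.size() > MAX_RECORDS + 1) {
            samples.removeFirst();
        }
    }

    public int size() {
        return samples.size();
    }

    public long oldestTimestampMillis() {
        return samples.getFirst().timestampMillis;
    }

    public static void main(String[] args) {
        SamplingWindow window = new SamplingWindow();
        // Simulate 700 one-second samples.
        for (int i = 0; i < 700; i++) {
            window.sample(i * 1000L, i);
        }
        System.out.println(window.size());                  // 601
        System.out.println(window.oldestTimestampMillis()); // 99000
    }
}
```

Keeping one entry more than MAX_RECORDS means the first and last snapshots are exactly 600 intervals apart, which is what a 10-minute rate needs.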
private String getGetFoundTps(int time) {
    String result = "";
    this.lockSampling.lock();
    try {
        CallSnapshot last = this.getTimesFoundList.getLast();
        if (this.getTimesFoundList.size() > time) {
            CallSnapshot lastBefore =
                this.getTimesFoundList.get(this.getTimesFoundList.size() - (time + 1));
            result += CallSnapshot.getTPS(lastBefore, last);
        }
    } finally {
        this.lockSampling.unlock();
    }
    return result;
}
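This is also easy to follow: it takes the latest snapshot and the snapshot from `time` samples earlier, and computes the rate from the difference between them.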
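A minimal sketch of that calculation, assuming the rate is the difference in counts divided by the difference in timestamps, scaled to one second. TpsFromSnapshots, Snapshot, tps and tpsOverLast are hypothetical names; unlike getGetFoundTps, which returns an empty string when there are too few samples, this version returns NaN.

```java
import java.util.ArrayList;
import java.util.List;

public class TpsFromSnapshots {
    // One sample: timestamp plus the cumulative counter value at that time.
    public static final class Snapshot {
        public final long timestampMillis;
        public final long callTimesTotal;

        public Snapshot(long timestampMillis, long callTimesTotal) {
            this.timestampMillis = timestampMillis;
            this.callTimesTotal = callTimesTotal;
        }
    }

    // Calls per second between two snapshots.
    public static double tps(Snapshot begin, Snapshot end) {
        long calls = end.callTimesTotal - begin.callTimesTotal;
        long elapsedMillis = end.timestampMillis - begin.timestampMillis;
        return calls * 1000.0 / elapsedMillis;
    }

    // Rate over the last `seconds` one-second samples, or NaN if history is too short.
    public static double tpsOverLast(List<Snapshot> samples, int seconds) {
        if (samples.size() <= seconds) {
            return Double.NaN;
        }
        Snapshot last = samples.get(samples.size() - 1);
        Snapshot before = samples.get(samples.size() - (seconds + 1));
        return tps(before, last);
    }

    public static void main(String[] args) {
        List<Snapshot> samples = new ArrayList<>();
        // 61 samples, one per second, with the counter growing by 400 each second.
        for (int i = 0; i <= 60; i++) {
            samples.add(new Snapshot(i * 1000L, i * 400L));
        }
        System.out.println(tpsOverLast(samples, 10));  // 400.0
        System.out.println(tpsOverLast(samples, 600)); // NaN
    }
}
```

Calling this with 10, 60, and 600 would give the three windows (10 seconds, 1 minute, 10 minutes) that appear after each TPS key in the brokerStatus output.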
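So, to answer the question: the trend chart shows the TPS of pull requests.
The cluster page shows two different values, putTps and getTransferedTps.
Each pull fetches messages in a batch. A successful pull increments the Found counter by 1; an unsuccessful one increments the Miss counter by 1.
getTransferedTps, by contrast, is incremented once for every message transferred.
See com.alibaba.rocketmq.store.DefaultMessageStore#getMessage for the code.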
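The difference between per-request and per-message counting can be illustrated with a small sketch. PullCounters and recordPull are hypothetical stand-ins, and the batch sizes are made up; the real counting lives in DefaultMessageStore#getMessage.

```java
import java.util.concurrent.atomic.AtomicLong;

public class PullCounters {
    public final AtomicLong found = new AtomicLong();       // pulls that returned messages
    public final AtomicLong miss = new AtomicLong();        // pulls that returned nothing
    public final AtomicLong transferred = new AtomicLong(); // individual messages returned

    // Record one pull request that returned `messagesReturned` messages.
    public void recordPull(int messagesReturned) {
        if (messagesReturned > 0) {
            found.incrementAndGet();
            transferred.addAndGet(messagesReturned);
        } else {
            miss.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        PullCounters counters = new PullCounters();
        counters.recordPull(32); // a full batch
        counters.recordPull(0);  // nothing available
        counters.recordPull(8);  // a partial batch
        System.out.println(counters.found.get());       // 2
        System.out.println(counters.miss.get());        // 1
        System.out.println(counters.transferred.get()); // 40
    }
}
```

In the brokerStatus output above, getTransferedTps divided by getFoundTps is about 2447.46 / 416.36, or roughly 5.9 messages per successful pull, which is why the two pages cannot show the same number.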