回顾RocketMQ物理部署结构
漫谈NameServer
- NameServer相当于配置中心,维护Broker集群、Broker信息、Broker存活信息、主题与队列信息等。
- NameServer彼此之间不通信,每个Broker与集群内所有的Broker保持长连接,与生产者和消费者也保持长连接。
启动过程
- 解析配置文件,填充NameServerConfig、NettyServerConfig属性
- 根据启动属性创建NameserController,并初始化该实例
- 启动服务,注册JVM钩子函数
优雅的停机方式,服务停止前关闭线程池,关闭定时任务,关闭netty服务
NameServer路由注册、故障剔除
路由元信息
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; //@1
private final ReadWriteLock lock = new ReentrantReadWriteLock(); //@2
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; //@3
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; //@4
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; //@5
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; //@6
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; //@7
路由注册
1,Broker发送心跳包
2,NameServer处理心跳包
路由删除
RocketMQ有两个出发点来出发路由删除
1,NameServer定时扫描brokerLiveTable检测上次心跳包与当前系统时间的时间差,如果时间戳大于120s,则需移除该Broker信息。
2,Broker在正常被关闭的情况想,会执行unregisterBroker指令
路由发现
路由发现是非实时的,Topic路由出现变化后,NameServer不主动推送给客户端,而是有客户端定时拉取最新的路由.
漫谈RocketMQ消息发送
RocketMQ支持3种消息发送方式:同步(sync)、异步(async)、单向(oneway)
消息队列如何进行负载?
消息发送如何实现高可用?
生产者启动流程
- 创建MQClientInstance,整个JVM实例中只存在一个MQClientManager。
消息发送基本流程
- 验证消息长度
- 查找主题路由信息
-选择消息队列
private int retryTimesWhenSendFailed = 2;
private int retryTimesWhenSendAsyncFailed = 2;
1.sendLatencyFaultEnable=false,默认不启用Broker故障延迟机制。
2.sendLatencyFaultEnable=true,启用Broker故障延迟机制。
- 消息发送
-
批量消息发送
RocketMQ消息存储
read读文件方式
非直接内存的IO操作都是此种方式
内存映射
存储文件组织与内存映射
MappedFile
public static final int OS_PAGE_SIZE = 1024 * 4;
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
//ADD BY ChenYang
protected final AtomicInteger committedPosition = new AtomicInteger(0);
private final AtomicInteger flushedPosition = new AtomicInteger(0);
protected int fileSize;
protected FileChannel fileChannel;
/**
* Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
*/
protected ByteBuffer writeBuffer = null;
protected TransientStorePool transientStorePool = null;
private String fileName;
private long fileFromOffset;
private File file;
private MappedByteBuffer mappedByteBuffer;
private volatile long storeTimestamp = 0;
private boolean firstCreateInQueue = false;
- MappedFile初始化
根据是否开启transientStorePoolEnable存在两种初始化情况。trasientStorePoolEnable为true表示内容先存储在堆外内存,然后通过Commit线程将数据提交到内存映射Buffer中,再通过Flush线程将内存映射Buffer中的数据持久化到磁盘中。 - MappedFile提交(commit)
如果commitLeastPages为本次提交最小的页数,如果待提交数据不满commitLeastPages,则不执行本次提交操作,待下次提交。
-MappedFile刷盘(flush)
内存中的数据刷写到磁盘,永久存储在磁盘中。
TransientStorePool:短暂的存储池。RocketMQ单独创建一个MappedByteBuffer内存缓存池,用来临时存储数据,数据线写入该内存映射中,然后由commit线程定时将数据从该内存复制到与目的屋里文件对应的内存映射中。RocketMQ引入该机制主要的原因是提供一种内存锁定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁盘。
commit线程和flush线程如何初始化的?
RocketMQ存储文件
未完待续
RocketMQ消息消费
未完待续
消息过滤FilterServer
ClassFilter运行机制
FilterServer在启动时向Broker注册自己,通过心跳维持注册,Broker每隔10s扫描一下该注册表,如果30s内未收到FilterServer的注册信息,将关闭Broker与FilterServer的连接。同时每30s检测当前存活的FilterServer进程的个数,如果当前存活的FilterServer进程个数小于配置的数量,则自动创建一个FilterServer进程。
类过滤模式订阅机制
- 消费者定时将消息端订阅信息中的类过滤模式的过滤类源码上传到FilterServer、
- FilterServer端处理FilterClass上传,使用JDK 提供的方法将源代码编译并加装,然后创建其实例,并强制转换为MessageFilter,也就是自定义的消息过滤类必须实现MessageFilter接口。
RocketMQ主从同步(HA)机制
-
主从同步复制实现原理
- RocketMQ读写分离机制
- RocketMQ根据MessageQueue查找Broker地址的唯一依据是brokerName,它们的brokerName相同,但brokerId不同。
RocketMQ读写分离与其他中间件的实现方式完全不同,RocketMQ是消费者首先向主服务器发起拉取消息请求,然后主服务器返回一批消息,然后会根据主服务器负载压力与主从同步情况,向从服务器建议下次拉取消息是从主服务器还是从从服务器。
实现机制
如果当前主服务器消息存储文件最大偏移量减去此次拉取消息最大偏移量大于RocketMQ常驻内存的大小,表示主服务器繁忙,建议从从服务器拉取。