在三个项目中用到了ActiveMQ,梳理出一些使用技巧和方法,可能不如直接读源码来的直接,但是希望可以给读者提供一些参考。
2018.1.21 更新一些新的心得,整理排版
2018.9.8 更新ActiveMQ绑定本机IP和端口
1. 系统消息(advisory message)
官方说明:Advisory Message
一个项目是科管系统,需要记录Agent的在线与否,ActiveMQ自带系统消息的功能(一系列前缀为Activemq.Advisory的TOPIC),官方说法,目前已经支持的包括:
消费者,生产者和连接的启停
临时目的地的建立和销毁
TOPIC和QUEUE中的到期消息
Broker发送消息到一个没有消费者的队列上
连接的建立和停止
注意:Advisory的TOPIC只能提供其所属的Broker的信息,因此如果搭建成集群,那么需要订阅集群中所有机器的Advisory Message Topic。
因此,我们在集群上的每一个Broker上加了一个小程序,程序作为消费者订阅ActiveMQ.Advisory.Connection这个TOPIC,获取到Client连接的建立和停止,而后将数据放到MQ的一个队列HeartBeat上。这样消费者就能从集群中的任意一台MQ上获取到与该MQ集群建立连接的所有Client的连接信息了。
此外,Advisory Message Topic在每次有新消费者订阅时,都会给该消费者发送一次全量信息(比如全量连接信息)。利用这个特性,加在Broker上的小程序每隔半小时会进行一次关闭消费者/重建消费者的操作,这样可以重新获取一次全量的连接信息,保证数据的有效性。
2. 连接状态的监听器
在ActiveMQ的使用中,需要对Connection进行监控,在连接异常时可以关闭或重建连接。ActiveMQ提供了TransportListener,在Connection.start()前,可以为Connection添加一个监听器。
新建一个类:
package com.company.ActiveMQ;
import java.io.IOException;
import org.apache.activemq.transport.TransportListener;
public class ActiveMQTransportListener implements TransportListener{
public boolean status = true;
public void onCommand(Object arg0) {}
public void onException(IOException arg0) {}
public void transportInterupted() {
System.out.println("Connection error..");
this.status = false;
}
public void transportResumed() {
System.out.println("Connection resume..");
this.status = true;
}
}
而后在主进程建立连接时添加如下的代码
ActiveMQTransportListener transportListener = new ActiveMQTransportListener();
connection = ((ActiveMQConnection)myConnectionFactory.createConnection());
connection.addTransportListener(transportListener);
需要注意的是,connection必须是ActiveMQConnection才行。Connection或者poolConnection都无法添加该类型的监听器。(但是可以添加Exception listener)
3. 文件消息的传输
AMQ官方在5.14版本后彻底停止FileServer功能,具体请见ActiveMQ fileserver功能取消。FileServer功能的取消意味着生产者无法再往MQ上发文件消息了,但是对于消费者还是可以用BlobMessage功能的。也就是说生产者需要自己实现把文件往一个FileServer(HTTP,FTP均可)上发,然后把地址url放在BlobMessage中发出去。消费者收到后仍然可以拿到文件。
对于MQ来说,文件传输并不是它的本职,但是如果仍然想借用MQ来传输文件该怎么做呢?大文件使用一个HTTP SERVER做中转,小文件转化为BytesMessage发送吧。对于多小的文件可以使用BytesMessage,我在使用中是小于1MB的文件都转化为BytesMessage。
当初选型ActiveMq有一个原因是自带了文件消息的功能,省去了项目中文件传输那一块的设计,但之前上网查资料发现有关ActiveMQ的文件传输功能相关的资料也不多。这里写出一些使用的经验。
3.1 原理
文件消息是BlobMessage,有两种方式传输,一种是直接上传文件,一种是上传一个url。
直接上传文件时,生产者调用api将文件转为流的形式,而后使用http的PUT方式将文件上传到broker的webapps/fileserver目录下。文件名变为messageid,但可以通过改变文件名的方式还原这个文件。而后,生产者将这个文件在broker上的地址变为url,作为BlobMessage的消息体。消费者在接收到这条消息后,直接用msg.getInputStream就可以取出文件流。
上传url只是把上传文件的步骤省略了,其他相同。
3.2 使用F5时生产者需要注意的事项
需要注意的是,在建立连接时,如果使用F5负载均衡的方式,为了防止上传文件时url变成F5的地址(如果变成F5地址,消费者收到消息后,也会访问F5获取文件,有可能因为F5负载到另一台MQ上导致获取消息失败),最好用实地址再指定一下uploadurl,方法为
connection.getBlobTransferPolicy.setDefaultUploadUrl("http://实际BrokerIP:61616/fileserver/");
3.3 清理残留消息
消费者可以在消费后,删除broker上的文件,但是如果是使用发布订阅模式,将一个文件发往多个消费者的情况下,由于无法预测消费者何时消费完,所以是不能在消费者端进行消息清理的。
此外,消费者delete文件这个功能实际是调用fileserver的功能,实测不太好用(常常返回404错误),所以建议用crontab定时清理fileserver目录下的残留文件消息。这里附上crontab的配置(清理一天前的消息):
0 * * * * find /opt/activemq/apache-activemq-5.13.4/webapps/fileserver -type f -mtime +1 -name "ID_*" -exec rm -f {} \; >/dev/null 2&1
此外,同一个producer依次发送两条文件消息可能导致文件发送失败,因为在源码的DefaultBlobUploadStrategy类里,在发送后没有关闭connection的outputstream流,我也是醉了。
4. 在线修改配置文件
之前以为activemq.xml的修改必然需要重启一下broker才能生效。这就给动态扩容,在线修改用户名和密码等功能带来了影响。没想到ActiveMQ是支持在线reload配置文件的。只需要在activemq.xml中插入以下几行。(简书竟然不能拷贝xml)
支持在线修改的部分有
- 集群配置networkConnectors
- 队列配置destinationPolicy
- 用户授权plugins插件中的authorizationPlugin
- destinationInterceptors
相关资料地址http://activemq.apache.org/runtime-configuration.html
5. 集群与消息选择器
大家都知道消息选择器可以为消费者提供一个消息筛选的功能,这个其实在QUEUE和TOPIC中都是可以用的。实际我司也试了一下,发现了一个问题:
如果配置了Network of Broker,在Queue上使用了消息选择器,可能导致消息不可达及Broker上消息堆积。
场景:场景配置了两台MQ,两个消费者各连一台,带上了不同的消息选择器,生产者只连接一台,发送带有不同消息属性的消息。
问题:MQ产生了堆积,堆积的消息不能被本地消费者消费的消息。
推理:集群的转发机制不会考虑消息选择器的问题,对于这个场景下的集群消费负载均衡来说,消息到达Broker后,会被平均分配两份(配置也可配置不同转发机制),但是这个转发不会考虑到消息选择器的问题,所以本该被转发的消息被留下了。
需要使用消息选择器的场景一般是在发布/订阅时,有些消息要被群发,有些消息又要被点对点发送。因此对于这种需要使用消息选择器的场景,一律使用Topic就可以了。
6. 大量Destination下集群转发问题
这个问题我整理在了文章ActiveMQ集群消息转发问题整理中,主要是由于DemandForwardBridge的机制需要使用Advisory Message,而Advisory Message又受到activemq.xml中的destinationPolicy配置的影响,可能导致Broker在启动时无法完全建立DemandForwardBridge导致消息无法在集群中转发的问题。
7. 利用好AMQ插件功能
官方说明:Interceptors
AMQ相比其他消息中间件来说,性能并不占优势,但是在TPS较小的场景下,我仍然认为AMQ可以作为首选。其主要原因是AMQ的功能太全太丰富了。
而AMQ开放了一个插件接口,更是使定制化功能成为了可能。截止目前,我在公司内使用AMQ实现了这些功能:
- MQTT协议消费者配置消息选择器,插件中实现addConsumer方法。
- MQTT协议用户权限认证(不走AMQ自带的权限功能),插件中实现addConnection方法。
- 消息流日志,根据场景定制化日志内容,截取消息中的部分消息属性展现数据流。插件中实现send方法等。
- 权限认证重做,AMQ自带的权限认证和shiro都有较复杂的配置,参考shiro插件直接重写了一个权限认证插件,定时扫描一个新的配置文件,实现黑白名单的功能。
在编写插件的时候,就像是AMQ帮忙实现了消息中间件的基础功能,而我们可以根据具体场景实现更定制化的功能。
8. 集群拓扑设计
我之前也写过一篇文章介绍了集中常见的拓扑ActiveMQ几种集群拓扑架构分析。
完全图拓扑的适用范围最广,但是性能肯定也是最差的。每一个集群的连接都会占用资源,因此建议根据实际情况尽量减少MQ集群的大小。
如果有可能无止尽扩展MQ集群,那最好不要使用完全图拓扑。集线器拓扑目前来看是最合适的。
9. ActiveMQ绑定本机地址和端口
ActiveMQ默认的机制里,BrokerUrl配置服务端的地址,如
tcp://112.32.124.12:61616
这种配置方法是默认配置,如果客户端有多个网卡多个IP,需要指定本机IP和端口来建立通讯该怎么实现呢?
tcp://112.32.124.12:61616/81.31.12.3:2091
使用这种写法,就表示本机配置81.31.12.3这个IP地址,端口使用2091。如果客户端没把握指定的端口是否被占用,可以将端口配置成0,这样就由操作系统自动分配端口了。