ActiveMQ使用心得

在三个项目中用到了ActiveMQ,梳理出一些使用技巧和方法,可能不如直接读源码来的直接,但是希望可以给读者提供一些参考。

2018.1.21 更新一些新的心得,整理排版
2018.9.8 更新ActiveMQ绑定本机IP和端口

1. 系统消息(advisory message)

官方说明:Advisory Message

一个项目是科管系统,需要记录Agent的在线与否,ActiveMQ自带系统消息的功能(一系列前缀为Activemq.Advisory的TOPIC),官方说法,目前已经支持的包括:

  • 消费者,生产者和连接的启停

  • 临时目的地的建立和销毁

  • TOPIC和QUEUE中的到期消息

  • Broker发送消息到一个没有消费者的队列上

  • 连接的建立和停止

注意:Advisory的TOPIC只能提供其所属的Broker的信息,因此如果搭建成集群,那么需要订阅集群中所有机器的Advisory Message Topic。

因此,我们在集群上的每一个Broker上加了一个小程序,程序作为消费者订阅ActiveMQ.Advisory.Connection这个TOPIC,获取到Client连接的建立和停止,而后将数据放到MQ的一个队列HeartBeat上。这样消费者就能从集群中的任意一台MQ上获取到与该MQ集群建立连接的所有Client的连接信息了。

此外,Advisory Message Topic在每次有新消费者订阅时,都会给该消费者发送一次全量信息(比如全量连接信息)。利用这个特性,加在Broker上的小程序每隔半小时会进行一次关闭消费者/重建消费者的操作,这样可以重新获取一次全量的连接信息,保证数据的有效性。

2. 连接状态的监听器

在ActiveMQ的使用中,需要对Connection进行监控,在连接异常时可以关闭或重建连接。ActiveMQ提供了TransportListener,在Connection.start()前,可以为Connection添加一个监听器。

新建一个类:

package com.company.ActiveMQ;

import java.io.IOException;

import org.apache.activemq.transport.TransportListener;

public class ActiveMQTransportListener  implements TransportListener{

  public boolean status = true;

  public void onCommand(Object arg0) {}

  public void onException(IOException arg0) {}



  public void transportInterupted() {

 System.out.println("Connection error..");

 this.status = false;

  }



  public void transportResumed()  {

  System.out.println("Connection resume..");

    this.status = true;

  }

}

而后在主进程建立连接时添加如下的代码

ActiveMQTransportListener transportListener = new ActiveMQTransportListener();
connection = ((ActiveMQConnection)myConnectionFactory.createConnection());
connection.addTransportListener(transportListener);

需要注意的是,connection必须是ActiveMQConnection才行。Connection或者poolConnection都无法添加该类型的监听器。(但是可以添加Exception listener)

3. 文件消息的传输

AMQ官方在5.14版本后彻底停止FileServer功能,具体请见ActiveMQ fileserver功能取消。FileServer功能的取消意味着生产者无法再往MQ上发文件消息了,但是对于消费者还是可以用BlobMessage功能的。也就是说生产者需要自己实现把文件往一个FileServer(HTTP,FTP均可)上发,然后把地址url放在BlobMessage中发出去。消费者收到后仍然可以拿到文件。

对于MQ来说,文件传输并不是它的本职,但是如果仍然想借用MQ来传输文件该怎么做呢?大文件使用一个HTTP SERVER做中转,小文件转化为BytesMessage发送吧。对于多小的文件可以使用BytesMessage,我在使用中是小于1MB的文件都转化为BytesMessage。

当初选型ActiveMq有一个原因是自带了文件消息的功能,省去了项目中文件传输那一块的设计,但之前上网查资料发现有关ActiveMQ的文件传输功能相关的资料也不多。这里写出一些使用的经验。

3.1 原理
文件消息是BlobMessage,有两种方式传输,一种是直接上传文件,一种是上传一个url。

直接上传文件时,生产者调用api将文件转为流的形式,而后使用http的PUT方式将文件上传到broker的webapps/fileserver目录下。文件名变为messageid,但可以通过改变文件名的方式还原这个文件。而后,生产者将这个文件在broker上的地址变为url,作为BlobMessage的消息体。消费者在接收到这条消息后,直接用msg.getInputStream就可以取出文件流。

上传url只是把上传文件的步骤省略了,其他相同。

3.2 使用F5时生产者需要注意的事项
需要注意的是,在建立连接时,如果使用F5负载均衡的方式,为了防止上传文件时url变成F5的地址(如果变成F5地址,消费者收到消息后,也会访问F5获取文件,有可能因为F5负载到另一台MQ上导致获取消息失败),最好用实地址再指定一下uploadurl,方法为

connection.getBlobTransferPolicy.setDefaultUploadUrl("http://实际BrokerIP:61616/fileserver/");

3.3 清理残留消息
消费者可以在消费后,删除broker上的文件,但是如果是使用发布订阅模式,将一个文件发往多个消费者的情况下,由于无法预测消费者何时消费完,所以是不能在消费者端进行消息清理的。

此外,消费者delete文件这个功能实际是调用fileserver的功能,实测不太好用(常常返回404错误),所以建议用crontab定时清理fileserver目录下的残留文件消息。这里附上crontab的配置(清理一天前的消息):

0 * * * * find /opt/activemq/apache-activemq-5.13.4/webapps/fileserver -type f -mtime +1 -name "ID_*" -exec rm -f {} \; >/dev/null 2&1

此外,同一个producer依次发送两条文件消息可能导致文件发送失败,因为在源码的DefaultBlobUploadStrategy类里,在发送后没有关闭connection的outputstream流,我也是醉了。

4. 在线修改配置文件

之前以为activemq.xml的修改必然需要重启一下broker才能生效。这就给动态扩容,在线修改用户名和密码等功能带来了影响。没想到ActiveMQ是支持在线reload配置文件的。只需要在activemq.xml中插入以下几行。(简书竟然不能拷贝xml)



支持在线修改的部分有

  • 集群配置networkConnectors
  • 队列配置destinationPolicy
  • 用户授权plugins插件中的authorizationPlugin
  • destinationInterceptors

相关资料地址http://activemq.apache.org/runtime-configuration.html

5. 集群与消息选择器

大家都知道消息选择器可以为消费者提供一个消息筛选的功能,这个其实在QUEUE和TOPIC中都是可以用的。实际我司也试了一下,发现了一个问题:

如果配置了Network of Broker,在Queue上使用了消息选择器,可能导致消息不可达及Broker上消息堆积。

场景:场景配置了两台MQ,两个消费者各连一台,带上了不同的消息选择器,生产者只连接一台,发送带有不同消息属性的消息。

问题:MQ产生了堆积,堆积的消息不能被本地消费者消费的消息。

推理:集群的转发机制不会考虑消息选择器的问题,对于这个场景下的集群消费负载均衡来说,消息到达Broker后,会被平均分配两份(配置也可配置不同转发机制),但是这个转发不会考虑到消息选择器的问题,所以本该被转发的消息被留下了。

需要使用消息选择器的场景一般是在发布/订阅时,有些消息要被群发,有些消息又要被点对点发送。因此对于这种需要使用消息选择器的场景,一律使用Topic就可以了。

6. 大量Destination下集群转发问题

这个问题我整理在了文章ActiveMQ集群消息转发问题整理中,主要是由于DemandForwardBridge的机制需要使用Advisory Message,而Advisory Message又受到activemq.xml中的destinationPolicy配置的影响,可能导致Broker在启动时无法完全建立DemandForwardBridge导致消息无法在集群中转发的问题。

7. 利用好AMQ插件功能

官方说明:Interceptors
AMQ相比其他消息中间件来说,性能并不占优势,但是在TPS较小的场景下,我仍然认为AMQ可以作为首选。其主要原因是AMQ的功能太全太丰富了。

而AMQ开放了一个插件接口,更是使定制化功能成为了可能。截止目前,我在公司内使用AMQ实现了这些功能:

  • MQTT协议消费者配置消息选择器,插件中实现addConsumer方法。
  • MQTT协议用户权限认证(不走AMQ自带的权限功能),插件中实现addConnection方法。
  • 消息流日志,根据场景定制化日志内容,截取消息中的部分消息属性展现数据流。插件中实现send方法等。
  • 权限认证重做,AMQ自带的权限认证和shiro都有较复杂的配置,参考shiro插件直接重写了一个权限认证插件,定时扫描一个新的配置文件,实现黑白名单的功能。

在编写插件的时候,就像是AMQ帮忙实现了消息中间件的基础功能,而我们可以根据具体场景实现更定制化的功能。

8. 集群拓扑设计

我之前也写过一篇文章介绍了集中常见的拓扑ActiveMQ几种集群拓扑架构分析

完全图拓扑的适用范围最广,但是性能肯定也是最差的。每一个集群的连接都会占用资源,因此建议根据实际情况尽量减少MQ集群的大小。

如果有可能无止尽扩展MQ集群,那最好不要使用完全图拓扑。集线器拓扑目前来看是最合适的。

9. ActiveMQ绑定本机地址和端口

ActiveMQ默认的机制里,BrokerUrl配置服务端的地址,如

tcp://112.32.124.12:61616

这种配置方法是默认配置,如果客户端有多个网卡多个IP,需要指定本机IP和端口来建立通讯该怎么实现呢?

tcp://112.32.124.12:61616/81.31.12.3:2091

使用这种写法,就表示本机配置81.31.12.3这个IP地址,端口使用2091。如果客户端没把握指定的端口是否被占用,可以将端口配置成0,这样就由操作系统自动分配端口了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容