AMQ有开放插件开发接口,今天研究了一下,直接给项目组加了个消息流监控的功能,看起来还是挺好的。
开发步骤是:
1.开发插件jar包
我写的这个插件主要作用是当有消息被发送的时候,就往日志中记录一条。不使用amq自带的logging plugin的主要原因是格式问题,而且我只需要筛选某几个应用的消息。
注:BrokerFilter这个类中的方法决定了插件能实现的功能,比如连接建立、连接断开、消息到达、消息过期等等都可以自定义操作。
程序如下
//入口类
package com.cn.amqs;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.plugin.StatisticsBrokerPlugin;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class MessageLogPlugin implements BrokerPlugin {
private Log log = LogFactory.getLog(StatisticsBrokerPlugin.class);
public Broker installPlugin(Broker broker) throws Exception {
log.info("install MessageLogPlugin");
return new MessageLog(broker);
}
}
//主要实现功能的类
package com.cn.amqs;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;
import org.apache.activemq.plugin.StatisticsBrokerPlugin;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
//BrokerFilter这个类包含了broker的很多操作,可以看看源码了解一下
public class MessageLog extends BrokerFilter{
private Log log;
public MessageLog(Broker next) {
super(next);
log = LogFactory.getLog(StatisticsBrokerPlugin.class);
log.info("initialize Message Log plugin")
}
@Override
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
//在这里实现筛选
if (messageSend.getProperty("appname")!=null && messageSend.getProperty("misid")!=null){
if (!messageSend.getProperty("appname").equals("OSC")&&!messageSend.getProperty("appname").equals("OSP"))
log.info("[FLOW_LOG] MissionID: "+messageSend.getProperty("misid")+" Destination: "+messageSend.getDestination());
}
super.send(producerExchange, messageSend);
}
}
2.打上面的程序打成jar包后,将jar包放在amq的lib目录下。
3.修改activemq.xml文件
在activemq.xml文件中的plugins标签下添加如下的一段代码
<plugins>
<bean xmlns="http://www.springframework.org/schema/beans" id="MessageLogPlugin" class="com.cn.amqs.MessageLogPlugin">
</plugins>
4.重启MQ,消息流的日志就会显示在data目录下的activemq.log中了