ActiveMQ插件开发实例-任务日志


项目里遇到需要在ActiveMQ上记录任务日志的场景。其实AMQ本身自带一个LoggingPlugin,但是用起来总是不切合项目的实际场景。思来想去,正好前段时间为其他项目做了个MQTT协议认证的插件,技术基础已经有了,还是自己给项目写一个定制化版的插件吧。

在我之前的文章ActiveMQ插件开发里介绍了如何开发一个AMQ的插件。其实这次的功能就是基于之前的代码里的内容进行修改的。主要功能是每次AMQ接收到一个任务消息后,就往一台服务器上使用HTTP POST方法发送一条消息。表示任务流经MQ。

先来看入口类,相比之前的代码,增加了两个参数,这两个参数可以在配置文件activemq.xml中实现手动配置。

package com.cn.amqs;
import org.apache.activemq.broker.Broker;   
import org.apache.activemq.broker.BrokerPlugin;   
import org.apache.activemq.plugin.StatisticsBrokerPlugin;
import org.apache.commons.logging.Log;   
import org.apache.commons.logging.LogFactory;  

public class MessageLogPlugin implements BrokerPlugin {   

  private Log log = LogFactory.getLog(StatisticsBrokerPlugin.class);  
  private String seviceUrl;
  private String sign;
  public Broker installPlugin(Broker broker) throws Exception {   
    log.info("install MessageLogPlugin");   
    return new MessageLog(broker,serviceUrl,sign);   
  }   
  public void setServiceUrl(String serviceUrl) {
    this.serviceUrl=serviceUrl;
  }
  ……
}  

主要功能在MessageLog类中,实现了几个功能:

  1. 每来一个任务消息,判断消息是否曾经来过,如果是第一次收到,则发送一条post消息到服务器上
  2. 记录一个任务的消息数量
  3. 为了防止任务数量无限增长,设置了定时清理机制(但是由于每个任务都设置了Timer,适用的场景应该是任务较少或者任务可清理时间较短的场景,否则也是对资源的消耗)
  4. 区分任务上行还是任务下行
package com.cn.amqs;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.activemq.broker.Broker;   
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;
import org.apache.commons.logging.Log;   
import org.apache.commons.logging.LogFactory;

/**
 * 实现每次任务到达MQ时自动往一个地址上送一条信息
 * @author MiSterRabbit
 */
public class MessageLog extends BrokerFilter{
private Log log;  
/**下行任务HashMap*/
private ConcurrentHashMap<Object, Integer> downWards;
/**上行任务HashMap*/
private ConcurrentHashMap<Object, Integer> upWards;
private String seviceUrl;
private String sign;

public MessageLog(Broker next,String seviceUrl,String sign) {   
super(next);   
downWards = new ConcurrentHashMap<Object, Integer>();
upWards = new ConcurrentHashMap<Object, Integer>();
this.seviceUrl=seviceUrl;
this.sign=sign.isEmpty()? "分部":"总部";
log = LogFactory.getLog(com.cn.amqs.MessageLog.class);
log.info("initialize Message Log plugin");
}

/**
* Timer类,实现定时清理日志HashMap,防止Map的无限增长
*/
class missionTimer extends TimerTask {
private String missionID;
private Log log;
private ConcurrentHashMap<Object, Integer> map;
public missionTimer(String missionID, Log log, ConcurrentHashMap<Object, Integer> map) {
this.missionID=missionID;
this.log=log;
this.map=map;
}
@Override
public void run() {
    this.map.remove(missionID);
this.log.info("[FLOW_LOG] Remove expired mission: "+missionID);
}
}

/**
* 判断日志是否在map中,如果不在,则发送一条消息,若存在,则增加计数器
* @param missionID  任务号
* @param map  任务下发和任务上送使用不同的map
*/
 public synchronized void insertIntoMap(String missionID, ConcurrentHashMap<Object,Integer> map,String direction) {
 if(map.containsKey(missionID)) {
 int count = map.get(missionID)+1;
 map.put(missionID,count);
           this.log.debug("[FLOW_LOG] "+map);
       } else{
       map.put(missionID,1);
           this.log.info("[FLOW_LOG] Receive a new "+direction+" mission: "+missionID);
           // 开启一个线程发送一条任务数据,这里的MissionSend类其实就是开启一个线程发送一条http post消息
           if (direction.equalsIgnoreCase("DOWNWARD")){
           MissionSend tmqs = new MissionSend(missionID, super.getBrokerName().toString().substring(3), "ActiveMQ", "/opt/activemq/apache-activemq-5.13.4/data/mission.log", this.sign+"MQ收到下行任务", "ok", this.seviceUrl);
       new Thread(tmqs,"mission_send").start();
           } else {
           MissionSend tmqs = new MissionSend(missionID, super.getBrokerName().toString().substring(3), "ActiveMQ", "/opt/activemq/apache-activemq-5.13.4/data/mission.log", this.sign+"MQ收到上行任务", "ok", this.seviceUrl);
       new Thread(tmqs,"mission_send").start();
           }
           // 使用Timer定时清理,1800秒后清理这个任务
           Timer timer =new Timer();
           TimerTask task = new missionTimer(missionID,this.log,downWards);
           timer.schedule(task,1800000);
       }
 }
 
 /**
  * 每当MQ收到一条生产者发送过来的消息的时候执行判断。
  */
 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
// 如果任务ID不为空,且不是经由集群内部发过来的消息
 if ((messageSend.getProperty("misid") != null) && 
(!producerExchange.getProducerState().getInfo().getProducerId().toString().contains("MQ_"))) {
   // 如果目的地不包含UPLAOD字段,则判断为消息下行,否则为消息上行。记录一个日志,然后调用insertIntoMap判断是否需要发送http post消息
 if (!messageSend.getDestination().toString().toLowerCase().contains("upload")) {
 this.log.info("[FLOW_LOG] Down Mission: " + messageSend.getProperty("misid") + ".  Destination: " + messageSend.getDestination() + ".  Producer: "+producerExchange.getConnectionContext().getConnection().getRemoteAddress());
 insertIntoMap(messageSend.getProperty("misid").toString(),downWards,"DOWNWARD");
 } else {
this.log.info("[FLOW_LOG] Up Mission: " + messageSend.getProperty("misid") + ".  Destination: " + messageSend.getDestination()+".  Producer: "+producerExchange.getConnectionContext().getConnection().getRemoteAddress());
insertIntoMap(messageSend.getProperty("misid").toString(),upWards,"UPWARD");
 }
 }
   super.send(producerExchange, messageSend);
 }
}  

MissionSend的类可以自由扩展。我就不赘述了。

插件功能为AMQ带来了极强的扩展性,用户可以实现在不对现有功能进行修改的前提下进行功能的二次开发。有空我会整理一个插件可以实现的功能清单。其实如果有空,看看BrokerFilter这个类,就能明白插件能实现的功能了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,372评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,368评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,415评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,157评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,171评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,125评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,028评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,887评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,310评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,533评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,690评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,411评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,004评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,812评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,693评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,577评论 2 353

推荐阅读更多精彩内容