安装ActiveMQ/队列消息/发布订阅

1.下载ActiveMQ,http://activemq.apache.org/

2.解压后结构如下
bin存放的是脚本文件
conf存放的是基本配置文件
data存放的是日志文件
docs存放的是说明文档
examples存放的是简单的实例
lib存放的是activemq所需jar包
webapps用于存放项目的目录

3.启动MQ

  • 进入bin目录,./activemq start(linux)
  • 进入bin目录,执行activemq start(windows)

启动后打开http://127.0.0.1:8161/admin/,可以看到MQ的控制台,用户名密码默认都是admin。

4.更改用户名密码
activemq.xml中有<import resource="jetty.xml"/>, jetty.xml 中有

<bean id="securityLoginService" class="org.eclipse.jetty.security.HashLoginService">        
    <property name="name" value="ActiveMQRealm" />        
    <property name="config" value="${activemq.conf}/jetty-realm.properties" />    
</bean>

在jetty-realm.properties中可更改用户名密码。


示例

添加maven依赖

<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.13.3</version>
</dependency>

P2P生产者:参考

package com.tgb.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 消息的生产者(发送者) 
 *
 */
public class JMSProducer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
    private static final int SENDNUM = 10;//发送的消息数量

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//连接工厂
        Connection connection = null;//连接
        Session session;//会话 接受或者发送消息的线程
        Destination destination;//消息的目的地
        MessageProducer messageProducer;//消息生产者
        connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);//实例化连接工厂
        try {
            connection = connectionFactory.createConnection();//通过连接工厂获取连接
            connection.start();//启动连接
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建session
            destination = session.createQueue("HelloWorld");//创建一个名称为HelloWorld的消息队列
            messageProducer = session.createProducer(destination);//创建消息生产者
            sendMessage(session, messageProducer);//发送消息
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    /**
     * 发送消息
     * @param session
     * @param messageProducer  消息生产者
     * @throws Exception
     */
    public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
        for (int i = 0; i < JMSProducer.SENDNUM; i++) {
            TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i);//创建一条文本消息 
            System.out.println("发送消息:Activemq 发送消息" + i);
            messageProducer.send(message);//通过消息生产者发出消息 
        }
    }
}

P2P消费者

package testactivemq;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Created by zzhblh on 2016/8/27.
 */
public class JMSConsumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//连接工厂
        Connection connection = null;//连接
        Session session;//会话 接受或者发送消息的线程
        Destination destination;//消息的目的地
        MessageConsumer messageConsumer;//消息的消费者
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);//实例化连接工厂
        try {
            connection = connectionFactory.createConnection();//通过连接工厂获取连接
            connection.start();//启动连接
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建session
            destination = session.createQueue("HelloWorld");//创建一个连接到HelloWorld的消息队列
            messageConsumer = session.createConsumer(destination);//创建消息消费者

            //只能收到一个
            TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);//使这个messageConsumer运行500秒
            if(textMessage != null){
                System.out.println("收到的消息:" + textMessage.getText());
            }
            //收到所有
            messageConsumer.setMessageListener(new Listener());

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

Topic 生产者

package testactivemq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMapMessage;

import javax.jms.*;

/**
 * Created by zzhblh on 2016/8/27.
 */
public class Publisher {
    ConnectionFactory factory;//连接工厂
    Connection connection = null;//连接
    Session session;//会话 接受或者发送消息的线程
    Destination[] destinations;//消息的目的地
    MessageProducer producer;

    public Publisher() throws JMSException {
        factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
        connection = factory.createConnection();
        try {
            connection.start();//启动连接
        } catch (JMSException e) {
            connection.close();
            throw e;
        }
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(null);
    }

    protected void setTopics(String[] stocks) throws JMSException {
        destinations = new Destination[stocks.length];
        for(int i = 0; i < stocks.length; i++) {
            destinations[i] = session.createTopic("STOCKS." + stocks[i]);
        }
    }


    protected void sendMessage(String[] stocks) throws JMSException {
        for(int i = 0; i < stocks.length; i++) {
            Message message = createStockMessage(stocks[i], session);
            System.out.println("Sending: " + ((ActiveMQMapMessage)message).getContentMap() + " on destination: " + destinations[i]);
            producer.send(destinations[i], message);
        }
    }

    protected Message createStockMessage(String stock, Session session) throws JMSException {
        MapMessage message = session.createMapMessage();
        message.setString("stock", stock);
        message.setDouble("price", 1.00);
        message.setDouble("offer", 0.01);
        message.setBoolean("up", true);
        return message;
    }

    public void close() throws JMSException {
        if (connection != null) {
            connection.close();
        }
    }

    public static void main(String[] args) throws JMSException {

        // Create publisher
        Publisher publisher = new Publisher();

        // Set topics
        String[] stocks = {"a","b","c","d","e"};
        publisher.setTopics(stocks);

        for(int i = 0; i < 10; i++) {
            publisher.sendMessage(stocks);
            System.out.println("Publisher '" + i + " price messages");
            try {
                Thread.sleep(1000);
            } catch(InterruptedException e) {
                e.printStackTrace();
            }
        }
        // Close all resources
        publisher.close();
    }
}

Topic消费者

package testactivemq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.text.DecimalFormat;

/**
 * Created by zzhblh on 2016/8/27.
 */
public class Subscriber {
    ConnectionFactory factory;//连接工厂
    Connection connection = null;//连接
    Session session;//会话 接受或者发送消息的线程
    Destination destination;//消息的目的地

    public Subscriber() throws JMSException {
        factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    public static void main(String[] args) throws JMSException {
        Subscriber consumer = new Subscriber();
        String[] stocks = {"a","b","c","d","e"};//找到发布的Topic
        for (String stock : stocks) {
            Destination destination = consumer.getSession().createTopic("STOCKS." + stock);
            MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);

            //只能收到一次
            MapMessage map = (MapMessage) messageConsumer.receive();//使这个messageConsumer运行500秒
            if(map != null){
                stock = map.getString("stock");
                double price = map.getDouble("price");
                double offer = map.getDouble("offer");
                boolean up = map.getBoolean("up");
                DecimalFormat df = new DecimalFormat("#,###,###,##0.00");
                System.out.println(stock + "\t" + df.format(price) + "\t" + df.format(offer) + "\t" + (up ? "up" : "down"));
            }else {
                break;
            }
            //监听收到所有
            messageConsumer.setMessageListener(new Listener());
        }
    }

    public Session getSession() {
        return session;
    }
}

用于监听的Listener

package testactivemq;

import org.apache.activemq.command.ActiveMQTextMessage;

import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.text.DecimalFormat;

/**
 * Created by zzhblh on 2016/8/27.
 */
public class Listener implements MessageListener {
    public void onMessage(Message message) {
        try {
            if(message instanceof ActiveMQTextMessage){
                ActiveMQTextMessage textMessage = (ActiveMQTextMessage) message;
                System.out.println("收到的消息:" + textMessage.getText());
            }
            if(message instanceof MapMessage){
                MapMessage map = (MapMessage) message;
                String stock = map.getString("stock");
                double price = map.getDouble("price");
                double offer = map.getDouble("offer");
                boolean up = map.getBoolean("up");
                DecimalFormat df = new DecimalFormat("#,###,###,##0.00");
                System.out.println(stock + "\t" + df.format(price) + "\t" + df.format(offer) + "\t" + (up ? "up" : "down"));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

异步与同步接收

  • 消息的异步接收:
    异步接收是指当消息到达时,主动通知客户端。通过客户为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。
    JMS客户端可以通过注册一个实现MessageListener接口的对象到MessageConsumer,这样,每当消息到达时,JMS Provider 会调用MessageListener中的onMessage 方法。会话(主题或队列)负责产生某些消息,这些消息被传送到使用onMessage方法的监听者那里。
  • 消息的同步接收:
    jms同步接受消息的功能(客户端必须请求每个消息),通过调用消费者的receive方法从目的地中显式提取消息,receive方法可以一直阻塞到消息到达。
    接收消息的方法还有一个"不等待"的版本,使用这个方法时QueueReceiver对象检查是否有消息之后立即返回,将控制交还给程序。
    TextMessage message = queueReceiver.receiveNoWait ()。

jms消息的确认模式

一般建议,一个事务类型的Session中只有一个Consumer,避免混乱。

public Consumer() throws JMSException {
    factory = new ActiveMQConnectionFactory(brokerURL);
    connection = factory.createConnection();
    connection.start();
    //第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
    //第一个参数为false时,第二个参数的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
    //Session.AUTO_ACKNOWLEDGE为自动确认,在同步模式下(使用consumer.receive):客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功,此消息将丢失。基于异步调用时(使用Listener),消息的确认是在onMessage方法返回之后,如果onMessage方法异常,会导致消息不能被ACK,会触发重发。
    // 对于consumer而言,optimizeAcknowledge属性只会在AUTO_ACK模式下有效。
    //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。需要注意的是调用acknowledge方法后,此session下所有未确认的消息将全部被确认,一般建议,一个事务类型的Session中只有一个Consumer,避免混乱。
    //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}

如果使用CLIENT_ACKNOWLEDGE,则需要手动确认。否则MQ-SERVER不会删除消息,消息会被重复发送。

msg = (TextMessage) consumer.receive();
//acknowledge
msg.acknowledge();

如果第一个参数为true,则一定要session.commit()。否则MQ-SERVER不会删除消息,消息会被重复发送。当session使用事务时,在事务开启之后,和session.commit()之前,所有消费的消息,要么全部正常确认,要么全部redelivery。

session.commit();
messageConsumer.close();
session.close();
connection.close();

Broker内每条消息都有一个ACK_TYPE,它通常是一种内部机制,并不会面向开发者。ActiveMQ中定义了如下几种ACK_TYPE(参看MessageAck类):

  • DELIVERED_ACK_TYPE = 0 消息"已接收",但尚未处理结束
  • STANDARD_ACK_TYPE = 2 "标准"类型,通常表示为消息"处理成功",broker端可以删除消息了
  • POSION_ACK_TYPE = 1 消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者DLQ(死信队列)
  • REDELIVERED_ACK_TYPE = 3 消息需"重发",比如consumer处理消息时抛出了异常,broker稍后会重新发送此消息
  • INDIVIDUAL_ACK_TYPE = 4 表示只确认"单条消息",无论在任何ACK_MODE下
  • UNMATCHED_ACK_TYPE = 5 在Topic中,如果一条消息在转发给“订阅者”时,发现此消息不符合Selector过滤条件,那么此消息将 不会转发给订阅者,消息将会被存储引擎删除(相当于在Broker上确认了消息)。

要点:
1.connection = connectionFactory.createConnection();//通过连接工厂获取连接
2.connection.start();//启动连接
3.session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建session,true表示是否事务
4.destination = session.createQueue("HelloWorld");//创建一个名称为HelloWorld的消息队列
5.messageProducer = session.createProducer(destination);//创建消息生产者
6.TextMessage message = session.createTextMessage("ActiveMQ 发送消息")//创建消息
7.messageProducer.send(message);//通过消息生产者发出消息
8.connection.close();//关闭connection


参考:
http://shift-alt-ctrl.iteye.com/blog/2034440
http://shift-alt-ctrl.iteye.com/blog/2020182

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,875评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,569评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,475评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,459评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,537评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,563评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,580评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,326评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,773评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,086评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,252评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,921评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,566评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,190评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,435评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,129评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,125评论 2 352

推荐阅读更多精彩内容