1.下载ActiveMQ,http://activemq.apache.org/
2.解压后结构如下
bin存放的是脚本文件
conf存放的是基本配置文件
data存放的是日志文件
docs存放的是说明文档
examples存放的是简单的实例
lib存放的是activemq所需jar包
webapps用于存放项目的目录
3.启动MQ
- 进入bin目录,./activemq start(linux)
- 进入bin目录,执行activemq start(windows)
启动后打开http://127.0.0.1:8161/admin/,可以看到MQ的控制台,用户名密码默认都是admin。
4.更改用户名密码
activemq.xml中有<import resource="jetty.xml"/>, jetty.xml 中有
<bean id="securityLoginService" class="org.eclipse.jetty.security.HashLoginService">
<property name="name" value="ActiveMQRealm" />
<property name="config" value="${activemq.conf}/jetty-realm.properties" />
</bean>
在jetty-realm.properties中可更改用户名密码。
示例
添加maven依赖
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.13.3</version>
</dependency>
P2P生产者:参考
package com.tgb.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消息的生产者(发送者)
*
*/
public class JMSProducer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
private static final int SENDNUM = 10;//发送的消息数量
public static void main(String[] args) {
ConnectionFactory connectionFactory;//连接工厂
Connection connection = null;//连接
Session session;//会话 接受或者发送消息的线程
Destination destination;//消息的目的地
MessageProducer messageProducer;//消息生产者
connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);//实例化连接工厂
try {
connection = connectionFactory.createConnection();//通过连接工厂获取连接
connection.start();//启动连接
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建session
destination = session.createQueue("HelloWorld");//创建一个名称为HelloWorld的消息队列
messageProducer = session.createProducer(destination);//创建消息生产者
sendMessage(session, messageProducer);//发送消息
session.commit();
} catch (Exception e) {
e.printStackTrace();
}finally{
if(connection != null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 发送消息
* @param session
* @param messageProducer 消息生产者
* @throws Exception
*/
public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
for (int i = 0; i < JMSProducer.SENDNUM; i++) {
TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i);//创建一条文本消息
System.out.println("发送消息:Activemq 发送消息" + i);
messageProducer.send(message);//通过消息生产者发出消息
}
}
}
P2P消费者
package testactivemq;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created by zzhblh on 2016/8/27.
*/
public class JMSConsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
public static void main(String[] args) {
ConnectionFactory connectionFactory;//连接工厂
Connection connection = null;//连接
Session session;//会话 接受或者发送消息的线程
Destination destination;//消息的目的地
MessageConsumer messageConsumer;//消息的消费者
connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);//实例化连接工厂
try {
connection = connectionFactory.createConnection();//通过连接工厂获取连接
connection.start();//启动连接
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建session
destination = session.createQueue("HelloWorld");//创建一个连接到HelloWorld的消息队列
messageConsumer = session.createConsumer(destination);//创建消息消费者
//只能收到一个
TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);//使这个messageConsumer运行500秒
if(textMessage != null){
System.out.println("收到的消息:" + textMessage.getText());
}
//收到所有
messageConsumer.setMessageListener(new Listener());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Topic 生产者
package testactivemq;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMapMessage;
import javax.jms.*;
/**
* Created by zzhblh on 2016/8/27.
*/
public class Publisher {
ConnectionFactory factory;//连接工厂
Connection connection = null;//连接
Session session;//会话 接受或者发送消息的线程
Destination[] destinations;//消息的目的地
MessageProducer producer;
public Publisher() throws JMSException {
factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
connection = factory.createConnection();
try {
connection.start();//启动连接
} catch (JMSException e) {
connection.close();
throw e;
}
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(null);
}
protected void setTopics(String[] stocks) throws JMSException {
destinations = new Destination[stocks.length];
for(int i = 0; i < stocks.length; i++) {
destinations[i] = session.createTopic("STOCKS." + stocks[i]);
}
}
protected void sendMessage(String[] stocks) throws JMSException {
for(int i = 0; i < stocks.length; i++) {
Message message = createStockMessage(stocks[i], session);
System.out.println("Sending: " + ((ActiveMQMapMessage)message).getContentMap() + " on destination: " + destinations[i]);
producer.send(destinations[i], message);
}
}
protected Message createStockMessage(String stock, Session session) throws JMSException {
MapMessage message = session.createMapMessage();
message.setString("stock", stock);
message.setDouble("price", 1.00);
message.setDouble("offer", 0.01);
message.setBoolean("up", true);
return message;
}
public void close() throws JMSException {
if (connection != null) {
connection.close();
}
}
public static void main(String[] args) throws JMSException {
// Create publisher
Publisher publisher = new Publisher();
// Set topics
String[] stocks = {"a","b","c","d","e"};
publisher.setTopics(stocks);
for(int i = 0; i < 10; i++) {
publisher.sendMessage(stocks);
System.out.println("Publisher '" + i + " price messages");
try {
Thread.sleep(1000);
} catch(InterruptedException e) {
e.printStackTrace();
}
}
// Close all resources
publisher.close();
}
}
Topic消费者
package testactivemq;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.text.DecimalFormat;
/**
* Created by zzhblh on 2016/8/27.
*/
public class Subscriber {
ConnectionFactory factory;//连接工厂
Connection connection = null;//连接
Session session;//会话 接受或者发送消息的线程
Destination destination;//消息的目的地
public Subscriber() throws JMSException {
factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public static void main(String[] args) throws JMSException {
Subscriber consumer = new Subscriber();
String[] stocks = {"a","b","c","d","e"};//找到发布的Topic
for (String stock : stocks) {
Destination destination = consumer.getSession().createTopic("STOCKS." + stock);
MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);
//只能收到一次
MapMessage map = (MapMessage) messageConsumer.receive();//使这个messageConsumer运行500秒
if(map != null){
stock = map.getString("stock");
double price = map.getDouble("price");
double offer = map.getDouble("offer");
boolean up = map.getBoolean("up");
DecimalFormat df = new DecimalFormat("#,###,###,##0.00");
System.out.println(stock + "\t" + df.format(price) + "\t" + df.format(offer) + "\t" + (up ? "up" : "down"));
}else {
break;
}
//监听收到所有
messageConsumer.setMessageListener(new Listener());
}
}
public Session getSession() {
return session;
}
}
用于监听的Listener
package testactivemq;
import org.apache.activemq.command.ActiveMQTextMessage;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.text.DecimalFormat;
/**
* Created by zzhblh on 2016/8/27.
*/
public class Listener implements MessageListener {
public void onMessage(Message message) {
try {
if(message instanceof ActiveMQTextMessage){
ActiveMQTextMessage textMessage = (ActiveMQTextMessage) message;
System.out.println("收到的消息:" + textMessage.getText());
}
if(message instanceof MapMessage){
MapMessage map = (MapMessage) message;
String stock = map.getString("stock");
double price = map.getDouble("price");
double offer = map.getDouble("offer");
boolean up = map.getBoolean("up");
DecimalFormat df = new DecimalFormat("#,###,###,##0.00");
System.out.println(stock + "\t" + df.format(price) + "\t" + df.format(offer) + "\t" + (up ? "up" : "down"));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
异步与同步接收
- 消息的异步接收:
异步接收是指当消息到达时,主动通知客户端。通过客户为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。
JMS客户端可以通过注册一个实现MessageListener接口的对象到MessageConsumer,这样,每当消息到达时,JMS Provider 会调用MessageListener中的onMessage 方法。会话(主题或队列)负责产生某些消息,这些消息被传送到使用onMessage方法的监听者那里。 - 消息的同步接收:
jms同步接受消息的功能(客户端必须请求每个消息),通过调用消费者的receive方法从目的地中显式提取消息,receive方法可以一直阻塞到消息到达。
接收消息的方法还有一个"不等待"的版本,使用这个方法时QueueReceiver对象检查是否有消息之后立即返回,将控制交还给程序。
TextMessage message = queueReceiver.receiveNoWait ()。
jms消息的确认模式
一般建议,一个事务类型的Session中只有一个Consumer,避免混乱。
public Consumer() throws JMSException {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.start();
//第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
//第一个参数为false时,第二个参数的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
//Session.AUTO_ACKNOWLEDGE为自动确认,在同步模式下(使用consumer.receive):客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功,此消息将丢失。基于异步调用时(使用Listener),消息的确认是在onMessage方法返回之后,如果onMessage方法异常,会导致消息不能被ACK,会触发重发。
// 对于consumer而言,optimizeAcknowledge属性只会在AUTO_ACK模式下有效。
//Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。需要注意的是调用acknowledge方法后,此session下所有未确认的消息将全部被确认,一般建议,一个事务类型的Session中只有一个Consumer,避免混乱。
//DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
如果使用CLIENT_ACKNOWLEDGE,则需要手动确认。否则MQ-SERVER不会删除消息,消息会被重复发送。
msg = (TextMessage) consumer.receive();
//acknowledge
msg.acknowledge();
如果第一个参数为true,则一定要session.commit()。否则MQ-SERVER不会删除消息,消息会被重复发送。当session使用事务时,在事务开启之后,和session.commit()之前,所有消费的消息,要么全部正常确认,要么全部redelivery。
session.commit();
messageConsumer.close();
session.close();
connection.close();
Broker内每条消息都有一个ACK_TYPE,它通常是一种内部机制,并不会面向开发者。ActiveMQ中定义了如下几种ACK_TYPE(参看MessageAck类):
- DELIVERED_ACK_TYPE = 0 消息"已接收",但尚未处理结束
- STANDARD_ACK_TYPE = 2 "标准"类型,通常表示为消息"处理成功",broker端可以删除消息了
- POSION_ACK_TYPE = 1 消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者DLQ(死信队列)
- REDELIVERED_ACK_TYPE = 3 消息需"重发",比如consumer处理消息时抛出了异常,broker稍后会重新发送此消息
- INDIVIDUAL_ACK_TYPE = 4 表示只确认"单条消息",无论在任何ACK_MODE下
- UNMATCHED_ACK_TYPE = 5 在Topic中,如果一条消息在转发给“订阅者”时,发现此消息不符合Selector过滤条件,那么此消息将 不会转发给订阅者,消息将会被存储引擎删除(相当于在Broker上确认了消息)。
要点:
1.connection = connectionFactory.createConnection();//通过连接工厂获取连接
2.connection.start();//启动连接
3.session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建session,true表示是否事务
4.destination = session.createQueue("HelloWorld");//创建一个名称为HelloWorld的消息队列
5.messageProducer = session.createProducer(destination);//创建消息生产者
6.TextMessage message = session.createTextMessage("ActiveMQ 发送消息")//创建消息
7.messageProducer.send(message);//通过消息生产者发出消息
8.connection.close();//关闭connection
参考:
http://shift-alt-ctrl.iteye.com/blog/2034440
http://shift-alt-ctrl.iteye.com/blog/2020182