ActiveMQ 消息队列
1.消息队列的产生原因:
对于用户体验有很大的提升。如用户注册时,要提示注册成功还要发手机信息、发邮件通知,让用户知道自己操作成功的良好体验。但是这些发手机消息等操作需要程序运作,需要时间,导致程序变慢。所以需要消息队列来管理发手机消息等操作,而不会影响提示注册成功的运行效率。
2.ActiveMQ运行原理:
当有一个消息发过来时,将其保存在ActiveMQ中,然后通过JAVA有的JMS消息服务来监听ActiveMQ,有消息过来就执行这个消息的业务程序。是通过异步来实现的,来保证正常的业务正常执行。
3.ActiveMQ的应用场景:
不影响主业务的一些优化,提高用户的体验的业务。如用户注册时,如果要输入验证码的业务就不能加入消息队列中。因为这个已经是会影响主业务的流程。
4.ActiveMQ的两种模式:
p2p(Queue):单对单的操作。对时间依赖性高(MQ队列中的消息不管是什么时候加入进去的,JMS的监听都会消费执行掉) 如MQ中有一个消息队列,而JMS中有个监听队列一直看MQ中的消息队列有没有消息,有消息就执行掉这个消息的业务,成功后返回给MQ结果,让MQ将其消息删掉。
PB 发布订阅(Topic):多对多的操作,对时间依赖性低。如多个用户订阅一个书籍的话题时,MQ中书籍话题的多个消息是要推送给多个用户的。但是只会推送订阅后的消息,而订阅前的消息不会推送。
5.ActiveMQ类似的主流技术:
Kafka:国外应用的技术,用zookeeper进行服务注册。
RocketMQ:阿里巴巴仿照kafka制作出来的,主要是使用nameSrv简化了服务注册。
6.ActiveMQ的安装下载和使用:
文件是解压版不需要安装,下载路径:http://activemq.apache.org
使用是执行运行解压文件bin目录下的win64下的activemq.bat文件。其中端口8161是用来查看消息的。账号/密码默认为admin/admin
7.JAVA代码实现ActiveMQ:
生产消息的类如下:
package com.fenggf.MQ;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MQMain {
public static void main(String[] args) throws JMSException {
//创建连接工厂 JMS 用它创建连接
ConnectionFactory connectFac = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
//JMS 客户端到JMS Provider 的连接
Connection conn = connectFac.createConnection();
conn.start();
//参数 是否开启事务模式,签收模式,默认是自动模式
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//获取session注册参数值my-queue的名字
//Destination 消息的目的地,消息发送给谁
Destination destina = session.createQueue("fgf-queue");
//MessageProducer :消息生产者
MessageProducer producer = session.createProducer(destina);
//设置不持久
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//发送一条消息
sendMSG(session,producer,"你好 消息队列 3333");
//关闭连接
conn.close();
}
private static void sendMSG(Session session, MessageProducer producer,
String msg) throws JMSException {
//创建一条文本消息
TextMessage message = session.createTextMessage(msg);
//通过消息生产者发出消息
producer.send(message);
}
}
监听消息的消费类代码如下:
package com.fenggf.MQ;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JMSMain {
public static void main(String[] args) throws JMSException {
//创建工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER
, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
//JMS 客户端到JMS provider 的连接
Connection connection = connectionFactory.createConnection();
connection.start();
//Session : 发送或者接收消息的线程
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//消息的目的地,消息发送给谁
//获取服务器上同名的消息队列
Destination destina = session.createQueue("fgf-queue");
//消息接收者
MessageConsumer consumer = session.createConsumer(destina);
while(true){
TextMessage message = (TextMessage)consumer.receive();
if(null != message){
System.out.println("收到消息为: "+message.getText());
//session.commit();
}else
break;
}
session.close();
connection.close();
}
}