中间件在中大型的系统中应用较为广泛,主要用来解决系统模块之间的强耦合关系;也就是说消息中间件不需要同步返回结果,也就是大家常说的削峰处理;
一、简介
- 消息中间件主要用来解决系统之间或者系统模块间通信的中间件,一般有两种模式,即:
- Queue模式
- 即PTP:Peer To Peer,点对点的消息传送,当消息的生产者把消息存储在队列中,消息的消费者就会自动从队列中获取消息;
- Topic模式
- 即Pub/Sub:发布订阅模式,类似于广播,仅仅将消息发送给在线的注册了的消费者;
- Queue模式
- 常见的消息中间件
-
ActiveMQ
- ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。我们在本次课程中介绍 ActiveMQ的使用。
-
RabbitMQ
- 采用AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。
-
ZeroMQ
- 据说是史上最快的消息队列系统
-
Kafka
- Apache下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。适合处理海量数据。
-
ActiveMQ
二、安装
- 下载,地址
- 解压,移动到想安装的目录:
tar -zxvf apache-activemq-5.12.3-bin.tar.gz
- 启动
./activemq start
- 监控页面访问
http://<host>:8161/
- 登录
- 点击:Manage ActiveMQ broker;
- 输入默认的用户名密码admin;
- 查看消息中心的内容
三、JMS-ActiveMQ Demo
- 导入pom依赖
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.14.5</version> </dependency> </dependencies>
- PTP生产者代码
package org.shreker.middleware.activemq.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @Author: ShrekerNil * @Date: 2016-10-14 20:59 * @Description: PTP的消息生产者 */ public class PTPProducer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.1.10:61616"); Connection connection = factory.createConnection(); connection.start(); // 参数1:是否启用事务;参数2:消息的确认方式 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("<queue-name>"); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 3; i++) { TextMessage message = session.createTextMessage("message--" + i); Thread.sleep(1000); //通过消息生产者发出消息 producer.send(message); } producer.close(); session.commit(); session.close(); connection.close(); } }
- PTP消费者代码
package org.shreker.middleware.activemq.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @Author: ShrekerNil * @Date: 2016-10-14 21:07 * @Description: PTP的消息消费者 */ public class PTPConsumer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.1.10:61616"); Connection connection = factory.createConnection(); connection.start(); // 参数1:是否启用事务;参数2:消息的确认方式 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("<queue-name>"); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); System.in.read(); // 暂停,让一只等待消息 consumer.close(); session.commit(); session.close(); connection.close(); } }
- Topic的生产者代码
package org.shreker.middleware.activemq.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @Author: ShrekerNil * @Date: 2016-10-14 20:59 * @Description: PubSub的消息生产者 */ public class TopicProducer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.1.10:61616"); Connection connection = factory.createConnection(); connection.start(); // 参数1:是否启用事务;参数2:消息的确认方式 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //Destination destination = session.createQueue("<queue-name>"); Topic topic = session.createTopic("<topic-name>"); // Queue和Topic至于这一步不一样 MessageProducer producer = session.createProducer(topic); for (int i = 0; i < 3; i++) { TextMessage message = session.createTextMessage("message--" + i); Thread.sleep(1000); //通过消息生产者发出消息 producer.send(message); } producer.close(); session.commit(); session.close(); connection.close(); } }
- Topic的消费者代码
package org.shreker.middleware.activemq.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @Author: ShrekerNil * @Date: 2016-10-14 21:07 * @Description: PubSub的消息消费者 */ public class TopicConsumer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.1.10:61616"); Connection connection = factory.createConnection(); connection.start(); // 参数1:是否启用事务;参数2:消息的确认方式 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //Destination destination = session.createQueue("<queue-name>"); Topic topic = session.createTopic("<topic-name>"); // Queue和Topic至于这一步不一样 MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); System.in.read(); // 暂停,让一只等待消息 consumer.close(); session.commit(); session.close(); connection.close(); } }
四、SpringJMS
- 导入POM依赖
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.1.2.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>5.1.2.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.14.5</version> </dependency>
- 编写代码
- 这里就不要把代码直接贴在这里了,代码超级简单,我把代码放在码云上了;