MQ 消息队列
应用程序之间的一种通信方式。
消息生产者 -> MQ -> 消息消费者
使用场景:将一些无需即时返回且耗时的操作提取出来,进行异步处理。
JMS Java Message Service
Java 平台中关于面向消息中间件的 API,即技术规范。
RabbitMQ
RabbitMQ is the most widely deployed open source message broker.
RabbitMQ 在 MAC 上的安装与启动
通过 Homebrew
来安装:brew install rabbitmq
。
安装到目录:/usr/local/Cellar/rabbitmq/3.7.7
启动 RabbitMQ 服务:sudo sbin/rabbitmq-server
默认端口 5672。
在单机上启动的时候可能会碰到问题,这里需要禁用几个插件:
.sbin/rabbitmq-plugins disable rabbitmq_stomp
.sbin/rabbitmq-plugins disable rabbitmq_mqtt
对应两个日志文件:
/usr/local/var/log/rabbitmq/rabbit@localhost.log
/usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log
管理界面:
在浏览器中输入:http://127.0.0.1:15672 默认用户名密码都为 guest
。
ActiveMQ
Apache ActiveMQ ™ is the most popular and powerful open source messaging and Integration Patterns server.
ActiveMQ 是 JMS 的一种实现。
ActiveMQ 在 MAC 上的安装与启动
下载:http://activemq.apache.org/download.html
本地解压并移动到 /usr/local
:
tar zxvf apache-activemq-5.15.4-bin.tar.gz
sudo mv apache-activemq-5.15.4 /usr/local
cd /usr/local/apache-activemq-5.15.4
cd bin/macosx
启动 ActiveMQ 服务:/activemq start
默认端口 61616。
管理界面:
在浏览器中输入:http://127.0.0.1:8161/admin/ 默认用户名密码都为 admin
。
- Queues:显示 队列 方式的消息
- Topics:显示 主题 方式的消息
ActiveMQ 消息有 3 种形式:
- JMS 公共
-
点对点方式 point-to-point:
- 即上述所说的 队列 方式
- Sender -> MQ -> Receiver
- 没有时间上的依赖,Sender 无需知道 Receiver 是否在运行
-
发布/订阅方式 publish/subscribe:
- 即上述所说的 主题 方式
- 用于多接收端
- 有时间上的依赖,一个 Subscriber 只能接受它创建以后的 Publisher 的消息
ActiveMQ 的使用
创建工厂类 ConnectionFactory
:提供 ActiveMQ Server 的连接信息。
得到 Connection
:Connection connection = connectionFactory.createConnection();
得到 Session
:Session session = connection.createSession();
发送消息:使用 MessageProducer
Destination destination = session.createQueue("mq.test");
MessageProducer producer = session.createProducer(destination);
MapMessage message = session.createMapMessage();
message.setString("name", "Tom");
producer.send(message);
session.commit();
接受消息:使用 MessageConsumer
Destination destination = session.createQueue("mq.test");
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
msg.getString("name");
}
});
Spring 集成 ActiveMQ
Spring 配置:
<!-- ActiveMQ Server 的连接 -->
<bean id="connectionFactory" class="ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<!-- 消息目标,例如上述的 mq.test -->
<bean id="destination" class="ActiveMQQueue">
<constructor-arg index="0" value="mq.test" />
</bean>
<bean id="jmsTemplate" class="JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="destination" />
<property name="messageConverter">
<bean class="SimpleMessageConverter" />
</property>
</bean>
发送消息:
jmsTemplate.send(message);
接受消息:
while(true) {
Map msg = (Map) jmsTemplate.receiveAndConvert();
msg.getString("name");
}