消息中间件(消息队列)是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题实现高性能,高可用,可伸缩和最终一致性[架构] 使用较多的消息
队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ以下介绍消息队列在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景。
安装
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现,是运行在 Erlang 语言环境下的,所以需要先安装 下载并安装 Eralng,在安装 rabbitMQ
1.安装Erlang
2.安装RabbitMQ
3.安装管理界面(插件)
进入rabbitMQ安装目录的sbin目录,输入命令
rabbitmq‐plugins enable rabbitmq_management
4.重新启动服务
5.打开浏览器,地址栏输入http://127.0.0.1:15672 , 即可看到管理界面的登陆页,输入用户名和密码,都为 guest 进入主界面。
spring中使用
打包
<!-- rabbitMQ消息中间件 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.4.RELEASE</version>
</dependency>
创建消息生产者applicationContext-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--连接工厂-->
<rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest"/>
<!--指定连接工厂-->
<rabbit:admin connection-factory="connectionFactory"></rabbit:admin>
<!--创建队列-->
<rabbit:queue name="queue.sms"></rabbit:queue>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" />
</beans>
再次创建消息消费者applicationContext-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--连接工厂-->
<rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest" />
<!--队列-->
<rabbit:queue name="queue.sms" />
<!--消费者监听类-->
<bean id="messageConsumer" class="cn.qingcheng.consumer.SmsMessageConsumer"></bean>
<!--设置监听容器-->
<rabbit:listener-container connection-factory="connectionFactory" >
<!-- 关联队列与监听类 -->
<rabbit:listener queue-names="queue.sms" ref="messageConsumer"/>
</rabbit:listener-container>
</beans>
创建一个监听类cn.qingcheng.consumer.SmsMessageConsumer
package cn.qingcheng.consumer;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import java.util.HashMap;
import java.util.Map;
/**
* rabbitMQ 监听类
*/
public class SmsMessageConsumer implements MessageListener {
@Autowired
private SmsUtil smsUtil;
@Value("${smsCode}")
private String smsCode;
@Value("${param}")
private String param;// {"code":"value"}
/**
* 获取消息队列中的消息
* @param message
*/
@Override
public void onMessage(Message message) {
// 接收 rabbitTemplate.convertAndSend() 发送到 rabbitMQ 的消息
byte[] body = message.getBody();
String jsonString = new String(body);
Map<String,String> map = JSON.parseObject(jsonString, Map.class);
// 手机号
String phone = map.get("phone");
// 验证码
String code = map.get("code");
// 阿里云短信发送
String code1 = String.format("{\"code\":\"%s\"}",123);
/*
JSONObject object = new JSONObject();
object.put("code",456);
String code = object.toString();
Map<String, Object> map = new HashMap<>();
map.put("code",789);
String code = JSON.toJSON(map).toString();
*/
smsUtil.sendSms(phone,smsCode,code1);
}
}