RabbitMQ 官方入门教程:http://www.rabbitmq.com/getstarted.html
RabbitMQ是一个消息代理:它接收并转发消息。你可以把它想象成一个邮局:当你把信件投进邮箱时,可以确信邮递员最终会把它送到收件人手中。
在这个比喻中,RabbitMQ既是邮箱,也是邮局和邮递员。
RabbitMQ与邮局的主要区别在于:它不处理纸质信件,而是接收、存储并转发二进制数据块,也就是消息。
在官方教程的示意图中,P代表生产者,C代表消费者,中间的方框是队列。
package com.wiwj.passport.util.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.wiwj.passport.config.PassportConfig;
/**
 * Minimal RabbitMQ helper built directly on the {@code amqp-client} API.
 *
 * <p>Host, virtual host, credentials, queue name and exchange name are all read
 * from {@link PassportConfig}; the port is fixed to 5672. Each call to
 * {@link #send(String)} or {@link #Receive()} opens its own connection.
 *
 * @author 杨磊
 * @date 2017-09-11 14:40:21
 * @version 1.0
 */
public class MQ {

    /** Charset name used for both encoding and decoding message bodies. */
    private static final String CHARSET = "UTF-8";

    /** Shared factory, configured once when the class is loaded. */
    private static final ConnectionFactory factory = new ConnectionFactory();

    static {
        factory.setHost(PassportConfig.getString("mq_ip"));
        factory.setVirtualHost(PassportConfig.getString("mq_virtualhost"));
        factory.setUsername(PassportConfig.getString("mq_name"));
        factory.setPassword(PassportConfig.getString("mq_password"));
        factory.setPort(5672);
    }

    /**
     * Returns the shared, pre-configured connection factory.
     *
     * @return the connection factory used by this class
     */
    public static ConnectionFactory getfactory() {
        return factory;
    }

    /**
     * Publishes one UTF-8 encoded text message to the configured queue.
     *
     * <p>The queue is first bound to the configured exchange with an empty routing
     * key; the message itself is published through the default exchange ({@code ""})
     * using the queue name as routing key.
     *
     * @param message text to publish
     * @throws Exception if connecting, binding, publishing or closing fails
     */
    public static void send(String message) throws Exception {
        String queueName = PassportConfig.getString("mq_queue_name");
        Connection connection = getfactory().newConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueBind(queueName, PassportConfig.getString("mq_exchange_name"), "");
            channel.basicPublish("", queueName, null, message.getBytes(CHARSET));
            System.out.println("发送到MQ的消息为 +'" + message + "'");
            channel.close();
        } catch (Exception e) {
            // Release the connection (and its channels) without letting a secondary
            // close failure hide the original error.
            connection.abort();
            throw e;
        }
        connection.close();
    }

    /**
     * Consumes messages from the configured queue forever, printing each one.
     *
     * <p>Messages are consumed with automatic acknowledgement and decoded as UTF-8,
     * matching the encoding used by {@link #send(String)}. The method only returns
     * by throwing; the connection is released in that case.
     *
     * @throws Exception if connecting, binding or consuming fails, or if the
     *                   waiting thread is interrupted
     */
    public static void Receive() throws Exception {
        String queueName = PassportConfig.getString("mq_queue_name");
        Connection connection = getfactory().newConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueBind(queueName, PassportConfig.getString("mq_exchange_name"), "");
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, consumer);
            while (true) {
                // Blocks until the next message arrives.
                Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody(), CHARSET);
                System.out.println("接收消息为[" + msg + "] 来自于队列: " + queueName);
            }
        } finally {
            // Only reached when the loop is left through an exception.
            connection.abort();
        }
    }

    /**
     * Manual smoke test: starts the blocking receive loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            //send("@#$%&&^^^^**!@@s上搜索谁谁谁水水水水水水水水水水水水1");
            Receive();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Spring 整合 RabbitMQ
applicationContext-mq.xml:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
<!-- Alternative file header kept for reference; it targets the Spring 4.0 / spring-rabbit 1.4 schemas instead of the 3.0 / 1.2 ones used above. -->
<!--<?xml version="1.0" encoding="UTF-8"?>-->
<!--<beans xmlns="http://www.springframework.org/schema/beans"-->
<!--xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"-->
<!--xmlns:rabbit="http://www.springframework.org/schema/rabbit"-->
<!--xsi:schemaLocation="http://www.springframework.org/schema/beans-->
<!--http://www.springframework.org/schema/beans/spring-beans-4.0.xsd-->
<!--http://www.springframework.org/schema/rabbit-->
<!--http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd" >-->
<description>rabbitmq 连接服务配置</description>
<!-- Connection settings. The ${mq.*} placeholders are not defined in this file; a property placeholder must be configured in the same application context. -->
<rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}" virtual-host="${mq.vhost}"/>
<!-- Admin bean bound to the connection factory above. -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- Spring AMQP template: publishes to the "amqpExchange" exchange and converts payloads with jsonMessageConverter. -->
<rabbit:template exchange="amqpExchange" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
<!-- JSON message converter used by the template. -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<!-- Declares a message queue; other elements refer to it by its id "test_queue_key", the broker-side name is "queue_house_bj". -->
<rabbit:queue id="test_queue_key" name="queue_house_bj" durable="true" auto-delete="false" exclusive="false" />
<!-- Defines the fanout exchange (auto-declared) and binds the queue above to it. -->
<!-- NOTE(review): the exchange is non-durable while the queue bound to it is durable; confirm this mismatch is intended. -->
<rabbit:fanout-exchange name="amqpExchange" auto-declare="true" durable="false">
<rabbit:bindings>
<rabbit:binding queue="test_queue_key"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- Message listener bean; the id "queueListenter" (sic) is referenced by the listener container below. -->
<bean id = "queueListenter" class="com.Consumer"></bean>
<!-- Disabled transaction manager definition, kept for reference. -->
<!--<bean id="rabbitTxManage" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">-->
<!--<property name="connectionFactory" ref="cachingConnectionFactory"></property>-->
<!--</bean>-->
<!-- Listener configuration: acknowledge mode is "manual", so the listener has to acknowledge each message itself. -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<rabbit:listener queues="test_queue_key" ref="queueListenter"/>
</rabbit:listener-container>
</beans>
rabbitmq-config.properties
# RabbitMQ connection settings; these keys match the ${mq.*} placeholders used by the Spring connection factory.
# Broker host address.
mq.host=10.1.7.255
# Login credentials (sample values).
mq.username=admin
mq.password=admin
# Broker port.
mq.port=5672
# Virtual host to connect to.
mq.vhost=centerMQ
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<!-- Loads rabbitmq-config.properties from the classpath so that ${...} placeholders can be resolved. -->
<!-- NOTE(review): this file does not import applicationContext-mq.xml; presumably both files are loaded into the same context elsewhere (confirm). -->
<context:property-placeholder location="classpath*:rabbitmq-config.properties"/>
<!-- Producer bean. -->
<!-- NOTE(review): com.ProducerImpl relies on @Autowired, but no context:annotation-config or component scan is visible in this file; confirm annotation processing is enabled elsewhere. -->
<bean id = "producer" class="com.ProducerImpl"></bean>
</beans>
生产者:
ProducerImpl
package com;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * {@link Producer} implementation that publishes objects through the injected
 * {@link AmqpTemplate}.
 */
public class ProducerImpl implements Producer {

    private static final Logger logger = LoggerFactory.getLogger(ProducerImpl.class);

    /** Template used for publishing; injected by Spring. */
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Converts {@code object} to a message and sends it via the template, using
     * {@code queueKey} as the routing key.
     *
     * <p>Sending is best-effort: failures are logged and not propagated to the caller.
     *
     * @param queueKey routing key passed to the template
     * @param object   payload to convert and send
     */
    public void sendDataToQueue(String queueKey, Object object) {
        try {
            amqpTemplate.convertAndSend(queueKey, object);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace and cause chain are kept.
            logger.error("Failed to send message with routing key '{}'", queueKey, e);
        }
    }
}
消费者:
Consumer.java
package com;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
/**
 * Channel-aware listener that prints every received message body and then
 * acknowledges the message on the channel it arrived on.
 */
public class Consumer implements ChannelAwareMessageListener {

    /**
     * Prints a marker line followed by the UTF-8 decoded body, then acknowledges
     * this single delivery.
     *
     * @param message the received message
     * @param channel the channel the message was delivered on
     * @throws Exception if decoding or acknowledging fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("============================1231231");

        final String payload = new String(message.getBody(), "UTF-8");
        System.out.println(payload);

        // multiple = false: acknowledge only this delivery tag.
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false);
    }
}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bin.spring-amqp</groupId>
<artifactId>spring-amqp-test</artifactId>
<packaging>war</packaging>
<version>1.0-SNAPSHOT</version>
<name>spring-amqp Maven Webapp</name>
<url>http://maven.apache.org</url>
<properties>
<!-- Spring Framework version shared by all org.springframework dependencies below. -->
<spring.framework.version>4.2.5.RELEASE</spring.framework.version>
</properties>
<dependencies>
<!-- Unit testing. NOTE(review): no test scope is set, so junit ends up on the compile classpath; confirm that is intended. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
<!-- SLF4J binding to log4j 1.2. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<!-- Spring AMQP RabbitMQ support. -->
<!-- NOTE(review): the sample configuration uses Jackson2JsonMessageConverter, but no Jackson artifact is declared in this pom; verify that jackson-databind is available at runtime. -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.3.RELEASE</version>
</dependency>
<!-- Spring Framework modules, all pinned to ${spring.framework.version}. -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.framework.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.framework.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.framework.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.framework.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.framework.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.framework.version}</version>
</dependency>
</dependencies>
<repositories>
<!-- Spring snapshot repository with snapshot resolution enabled. -->
<!-- NOTE(review): every dependency declared in this pom uses a release version; confirm this repository is still needed. -->
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<build>
<finalName>spring-amqp</finalName>
<resources>
<!-- XML files that live next to the Java sources are packaged as resources, with property filtering enabled. -->
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.xml</include>
</includes>
<filtering>true</filtering>
</resource>
<!-- Regular resource directory, copied without filtering. -->
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
<plugins>
<!-- Compile for Java 1.7 source and target level. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<!-- Test: skipTests is true, so tests are not run during the build. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<!-- Jetty plugin -->
<!-- Added so that Jetty can be started from IntelliJ IDEA. -->
<plugin>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-maven-plugin</artifactId>
<version>9.2.21.v20170120</version>
<configuration>
<httpConnector>
<port>8082</port><!-- HTTP port Jetty listens on -->
</httpConnector>
<stopPort>9967</stopPort>
<stopKey>stop</stopKey>
<scanIntervalSeconds>10</scanIntervalSeconds>
<webApp>
<contextPath>/amqp</contextPath>
</webApp>
<!-- For Jetty during the test phase: sets the log output directory via catalina.base; under Tomcat logs go to tomcat/logs. -->
<systemProperties>
<systemProperty>
<name>catalina.base</name>
<value>.</value>
</systemProperty>
</systemProperties>
</configuration>
</plugin>
</plugins>
</build>
</project>