RabbitMQ基本应用(java)API(二)

rabbitmq官方文档:http://www.rabbitmq.com/getstarted.html

RabbitMQ是一个消息代理:它接受和转发消息。你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。
在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员
RabbitMQ与邮局的主要区别是它不处理纸张,而是接受,存储和转发数据消息的二进制数据块。


图片.png

P是我们的生产者,C是我们的消费者。中间的框是一个队列

package com.wiwj.passport.util.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.wiwj.passport.config.PassportConfig;

/**
 * <p>Title: MQ.java</p>
 *
 * <p>Package: com.wiwj.passport.util.mq</p>
 * 
 * <p>Description: mq</p>
 * 
 * @author: 杨磊
 * 
 * @date: 2017年9月11日 下午2:40:21
 *
 * @version: 1.0
 */
public class MQ {
    private  static  ConnectionFactory factory =  null;  
    static{
        if(factory==null){
              factory=new ConnectionFactory(); //创建连接工厂
              factory.setHost(PassportConfig.getString("mq_ip"));  //ip
              factory.setVirtualHost(PassportConfig.getString("mq_virtualhost"));  //虚拟机
              factory.setUsername(PassportConfig.getString("mq_name"));         
              factory.setPassword(PassportConfig.getString("mq_password"));
              factory.setPort(5672);    
        }

    }
    
    public static ConnectionFactory getfactory() {
        return factory;

    }
    
    public static void send(String message)throws  Exception{
        //创建一个新的连接
        Connection connection = getfactory().newConnection();
        //创建一个通道
        Channel channel = connection.createChannel();
        // 绑定一个队列
        channel.queueBind(PassportConfig.getString("mq_queue_name"), PassportConfig.getString("mq_exchange_name"), "");
        //发送消息到队列中
        channel.basicPublish("", PassportConfig.getString("mq_queue_name"), null, message.getBytes("UTF-8"));
        System.out.println("发送到MQ的消息为 +'" + message + "'");
        //关闭通道和连接
        channel.close();
        connection.close();             
    }

    public static void Receive()throws Exception{
        //创建一个新的连接
        Connection connection = getfactory().newConnection();
        //创建一个通道
        Channel channel = connection.createChannel();
        // 绑定一个队列
        channel.queueBind(PassportConfig.getString("mq_queue_name"), PassportConfig.getString("mq_exchange_name"), "");     
        QueueingConsumer consumer = new QueueingConsumer(channel) ;  
        channel.basicConsume(PassportConfig.getString("mq_queue_name"), true, consumer) ;      
        //循环获取消息  
        while(true){           
            //获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery() ;              
            String msg = new String(delivery.getBody()) ;                 
            System.out.println("接收消息为[" + msg + "] 来自于队列: " + PassportConfig.getString("mq_queue_name"));  
        }
        
    }
    
     public static void main(String[] args) {
            try {
            //send("@#$%&&^^^^**!@@s上搜索谁谁谁水水水水水水水水水水水水1");
            Receive();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
}

Spring 整合rabbitmq
applicationContext-mq.xml:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
<!--<?xml version="1.0" encoding="UTF-8"?>-->
<!--<beans xmlns="http://www.springframework.org/schema/beans"-->
       <!--xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"-->
       <!--xmlns:rabbit="http://www.springframework.org/schema/rabbit"-->
       <!--xsi:schemaLocation="http://www.springframework.org/schema/beans-->
    <!--http://www.springframework.org/schema/beans/spring-beans-4.0.xsd-->
    <!--http://www.springframework.org/schema/rabbit-->
    <!--http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd" >-->

    <description>rabbitmq 连接服务配置</description>
    <!-- 连接配置 -->
    <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}"  virtual-host="${mq.vhost}"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- spring template声明-->
    <rabbit:template exchange="amqpExchange" id="amqpTemplate"  connection-factory="connectionFactory"  message-converter="jsonMessageConverter" />

    <!-- 消息对象json转换类 -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!--申明一个消息队列Queue-->
    <rabbit:queue id="test_queue_key" name="queue_house_bj" durable="true" auto-delete="false" exclusive="false" />

    <!-- 定义交换器,自动声明 -->
    <rabbit:fanout-exchange name="amqpExchange" auto-declare="true" durable="false">
        <rabbit:bindings>
            <rabbit:binding queue="test_queue_key"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <bean id = "queueListenter" class="com.Consumer"></bean>

    <!--<bean id="rabbitTxManage" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">-->
        <!--<property name="connectionFactory" ref="cachingConnectionFactory"></property>-->
    <!--</bean>-->

    <!--监听配置-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
        <rabbit:listener queues="test_queue_key" ref="queueListenter"/>
    </rabbit:listener-container>
</beans>

rabbitmq-config.properties

mq.host=10.1.7.255
mq.username=admin
mq.password=admin
mq.port=5672
mq.vhost=centerMQ

applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
                        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    <!-- ========================================spring注解扫描包路径=============================================== -->
    <!-- ========================================数据库连接参数文件===================================================== -->
    <context:property-placeholder location="classpath*:rabbitmq-config.properties"/>

    <bean id = "producer" class="com.ProducerImpl"></bean>

</beans>

生产者:
ProducerImpl

package com;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;


public class ProducerImpl implements Producer {
    private static final Logger logger = LoggerFactory.getLogger(ProducerImpl.class);
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendDataToQueue(String queueKey, Object object) {
        try {
            amqpTemplate.convertAndSend(queueKey, object);
//            amqpTemplate.convertAndSend(object);
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
    }
}


消费者:
Consumer.java

package com;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

/**
 * 
 */
public class Consumer implements ChannelAwareMessageListener{
//    public void onMessage(Message message) {
//        try {
//            System.out.println("============================1231231");
////            System.out.println(new String(message.getBody(), "UTF-8"));
//            System.out.println(message.toString());
//
//        }catch (Exception e){
//
//        }
//
//    }

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("============================1231231");
        System.out.println(new String(message.getBody(), "UTF-8"));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.bin.spring-amqp</groupId>
  <artifactId>spring-amqp-test</artifactId>
  <packaging>war</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>spring-amqp Maven Webapp</name>
  <url>http://maven.apache.org</url>

  <properties>
    <!-- spring版本号 -->
    <spring.framework.version>4.2.5.RELEASE</spring.framework.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
    </dependency>

    <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-rabbit</artifactId>
      <version>1.7.3.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>${spring.framework.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>${spring.framework.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-beans</artifactId>
      <version>${spring.framework.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-test</artifactId>
      <version>${spring.framework.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-web</artifactId>
      <version>${spring.framework.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-tx</artifactId>
      <version>${spring.framework.version}</version>
    </dependency>

  </dependencies>

  <repositories>
    <repository>
      <id>spring-snapshots</id>
      <name>Spring Snapshots</name>
      <url>https://repo.spring.io/libs-snapshot</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
    <finalName>spring-amqp</finalName>

    <resources>
      <resource>
        <directory>src/main/java</directory>
        <includes>
          <include>**/*.xml</include>
        </includes>
        <filtering>true</filtering>
      </resource>
      <resource>
        <directory>src/main/resources</directory>
      </resource>
    </resources>

    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>

      <!-- Test -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.18.1</version>
        <configuration>
          <skipTests>true</skipTests>
        </configuration>
      </plugin>


      <!-- jetty插件 -->
      <!-- Idea 添加此插件以便启动jetty -->
      <plugin>
        <groupId>org.eclipse.jetty</groupId>
        <artifactId>jetty-maven-plugin</artifactId>
        <version>9.2.21.v20170120</version>
        <configuration>
          <httpConnector>
            <port>8082</port><!-- 启动端口号-->
          </httpConnector>
          <stopPort>9967</stopPort>
          <stopKey>stop</stopKey>
          <scanIntervalSeconds>10</scanIntervalSeconds>
          <webApp>
            <contextPath>/amqp</contextPath>
          </webApp>
          <!-- 适应测试阶段的jetty,日志写入目录,tomcat写入tomcat/logs下 -->
          <systemProperties>
            <systemProperty>
              <name>catalina.base</name>
              <value>.</value>
            </systemProperty>
          </systemProperties>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,590评论 18 139
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,343评论 2 34
  • 关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时...
    预流阅读 584,381评论 51 785
  • 注:这份文档是我和几个朋友学习后一起完成的。 目录 RabbitMQ 概念 exchange交换机机制什么是交换机...
    Mooner_guo阅读 32,911评论 8 97
  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 1,335评论 0 1