RabbitMQ:Spring整合RabbitMQ

13F2925356653218AFBD71C70F752C7C.jpg

在前面的几篇博客里面已经把RabbitMQ的一些理论详细了说明了,在这一篇中将记录下Spring整合RabbitMQ,本文只是简单一个整合介绍,属于抛砖引玉,具体实现还需大家深入研究哈..

代码我会上传到我的码云上,如需下载请在文章的末尾寻找下载地址

1、POM引入

<!-- RabbitMQ -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.5.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.4.5.RELEASE</version>
</dependency>

2、RabbitMQ配置信息

添加rabbitmq.properties配置文件

rabbit.hosts=127.0.0.1
rabbit.username=hrabbit
rabbit.password=123
rabbit.port=5672
rabbit.virtualHost=/hrabbit
# 统一XML配置中易变部分的命名
rabbit.queue=rabbitmq_test

3、添加FastJson转化类

spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现

package www.hrabbit.cn.configer;


import com.alibaba.fastjson.JSON;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.*;

import java.io.IOException;
import java.io.UnsupportedEncodingException;


/**
 * @Auther: hrabbit
 * @Date: 2018-07-02 下午6:35
 * @Description:
 */


public class FastJsonMessageConverter  extends AbstractJsonMessageConverter {

    private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);

    private static ClassMapper classMapper = new DefaultClassMapper();

    public FastJsonMessageConverter() {
        super();
    }

    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
        byte[] bytes = null;
        try {
            String jsonString = JSON.toJSONString(object);
            bytes = jsonString.getBytes(getDefaultCharset());
        } catch (IOException e) {
            throw new MessageConversionException("Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(getDefaultCharset());
        if (bytes != null) {
            messageProperties.setContentLength(bytes.length);
        }
        classMapper.fromClass(object.getClass(), messageProperties);
        return new Message(bytes, messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        Object content = null;
        MessageProperties properties = message.getMessageProperties();
        if (properties != null) {
            String contentType = properties.getContentType();
            if (contentType != null && contentType.contains("json")) {
                String encoding = properties.getContentEncoding();
                if (encoding == null) {
                    encoding = getDefaultCharset();
                }
                try {
                    Class<?> targetClass = getClassMapper().toClass(message.getMessageProperties());
                    content = convertBytesToObject(message.getBody(), encoding, targetClass);
                } catch (IOException e) {
                    throw new MessageConversionException("Failed to convert Message content", e);
                }
            } else {
                log.warn("Could not convert incoming message with content-type [" + contentType + "]");
            }
        }
        if (content == null) {
            content = message.getBody();
        }
        return content;
    }

    private Object convertBytesToObject(byte[] body, String encoding, Class<?> clazz)
            throws UnsupportedEncodingException {
        String contentAsString = new String(body, encoding);
        return JSON.parseObject(contentAsString, clazz);
    }
}

4、添加amqp-application.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >

    <description>rabbitmq 连接服务配置</description>

    <!-- 连接配置 -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbit.hosts}" username="${rabbit.username}" password="${rabbit.password}" port="${rabbit.port}"  virtual-host="${rabbit.virtualHost}"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
    <bean id="jsonMessageConverter"  class="www.hrabbit.cn.util.FastJsonMessageConverter"></bean>

    <!-- spring template声明-->
    <rabbit:template exchange="koms" id="amqpTemplate"  connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>

    <!--
        durable:是否持久化

        exclusive: 仅创建者可以使用的私有队列,断开后自动删除

        auto_delete: 当所有消费客户端连接断开后,是否自动删除队列
     -->

    <!--  申明一个消息队列Queue   -->
    <rabbit:queue id="order" name="order" durable="true" auto-delete="false" exclusive="false" />
    <!--
     rabbit:direct-exchange:定义exchange模式为direct,意思就是消息与一个特定的路由键完全匹配,才会转发。

    rabbit:binding:设置消息queue匹配的key
     -->
    <!-- 交换机定义 -->
    <rabbit:direct-exchange name="koms" durable="true" auto-delete="false" id="koms">
        <rabbit:bindings>
            <rabbit:binding queue="order" key="order"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!--
         queues:监听的队列,多个的话用逗号(,)分隔

        ref:监听器
     -->
    <bean class="www.hrabbit.cn.rabbitMq.listener.MessageListener" id="messageListener"></bean>
    <!-- 配置监听  acknowledeg = "manual"   设置手动应答  当消息处理失败时:会一直重发  直到消息处理成功 -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
        <!-- 配置监听器 -->
        <rabbit:listener queues="order" ref="messageListener"/>
    </rabbit:listener-container>
</beans>

在这个项目中我的生产者和消费者是放到同一个项目中的。项目中的监听器,即为消费者。

5、生产者

注入AmqpTemplate模板,调用convertAndSend ();方法添加消息;

package www.hrabbit.cn.rabbitMq.service.impl;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;
import www.hrabbit.cn.rabbitMq.service.SpittleService;

import javax.annotation.Resource;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * @Auther: hrabbit
 * @Date: 2018-07-02 下午4:26
 * @Description:
 */
@Service("spittleService")
public class SpittleServiceImpl implements SpittleService {

    @Resource
    private AmqpTemplate amqpTemplate;

    /**
     * 生产消息
     * @return
     */
    public Map<String,Object> spittleMsg(){

        Map<String,Object> dataList = new LinkedHashMap<>();

        for (int i=0;i<10;i++){
            dataList.put("order","msgResult:"+i);
            amqpTemplate.convertAndSend("order","msgResult:"+i);
        }
        return dataList;
    }

}

6、添加监听器(即消费者)

package www.hrabbit.cn.rabbitMq.listener;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;

/**
 * @Auther: hrabbit
 * @Date: 2018-07-02 下午4:47
 * @Description:
 */
@Component
public class MessageListener implements ChannelAwareMessageListener {

    private Logger logger= LoggerFactory.getLogger(MessageListener.class);

    @Transactional
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //业务处理,放到action层,并返回处理成功还是异常的flag
        //boolean mqFlag=rabbitMaConsumerTaskAction.saveMq(arg0);
        //还有一个点就是如何获取mq消息的报文部分message?
        String result=new String(message.getBody(),"UTF-8");
        System.out.println("消息:"+result);
        if(true){
            basicACK(message,channel);//处理正常--ack
        }else{
            basicNACK(message,channel);//处理异常--nack
        }
    }


    //正常消费掉后通知mq服务器移除此条mq
    private void basicACK(Message message,Channel channel){
        try{
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch(IOException e){
            logger.error("通知服务器移除mq时异常,异常信息:"+e);
        }
    }
    //处理异常,mq重回队列
    private void basicNACK(Message message,Channel channel) {
        try {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        } catch (IOException e) {
            logger.error("mq重新进入服务器时出现异常,异常信息:" + e);
        }
    }
}

7、启动项目,测试

访问地址:http://localhost:8080/amqp/spittleMsg生产了10条消息,此时查看控制台10条消息都被消费了!

image.png

项目地址:https://gitee.com/hrabbit/spring-rabbitMQ

系列文章:

RabbitMQ:RabbitMQ-理论基础
RabbitMQ:RabbitMQ:快速入门hello word
RabbitMQ:RabbitMQ:work queues 工作队列(Round-robin/Fair dispatch)
RabbitMQ:RabbitMQ:消息应答与消息持久化
RabbitMQ:发布/订阅 Publish/Subscribe
RabbitMQ:路由Routing
RabbitMQ:Topic类型的exchange
RabbitMQ:RabbitMQ之消息确认机制(事务+Confirm)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,904评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,581评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,527评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,463评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,546评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,572评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,582评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,330评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,776评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,087评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,257评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,923评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,571评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,192评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,436评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,145评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容