2022最新 Rabbitmq 单消费端和单发布端代码

基于默认的direct(即路由)模式

yml的rabbitmq配置


image.png

引入maven依赖


image.png

1.自定义消息类

package com.zkmeiling.biz.rabbitmq;

import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.Date;
import java.util.List;

/**
 * 箱任务博
 *
 * @author WeiWei
 * @date 2022/10/17
 */
@NoArgsConstructor
@Data
public class BoxTaskBo  {
    private String request;
    private Date time;
    private Date data;


}


2.开启手动ack模式并准备相关的处理回调函数
2.1---ConfirmCallback 回调函数类( 发布端----send消息--> broker[由交换机exchang和队列queue组成 {只要进入exchange就触发ConfirmCllback }] )

package com.zkmeiling.biz.rabbitmq;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {


    /**
     * Confirmation callback.
     *
     * @param correlationData correlation data for the callback.
     * @param ack             true for ack, false for nack
     * @param cause           An optional cause, for nack, when available, otherwise null.
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.error("bbk样本库消息发送异常!");
        } else {
            log.info("bbk样本库发送者爸爸已经收到确认,correlationData={} ,ack={}, cause={}", correlationData, ack, cause);
        }
    }
}

2.2---ReturnsCallback 回调函数类( 发布端----send消息--> broker[由交换机exchang和队列queue组成 {进入exchange 但未投递到queqe就触发ReturnsCallback }] )

package com.zkmeiling.biz.rabbitmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback{

    /**
     * Returned message callback.
     *
     * @param returned the returned message and metadata.
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("bbk样本库returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", returned.getReplyCode(), returned.getReplyText(), returned.getExchange(), returned.getRoutingKey());
    }
}

3.发布端配置
下图是发布端服务(简单)相关目录结构

image.png

3.1发布端(生产端配置代码)

package com.zkmeiling.biz.rabbitmq;


import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//声明交换机和需要绑定的队列(这部分也可单独拿出做一个rabbitmq配置供应端)
@Configuration
public class RabbitmqQueueConfig {


    /**
     * 声明样本库管理系统->工控系统队列 支持持久化
     */
    @Bean(name = "bbkToIpcQueue")
    public Queue BbkToIpcQueue() {
        return QueueBuilder.durable("zkmeiling.bbk.to.ipc").build();
    }

    //声明交换机
    @Bean(name = "directExchange")
    public DirectExchange directExchange() {
        return ExchangeBuilder.directExchange("zkmeiling.direct").durable(true).build();
    }

    //声明交换机和队列的绑定关系
    @Bean
    public Binding bbkToIpcBinding(
            @Qualifier("directExchange") DirectExchange directExchange,
            @Qualifier("bbkToIpcQueue") Queue bbkToIpcQueue) {
        return BindingBuilder.bind(bbkToIpcQueue).to(directExchange).with("bbk2ipc_routing_key");
    }

}

3.2---发布端具体发送消息代码

package com.zkmeiling.biz.rabbitmq;


import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.UUID;

@RestController
public class PublishController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ConfirmCallbackService confirmCallbackService;

    @Autowired
    private ReturnCallbackService returnCallbackService;



    @PostMapping("frozenBoxPutInStore")
    public void frozenBoxPutInStore(@RequestBody BoxTaskBo boxs){
        sendMessage("zkmeiling.direct","bbk2ipc_routing_key",boxs);
    }


    public void sendMessage(String exchange, String routingKey, BoxTaskBo msg) {

//        /**
//         * 确保消息发送失败后可以重新返回到队列中
//         * 注意:yml需要配置 publisher-returns: true
//         */
//        rabbitTemplate.setMandatory(true);

        /**
         * 消费者确认收到消息后,手动ack回执回调处理
         */
        rabbitTemplate.setConfirmCallback(confirmCallbackService);

        /**
         * 消息投递到队列失败回调处理
         */
        rabbitTemplate.setReturnsCallback(returnCallbackService);

        /**
         * 开启Jackson2JsonMessageConverter的消息转换器(进行msg编码操作)
         */
       rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());


        /**
         * 发送消息
         */
        rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> {
            //这里是对msg的aop做一些前置处理操作
            //下面是为了解决
  <----原理说明--->
            实体类转换异常

            failed to resolve class name.
            解决方案

            1.依赖系统1的jar包,直接使用类A来接收
            2.不依赖系统1的jar包,自己建一个和A一模一样的类,连名称,包路径都一样
            3.负责监听 queue 的类实现 MessageListener 接口,直接接收 Message 类,再自己转
                    显然都不够好也不是自己想要的

            在 JsonMessageConverter 的 fromMessage 方法中有这么一段:

            f (getClassMapper() == null) {
                JavaType targetJavaType = getJavaTypeMapper()
                        .toJavaType(message.getMessageProperties());
                content = convertBytesToObject(message.getBody(), encoding, targetJavaType);
            } else {
                Class<?> targetClass = getClassMapper().toClass(
                        message.getMessageProperties());
                content = convertBytesToObject(message.getBody(), encoding, targetClass);
            }
            就是说默认情况下,JsonMessageConverter 使用的 ClassMapper 是 DefaultJackson2JavaTypeMapper,在转换时通过 Message 的 Properties 来获取要转换的目标类的类型。通过 Debug 可以发现,目标类的类型是存储在 Message 的 Proterties 的 一个 headers 的 Map 中,Key 叫“__TypeId__”。所以只要想办法在传输消息时更改__TypeId__的值即可。
  <----原理说明--->


            message.getMessageProperties().getHeaders().put("__TypeId__","com.example.rabbitmqdemo.BoxTaskBo");
            return message;
        } ,new CorrelationData(UUID.randomUUID().toString()));
        System.out.println("bbk样本库exchange = " + exchange);
    }
}

4.消费端配置
下图是消费端服务(简单)相关目录结构

image.png

4.1---消费端配置代码

package com.example.rabbitmqdemo;


import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //设置开启Jackson2JsonMessageConverter
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //设置开启手动确认ack
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
}

4.1---消费端具体接收消息消费业务代码

package com.example.rabbitmqdemo;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
@RabbitListener(queues = "zkmeiling.bbk.to.ipc")
public class ReceiverMessage1 {

    @RabbitHandler
    public void processHandler(@Payload BoxTaskBo msg, Channel channel, Message message) throws IOException {

        try {
            log.info("ipc工业端小富收到消息:{}", msg);

            //TODO 具体业务

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        }  catch (Exception e) {

            if (message.getMessageProperties().getRedelivered()) {

                log.error("ipc工业端消息已重复处理失败,拒绝再次接收...");

                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {

                log.error("ipc工业端消息即将再次返回队列处理...");

                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }

}

验证下
1.postmain 模拟发送请求

image.png

2.发布端:发送成功日志


image.png

3.消费端: 接收成功日志


image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容