SpringCloud 集成RabbitMQ使用

(一)简单使用

  • 1、配置pom包,主要是添加spring-boot-starter-amqp的支持
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

  • 2、配置文件
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

  • 3、发送者
public class HelloSender {

// spring boot 为我们提供的包装类
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hello " + new Date();
        System.out.println("Sender : " + context);
// 调用 发送消息的方法 
        this.rabbitTemplate.convertAndSend("hello", context);
    }

}

  • 4、接受者
    //1\. @RabbitListener(queues = "myQueue") // 不能自动创建队列
    //2\. 自动创建队列 @RabbitListener(queuesToDeclare = @Queue("myQueue"))
    //3\. 自动创建, Exchange和Queue绑定
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("myQueue"),
            exchange = @Exchange("myExchange")
    ))
    public void process(String message) {
        log.info("MqReceiver: {}", message);
    }

    /**
     * 数码供应商服务 接收消息
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange("myOrder"),
            key = "computer",
            value = @Queue("computerOrder")
    ))
    public void processComputer(String message) {
        log.info("computer MqReceiver: {}", message);
    }

    /**
     * 水果供应商服务 接收消息
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange("myOrder"),
            key = "fruit",
            value = @Queue("fruitOrder")
    ))
    public void processFruit(String message) {
        log.info("fruit MqReceiver: {}", message);
    }

  • Test

/**
 * 发送mq消息测试
 */
@Component
public class MqSenderTest extends OrderApplicationTests {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void send() {
        amqpTemplate.convertAndSend("myQueue", "now " + new Date());
    }

    @Test
    public void sendOrder() {
        amqpTemplate.convertAndSend("myOrder", "computer", "now " + new Date());
    }
}

使用 spring cloud stream 操作 rabbitmq

  • Maven依赖
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

  • 使用系统提供 Sink ,创建接收器
    然后创建一个名为SinkReceiver的类,用来接收RabbitMQ发送来的消息,如下:
@EnableBinding(Sink.class)
public class SinkReceiver {
    private static Logger logger = LoggerFactory.getLogger(StreamHelloApplication.class);
    @StreamListener(Sink.INPUT)
    public void receive(Object playload) {
        logger.info("Received:"+playload);
    }
}

首先使用了@EnableBinding注解实现对消息通道的绑定,我们在该注解中还传入了一个参数Sink.class,Sink是一个接口,该接口是Spring Cloud Stream中默认实现的对输入消息通道绑定的定义。然后我们在SinkReceiver类中定义了receive方法,并在该方法上添加了@StreamListener注解,该注解表示该方法为消息中间件上数据流的事件监听器,Sink.INPUT参数表示这是input消息通道上的监听处理器。

  • 测试

    image
  • 自定义消息通道
    前面提到了Sink和Source两个接口,这两个接口中分别定义了输入通道和输出通道,而Processor通过继承Source和Sink,同时具有输入通道和输出通道。这里我们就模仿Sink和Source,来定义一个自己的消息通道。

首先我们定义一个接口叫做MySink,如下:

public interface MySink {
    String INPUT = "mychannel";

    @Input(INPUT)
    SubscribableChannel input();
}

这里我们定义了一个名为mychannel的消息输入通道,@Input注解的参数则表示了消息通道的名称,同时我们还定义了一个方法返回一个SubscribableChannel对象,该对象用来维护消息通道订阅者。然后,我们再定义一个名为MySource的接口,如下:

public interface MySource {
    @Output(MySink.INPUT)
    MessageChannel output();
}

@Output注解中描述了消息通道的名称,还是mychannel,然后这里我们也定义了一个返回MessageChannel对象的方法,该对象中有一个向消息通道发送消息的方法。

最后我们定义一个消息接收类,如下:

@EnableBinding(value = {MySink.class})
public class SinkReceiver2 {
    private static Logger logger = LoggerFactory.getLogger(StreamHelloApplication.class);

    @StreamListener(MySink.INPUT)
    public void receive(Object playload) {
        logger.info("Received:" + playload);
    }
}

OK,我们在这里绑定消息通道,然后监听自定义的消息通道,最后来测试一下,如下:

@RestController
public class StreamHelloApplicationTests {

    @Autowired
    private MySource mySource;

   @GetMapping("testMyStream")
    public void contextLoads() {
        mySource.output().send(MessageBuilder.withPayload("hello 123").build());
    }
}

如果想要发送对象也可以直接发送,不用进行对象转换,如下:

发送:

User user= new User (1234, "唐志强", "牛逼");
mySource.output().send(MessageBuilder.withPayload(user).build());

接收:

@StreamListener(MySink.INPUT)
public void receive(User user ) {
    logger.info("Received:" + user );
}

如果我们想要在接收成功后给一个回执,也是OK的,如下:

@StreamListener(MySink.INPUT)
@SendTo(Source.OUTPUT)//定义回执发送的消息通道
public String receive(Book playload) {
    logger.info("Received:" + playload);
    return "receive msg :" + playload;
}

方法的返回值就是回执消息,回执消息在系统默认的output通道中,我们如果想要接收这个消息,当然就要监听这个通道,如下:

@StreamListener(Source.OUTPUT)
public void receive2(String msg) {
    System.out.println("msg:"+msg);
}

  • rabbitmq 控制面板查看发送的消息,以便判断信息的正确性
spring:
    stream:
      bindings:
        myMessage:
          group: order
          content-type: application/json # 设置消息发送后为 json 格式,以便控制台查看,接收方会自动转换为 具体的消息数据格式

  • 消费组
    由于我们的服务可能会有多个实例同时在运行,如果不做任何设置,此时发送一条消息将会被所有的实例接收到,但是有的时候我们可能只希望消息被一个实例所接收,这个需求我们可以通过消息分组来解决。方式很简单,给项目配置消息组,如下:
spring.cloud.stream.bindings.mychannel.group=order_group

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容