(十七) 整合spring cloud云架构 -消息驱动 Spring Cloud Stream

在使用spring cloud云架构的时候,我们不得不使用Spring cloud Stream,因为消息中间件的使用在项目中无处不在,我们公司后面做了娱乐方面的APP,在使用spring cloud做架构的时候,其中消息的异步通知,业务的异步处理都需要使用消息中间件机制。spring cloud的官方给出的集成建议(使用rabbit mq和kafka),我看了一下源码和配置,只要把rabbit mq集成,kafka只是换了一个pom配置jar包而已,闲话少说,我们就直接进入配置实施:


1. 简介:

Spring cloud Stream数据流操作开发包,封装了与Redis,Rabbit、Kafka等发送接收消息。


2. 使用工具:

rabbit,具体的下载和安装细节我这里不做太多讲解,网上的实例太多了


3. 创建commonservice-mq-producer消息的发送者项目,在pom里面配置stream-rabbit的依赖

<span style="font-size: 16px;"><!-- 引入MQ消息驱动的微服务包,引入stream只需要进行配置化即可,是对rabbit、kafka很好的封装 -->  

<dependency>  

    <groupId>org.springframework.cloud</groupId>  

    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>  

</dependency></span>  

 4. 在yml文件里面配置rabbit mq

<span style="font-size: 16px;">server:  

port:5666  

spring:  

  application:  

    name: commonservice-mq-producer  

  profiles:   

    active: dev  

  cloud:  

    config:  

      discovery:   

enabled:true  

        service-id: commonservice-config-server  

    stream:  

      bindings:  

        mqScoreOutput:   

          destination: honghu_exchange  

          contentType: application/json  

  rabbitmq:  

     host: localhost  

port:5672  

     username: honghu  

     password: honghu</span>  

eureka:   

  client:  

    service-url:  

defaultZone: http://honghu:123456@localhost:8761/eureka  

  instance:  

prefer-ip-address:true</span>  

 5. 定义接口ProducerService

<span style="font-size: 16px;">package com.honghu.cloud.producer;  


import org.springframework.cloud.stream.annotation.Output;  

import org.springframework.messaging.SubscribableChannel;  


public interface ProducerService {  


String SCORE_OUPUT ="mqScoreOutput";  


@Output(ProducerService.SCORE_OUPUT)  

    SubscribableChannel sendMessage();  

}</span>  

 6. 定义绑定

<span style="font-size: 16px;">package com.honghu.cloud.producer;  


import org.springframework.cloud.stream.annotation.EnableBinding;  


@EnableBinding(ProducerService.class)  

public class SendServerConfig {  


}</span>  

 7. 定义发送消息业务ProducerController

<span style="font-size: 16px;">package com.honghu.cloud.controller;  

import org.springframework.beans.factory.annotation.Autowired;  

import org.springframework.integration.support.MessageBuilder;  

import org.springframework.messaging.Message;  

import org.springframework.web.bind.annotation.PathVariable;  

import org.springframework.web.bind.annotation.RequestBody;  

import org.springframework.web.bind.annotation.RequestMapping;  

import org.springframework.web.bind.annotation.RequestMethod;  

import org.springframework.web.bind.annotation.RestController;  


import com.honghu.cloud.common.code.ResponseCode;  

import com.honghu.cloud.common.code.ResponseVO;  

import com.honghu.cloud.entity.User;  

import com.honghu.cloud.producer.ProducerService;  


import net.sf.json.JSONObject;  


@RestController  

@RequestMapping(value = "producer")  

public class ProducerController {  


@Autowired  

private ProducerService producerService;  



/**

     * 通过get方式发送</span>对象<span style="font-size: 16px;">

     * @param name 路径参数

     * @return 成功|失败

     */  

@RequestMapping(value = "/sendObj", method = RequestMethod.GET)  

public ResponseVO sendObj() {  

User user =new User(1, "hello User");  

boolean result = producerService.sendMessage().send(msg);  

if(result){  

return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);  

        }  

return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);  

    }  



/**

     * 通过get方式发送字符串消息

     * @param name 路径参数

     * @return 成功|失败

     */  

@RequestMapping(value = "/send/{name}", method = RequestMethod.GET)  

public ResponseVO send(@PathVariable(value = "name", required = true) String name) {  

        Message msg = MessageBuilder.withPayload(name.getBytes()).build();  

boolean result = producerService.sendMessage().send(msg);  

if(result){  

return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);  

        }  

return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);  

    }  


/**

     * 通过post方式发送</span>json对象<span style="font-size: 16px;">

     * @param name 路径参数

     * @return 成功|失败

     */  

@RequestMapping(value = "/sendJsonObj", method = RequestMethod.POST)  

public ResponseVO sendJsonObj(@RequestBody JSONObject jsonObj) {  

        Message<JSONObject> msg = MessageBuilder.withPayload(jsonObj).build();  

boolean result = producerService.sendMessage().send(msg);  

if(result){  

return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);  

        }  

return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);  

    }  

}  

</span>  


8. 创建commonservice-mq-consumer1消息的消费者项目,在pom里面配置stream-rabbit的依赖

<!-- 引入MQ消息驱动的微服务包,引入stream只需要进行配置化即可,是对rabbit、kafka很好的封装 -->  

<dependency>  

    <groupId>org.springframework.cloud</groupId>  

    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>  

</dependency>  


 9. 在yml文件中配置:

server:  

port:5111  

spring:  

  application:  

    name: commonservice-mq-consumer1  

  profiles:   

    active: dev  

  cloud:  

    config:  

      discovery:   

enabled:true  

        service-id: commonservice-config-server  


      bindings:  

        mqScoreInput:  

          group: honghu_queue  

          destination: honghu_exchange  

          contentType: application/json  


  rabbitmq:  

     host: localhost  

port:5672  

     username: honghu  

     password: honghu</span>  

eureka:   

  client:  

    service-url:  

defaultZone: http://honghu:123456@localhost:8761/eureka  

  instance:  

prefer-ip-address:true  


9. 定义接口ConsumerService

package com.honghu.cloud.consumer;  


import org.springframework.cloud.stream.annotation.Input;  

import org.springframework.messaging.SubscribableChannel;  


public interface ConsumerService {  



@Input(ConsumerService.SCORE_INPUT)  

    SubscribableChannel sendMessage();</span>  


}  


10. 定义启动类和消息消费

package com.honghu.cloud;  

import org.springframework.boot.SpringApplication;  

import org.springframework.boot.autoconfigure.SpringBootApplication;  

import org.springframework.cloud.netflix.eureka.EnableEurekaClient;  

import org.springframework.cloud.stream.annotation.EnableBinding;  

import org.springframework.cloud.stream.annotation.StreamListener;  


import com.honghu.cloud.consumer.ConsumerService;  

import com.honghu.cloud.entity.User;  


@EnableEurekaClient  

@SpringBootApplication  

@EnableBinding(ConsumerService.class) //可以绑定多个接口  

public class ConsumerApplication {  


public static void main(String[] args) {  

SpringApplication.run(ConsumerApplication.class, args);  

    }  


public void onMessage(Object obj) {  

System.out.println("消费者1,接收到的消息:" + obj);  

    }</span>  


}  

11. 分别启动commonservice-mq-producer、commonservice-mq-consumer1

12. 通过postman来验证消息的发送和接收

可以看到接收到了消息,下一章我们介绍mq的集群方案。


到此,整个消息中心方案集成完毕(企业架构源码可以加求球:三五三六二四七二五九)!!


欢迎大家和我一起学习spring cloud构建微服务云架构,我这边会将近期研发的spring cloud微服务云架构的搭建过程和精髓记录下来,帮助更多有兴趣研发spring cloud框架的朋友,大家来一起探讨spring cloud架构的搭建过程及如何运用于企业项目。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,039评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,223评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,916评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,009评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,030评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,011评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,934评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,754评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,202评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,433评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,590评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,321评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,917评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,568评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,738评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,583评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,482评论 2 352

推荐阅读更多精彩内容