spring cloud 12消息驱动

一、spring cloud stream
使用场景:消息驱动的微服务应用。
同步的方式:http也是一种消息,包含消息头和消息体,请求响应模型。
异步的方式:消息中间件
对比reactive streams
publisher
subscriber
processor既是消费者也是生产者
topic

二、主要概念
• 应⽤模型:中间件核心
• Binder 抽象
• 持久化 发布/订阅⽀持:MQ的基本特性
• 消费分组⽀持:MQ的基本特性
• 分区⽀持:Kafka

三、基本概念
• Source: Stream 发送源
• 近义词: Producer、 Publisher
• Sink: Stream 接收器
• 近义词: Consumer、 Subscriber
• Processor:既是Producer也是Consumer??
消息模型的组件都可以看作是管道

四、编程模型
• 激活 : @EnableBinding
• @Configuration
• @EnableIntegration
• Source:发送源
• @Output
• MessageChannel:发布消息
通过JMS规范,无论是源还是接受端,都是从中间件(Kafka,rabbitMQ)中寻址,所以我们的消息中间件就像一个路由器,路由到我们的目的地。发送者->中间件->订阅者。

五、编程模型
• Sink:接收器,订阅者
• @Input就会有SubscribableChannel,订阅管道,订阅某个消息。
一个SubscribableChannel能订阅几个topic?

三种监听topic的写法:
• SubscribableChannel:订阅管道
• @ServiceActivator:监听器
• @StreamListener:监听器

六、整合kafka


image.png

1,启动zookeeper,端口2181
进到zookeeper bin目录下
sh zkServer.sh start

2,Kafka:启动kafka
进到Kafka bin目录下
sh kafka-server-start.sh ../config/server.properties

3,修改UserApi下的User对象,增加序列化ID
private static final long serialVersionUID = 4050384138695906201L;

4,改造user-service-client,消息发送器(采用Kafka原生API)
(1)引入kafka依赖,这个是spring整合kafka的依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

(2)利用kafkaTemplate实现消息发送,spring的Template套路,JDBCTemplate,RedisTemplate,hibernateTemplate,RestTemplate,在UserServiceClientController添加如下代码:

    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    public UserServiceClientController(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }


    @PostMapping("/saveUserByMessage")
    public boolean save(@RequestBody User user) {

        //topic是string,传输对象user是Object类型
        ListenableFuture<SendResult<String, Object>> future =
                kafkaTemplate.send("sf-users",0,user);

        return future.isDone();
    }

(3)实现java 序列化器,com.xixi.spring.cloud12.server.user.ribbon.client.serializer.ObjectSerializer

package com.xixi.spring.cloud12.server.user.ribbon.client.serializer;

import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.Map;

/**将对象转化成字节数组
 * user 序列化器
 */
public class ObjectSerializer implements Serializer<Object> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    /**
     * user 对象序列化过程:User对象转成byte[](二进制字节流)
     * @param topic
     * @param data
     * @return
     */
    @Override
    public byte[] serialize(String topic, Object data) {

        System.out.println("topic : "+topic+",object "+data);

        byte[] objectArray = null;

        //构造ObjectOutputStream的时候必须要传一个OutputStream,所以先new一个ByteArrayOutputStream
        OutputStream outputStream = new ByteArrayOutputStream();


        try {
            ObjectOutputStream objectOutputStream =
                    new ObjectOutputStream(outputStream);

            objectOutputStream.writeObject(data);//将对象写入outputStream

            objectArray = ((ByteArrayOutputStream) outputStream).toByteArray();//将对象赋值给userArray


        } catch (IOException e) {
            e.printStackTrace();
        }


        return objectArray;
    }

    @Override
    public void close() {

    }
}

(4)顺序启动eureka,config server
(5)用kafka-console-consumer订阅一个topic
sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sf-users
然后我们客户端发送这个topic,kafka-console-consumer就能监控得到。

5,改造user-service-provider,消息接收器(采用Stream binder的方式)
(1)引入依赖:这个是spring-cloud-stream整合kafka的依赖,和上面那个不同哦。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

(2)定义用户消息接口

package com.xixi.spring.cloud12.service.stream;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * 用户消息stream 接口
 *
 * sink,@Input,SubscribableChannel
 *
 *
 */
public interface UserMessage {

    @Input
    SubscribableChannel input();
}

(3)激活用户消息Stream接口

@EnableHystrix
@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(UserMessage.class)//激活stream binding 到UserMessage
public class UserServiceProviderApplication {

    public static void main(String[] args) {

        SpringApplication.run(UserServiceProviderApplication.class,args);
    }
}

(4)在application.properties中配置kafka以及stream destination

###spring cloud stream集成kafka配置
###spring cloud stream binding配置sink管道的topic
#spring.cloud.stream.bindings.${channelid}
###destination指定Kafka topic
spring.cloud.stream.bindings.input.destination = sf-users
##Kafka的配置:
# 配置消息中间件服务,Kafka默认端口9092,localhost:9092
spring.kafka.bootstrap-servers = localhost:9092
spring.kafka.consumer.group-id = sf-group
spring.kafka.consumer.clientId = user-service-provider

配置
• Source : spring.cloud.stream.bindings.{source}.* • Sink : spring.cloud.stream.bindings.{sink}.*

(5)用户消息监听器
1,第一种实现方式:通过SubscribableChannel实现

package com.xixi.spring.cloud12.service.provider.service;

import com.xixi.spring.cloud12.server.api.UserService;
import com.xixi.spring.cloud12.server.domain.User;
import com.xixi.spring.cloud12.service.stream.UserMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;

import static com.xixi.spring.cloud12.service.stream.UserMessage.INPUT;

/**
 * 用户消息-服务
 * 当接收到消息的时候,listen()和init()都是回调,是交替执行
 */
@Service
public class UserMessagingService {

    @Autowired
    private UserMessage userMessage;

    @Autowired
    @Qualifier("inMemoryUserService")
    private UserService userService;

    /**第一种实现方式:SubscribableChannel
     * 订阅者接收到消息以后存起来
     */
    @PostConstruct
    public void init(){
        SubscribableChannel subscribableChannel = userMessage.input();
        //消息订阅回调,消费消息
        subscribableChannel.subscribe(message -> {
            //这里的message body应该是字节流,因为我们从user-service-client传的user对象在序列化成字节流以后传过来的
            System.out.println(message);
            byte[] body = (byte[]) message.getPayload();//就是message body
            saveUser(body);
        });

    }




    private void saveUser(byte[] body){
        //反序列化后输出
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
        ObjectInputStream objectInputStream = null;
        try {
            objectInputStream = new ObjectInputStream(byteArrayInputStream);
            User user = (User) objectInputStream.readObject();
            System.out.println(user.getId()+","+user.getName());
            //保存user
            userService.saveUser(user);


        } catch (Exception e) {
            throw new RuntimeException();
        }

    }

}

2,第二种实现方式:
• @ServiceActivator:监听器

    //第二种实现方式
    //input是channel的ID
    @ServiceActivator(inputChannel = INPUT)
    public void listen(byte[] data){
        System.out.println("listen from @ServiceActivator...");
        saveUser(data);
    }

3,第三种实现方式:
• @StreamListener:监听器

    //第三种实现方式:
    @StreamListener(INPUT)
    public void onMessage(byte[] data){
        System.out.println("listen from @StreamListener...");
        saveUser(data);
    }

总结:
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.SubscribableChannel;
三个callback交替执行,机会均等。
这三种实现只用到了Kafka的destination,在配置文件中
spring.cloud.stream.bindings.user-message.destination = sf-users
spring cloud stream屏蔽了消息中间件的具体实现(这里是Kafka),将来如果你换成其他的中间件比如rabbitMQ,也不需要改太多的代码。
spring cloud stream的优势:解耦

七、整合rabbitMQ
1,改造user-service-client,消息发送器(采用Stream binder的方式,用消息管道进行消息发送)
(1)增加依赖org.springframework.cloud:spring-cloud-stream-binder-rabbit
(2)配置发送源管道

###spring cloud stream集成rabbit配置
###destination指定 rabbit topic,user-message-out是管道名称
spring.cloud.stream.bindings.user-message-out.destination = sf-users

(3)增加用户消息输出源stream接口

package com.xixi.spring.cloud12.server.user.ribbon.client.stream;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

import java.nio.channels.Channel;

/**
 * 用户消息 Stream接口
 */
public interface UserMessage {


    String OUTPUT= "user-message-out";

    @Output(value = OUTPUT)
    MessageChannel output();

}

(4)激活@EnableBinding

@EnableBinding(UserMessage.class)
public class UserServiceClientApplication

(5)发送请求到rabbitmq

@Autowired
    private UserMessage userMessage;
    @Autowired
    private ObjectMapper objectMapper;

    @PostMapping("/saveUserByRabbit")
    public boolean saveUserByRabbit(@RequestBody User user) throws JsonProcessingException {

        //topic是string,传输对象user是Object类型
        MessageChannel messageChannel = userMessage.output();
        //将user对象序列化成JSON
        String payload =objectMapper.writeValueAsString(user);
        GenericMessage<String> message = new GenericMessage<String>(payload);
        //发送消息到rabbitmq
        return messageChannel.send(message);
    }

(6)启动rabbitmq
./rabbitmq-server

重点
user-service-client的channel:user-message-out
user-service-provider的channel:user-message
因为它们监听的同一个topic,所以客户端发送的消息能够被user-service-provider监听到。

2,改造user-service-provider,消息接收器(采用Stream binder的方式)
(1)替换Kafka的依赖为rabbitmq,
依赖org.springframework.cloud:spring-cloud-stream-binder-rabbit

(2)配置文件无需修改

(3)修改UserMessagingService,因为这次传过来的消息是String,之前写的是byte[],所以加个处理方式。

@Autowired
    private ObjectMapper objectMapper;
    
    /**第一种实现方式:SubscribableChannel
     * 订阅者接收到消息以后存起来
     */
    @PostConstruct
    public void init()throws IOException {
        SubscribableChannel subscribableChannel = userMessage.input();
        //消息订阅回调,消费消息
        subscribableChannel.subscribe(message -> {
            //这里的message body应该是字节流,因为我们从user-service-client传的user对象。在序列化成字节流以后传过来的
            System.out.println("listen from subscribableChannel...");

            MessageHeaders messageHeaders = message.getHeaders();
            String contentType = messageHeaders.get("contentType",String.class);
            if ("text/plain".equals(contentType)){
                try {
                    saveUser2((String) message.getPayload());
                } catch (IOException e) {
                    throw new RuntimeException();
                }
            }else{
                byte[] body = (byte[]) message.getPayload();//就是message body
                saveUser(body);
            }
        });
    }

八、使用spring cloud stream,不需要关注你使用的是哪种中间件,只关注topic即可。强烈解耦!!!!我们的项目现在既有RPC调用,也有远程消息服务。远程消息服务适用于重要不紧急的功能,重要紧急就RPC调用,不重要也不紧急可以考虑定时任务。

九、问答
1,消息的一致性和实时性如何取舍?
消息是允许丢失的,与数据库有点不太一样,大多数消息中间件都有重试机制,同时消息不太强调实时性。比如聊天软件(spring boot聊天室websocket那节),看上去很快,其实不是实时的,是异步的。
消息中间件有持久化,相对于内存型(消费完就完事,不会存储),更加有保障,它会有一个持久化的过程。

2,spring cloud stream里面的消息分区是怎么实现的?
spring cloud stream消息分区依赖于消息中间件,比如说Kafka有分区,partition,
但是rabbitmq就没有分区。
比如数据库的分区,同一个表分别存在3个库里,比如,前10000号在库1,中间10000号在库2,最后10000在库3,根据偏移量分区。

一个请求过来的时候,从它的上下文中就应该知道它去访问哪个库,
oracle中每个库挂不同的磁盘,并行io,可提高读写的速度

3,如果我启动多个consumer进程,使用同样的consumer group进行实时聚合计算,动态增加consumer进程时,如何保证消费的消息仍在原来相同的consumer进程上?
队列模式:如果生产消费模型是点对点,producer A发出message A专门给consumer A用的,producer B专门给consumer B用的,consumer B是消费不到message A。
广播模式:如果是发布订阅者模式,一个消息有多个订阅者,一个producer A发出message A,有多个consumerA,consumerB,consumerC订阅者,大家都监听到这个消息,

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容