Spring Cloud Stream知识点盘点

本文对Spring Cloud Stream,做一个知识点盘点和总结,包括:

• 概念 • Stream注解 • Spring Cloud Integration(Spring Cloud Stream的底层)注解 • Spring Messaging(Spring消息编程模型)注解 • Spring Cloud Stream API

概念

group

组内只有1个实例消费。如果不设置group,则stream会自动为每个实例创建匿名且独立的group——于是每个实例都会消费。

组内单次只有1个实例消费,并且会轮询负载均衡。通常,在将应用程序绑定到给定目标时,最好始终指定consumer group。

destination binder

与外部消息系统通信的组件,为构造 Binding 提供了 2 个方法,分别是 bindConsumer 和 bindProducer ,它们分别用于构造生产者和消费者。Binder使Spring Cloud Stream应用程序可以灵活地连接到中间件,目前spring为kafka、rabbitmq提供binder。

destination binding

Binding 是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产,由binder创建。

partition

TIPS

严格来说这个不是概念,而是一种Stream提高伸缩性、吞吐量的一种方式。不过不想另起标题了,写在这里吧。

一个或多个生产者将数据发送到多个消费者,并确保有共同特征标识的数据由同一个消费者处理。默认是对消息进行hashCode,然后根据分区个数取余,所以对于相同的消息,总会落到同一个消费者上。

注解

Input(Stream)

示例:

public interface Barista { @Input("inboundOrders") SubscribableChannel orders();}

作用:

• 用于接收消息 • 为每个binding生成channel实例 • 指定channel名称 • 在spring容器中生成一个名为inboundOrders,类型为SubscribableChannel的bean • 在spring容器中生成一个类,实现Barista接口。

Output(Stream)

示例:

public interface Source { @Output MessageChannel output();}

作用:

类似Input,只是用来生产消息。

StreamListener(Stream)

示例:

@StreamListener(value = Sink.INPUT, condition = "headers['type']=='dog'")public void handle(String body) { System.out.println("Received: " + body);}

@Bean@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "2"))public MessageSource<String> test() { return () -> { Map<String, Object> map = new HashMap<>(1); map.put("type", "dog"); return new GenericMessage<>("abcdef", map); };}

作用:

用于消费消息

condition的作用:符合条件,才进入处理方法。

condition起作用的两个条件:

• 注解的方法没有返回值 • 方法是一个独立方法,不支持Reactive API

SendTo(messaging)

示例:

// 接收INPUT这个channel的消息,并将返回值发送到OUTPUT这个channel@StreamListener(Sink.INPUT)@SendTo(Source.OUTPUT)public String receive(String receiveMsg) { return "handle...";}

作用:

用于发送消息

InboundChannelAdapter(Integration)

示例:

@Bean@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))public MessageSource<String> test() { return () -> new GenericMessage<>("Hello Spring Cloud Stream");}

作用:

表示让定义的方法生产消息。

注:用 InboundChannelAdapter 注解的方法上即使有参数也没用。即下面test方法不要有参数。

• fixedDelay:多少毫秒发送1次 • maxMessagesPerPoll:一次发送几条消息。

ServiceActivator(Integration)

示例:

@ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT)public String transform(String payload) { return payload.toUpperCase();}

作用:

表示方法能够处理消息或消息有效内容,监听input消息,用方法体的代码处理,然后输出到output中。

Transformer(Integration)

示例:

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)public Object transform(String message) { return message.toUpperCase();}

作用:

和 ServiceActivator 类似,表示方法能够转换消息,消息头,或消息有效内容

PollableMessageSource(Stream)

示例代码:

@SpringBootApplication@EnableBinding({ConsumerApplication.PolledProcessor.class})@EnableSchedulingpublic class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); }

@Autowired private PolledProcessor polledProcessor;

@Scheduled(fixedDelay = 5_000) public void poll() { polledProcessor.input().poll(message -> { byte[] bytes = (byte[]) message.getPayload(); String payload = new String(bytes); System.out.println(payload); }); }

public interface PolledProcessor { @Input PollableMessageSource input();

@Output MessageChannel output(); }

@Bean @InboundChannelAdapter(value = "output", poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")) public MessageSource<String> test() { return () -> { Map<String, Object> map = new HashMap<>(1); map.put("type", "dog"); return new GenericMessage<>("adfdfdsafdsfa", map); }; }}

如果不想自己做byte数组转换,可以添加配置:

spring: cloud: stream: bindings: output: # 指定content-type content-type: text/plain

作用:

允许消费者控制消费速率。

需要java学习路线图的私信笔者“java”领取哦!另外喜欢这篇文章的可以给笔者点个赞,关注一下,每天都会分享Java相关文章!还有不定时的福利赠送,包括整理的学习资料,面试题,源码等~~

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352

推荐阅读更多精彩内容