Spring Cloud Stream介绍

Spring Cloud Stream是创建消息驱动微服务应用的框架。Spring Cloud Stream是基于spring boot创建,用来建立单独的/工业级spring应用,使用spring integration提供与消息代理之间的连接。本文提供不同代理中的中间件配置,介绍了持久化发布订阅机制,以及消费组以及分割的概念。 将注解@EnableBinding加到应用上就可以实现与消息代理的连接,@StreamListener注解加到方法上,使之可以接收处理流的事件。

1.1 官方参考文档

原版:

http://docs.spring.io/spring-cloud-stream/docs/current-SNAPSHOT/reference/htmlsingle/#_main_concepts

翻译:

http://blog.csdn.net/phyllisy/article/details/51352868

1.2 API操作手册

1.2.1 生产者示例

PS:生产者yml配置


spring: 

  cloud: 

  stream:

  instanceCount: 3

  bindings:

  output_channel:  #输出  生产者 

  group: queue-1 #指定相同的exchange-1和不同的queue 表示广播模式 #指定相同的exchange和相同的queue表示集群负载均衡模式

  destination: exchange-1 # kafka:发布订阅模型里面的topic rabbitmq: exchange的概念(但是exchange的类型那里设置呢?) 

  binder: rabbit_cluster

  binders: 

  rabbit_cluster: 

  type: rabbit

  environment: 

  spring: 

  rabbitmq: 

  host: 192.168.1.27

  port: 5672

  username: guest

  password: guest

  virtual-host: /

PS: Barista接口为自定义管道


package bhz.spring.cloud.stream;

import org.springframework.cloud.stream.annotation.Input;

import org.springframework.cloud.stream.annotation.Output;

import org.springframework.messaging.MessageChannel;

import org.springframework.messaging.SubscribableChannel;

/**

 * <B>中文类名:</B><BR>

 * <B>概要说明:</B><BR>

* 这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。

* 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。

 * @author bhz(Alienware)

 * @since 2015年11月22日

 */

public interface Barista {

 String INPUT_CHANNEL = "input_channel";

 String OUTPUT_CHANNEL = "output_channel";

 //注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,它的类型是input,订阅的主题是position2处声明的mydest这个主题 

 @Input(Barista.INPUT_CHANNEL)

 SubscribableChannel loginput();

 //注解@Output声明了它是一个输出类型的通道,名字是output_channel。这一名字与app1中通道名一致,表明注入了一个名字为output_channel的通道,类型是output,发布的主题名为mydest。 

 @Output(Barista.OUTPUT_CHANNEL)

 MessageChannel logoutput();

} 

PS: 生产者消息投递


package bhz.spring.cloud.stream;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.messaging.support.MessageBuilder;

import org.springframework.stereotype.Service;

@Service 

public class RabbitmqSender { 

 @Autowired

 private Barista source;

 // 发送消息 

 public String sendMessage(Object message){

 try{

 source.logoutput().send(MessageBuilder.withPayload(message).build());

 System.out.println("发送数据:" + message);

 }catch (Exception e){

 e.printStackTrace();

 }

 return null;

 }

} 

PS: Spring Boot应用入口


package bhz.spring.cloud.stream;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication 

@EnableBinding(Barista.class) 

public class ProducerApplication { 

 public static void main(String[] args) {

 SpringApplication.run(ProducerApplication.class, args);

 }

} 

1.2.2 消费者示例

PS:消费者yml配置


spring: 

  cloud: 

  stream:

  instanceCount: 3

  bindings:

  input_channel:  #输出  生产者 

  destination: exchange-1 # kafka:发布订阅模型里面的topic rabbitmq: exchange的概念(但是exchange的类型那里设置呢?) 

  group: queue-1 #指定相同的exchange-1和不同的queue 表示广播模式 #指定相同的exchange和相同的queue表示集群负载均衡模式

  binder: rabbit_cluster

  consumer:

  concurrency: 1

  rabbit: 

  bindings:

  input_channel:

  consumer:

  transacted:  **true**

  txSize: 10

  acknowledgeMode: MANUAL

  durableSubscription:  **true**

  maxConcurrency: 20

  recoveryInterval: 3000

  binders: 

  rabbit_cluster: 

  type: rabbit

  environment: 

  spring: 

  rabbitmq: 

  host: 192.168.1.27

  port: 5672

  username: guest

  password: guest

  virtual-host: /             

PS: Barista接口为自定义管道


package bhz.spring.cloud.stream;

import org.springframework.cloud.stream.annotation.Input;

import org.springframework.cloud.stream.annotation.Output;

import org.springframework.messaging.MessageChannel;

import org.springframework.messaging.SubscribableChannel;

/**

 * <B>中文类名:</B><BR>

 * <B>概要说明:</B><BR>

* 这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。

* 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。

 * @author bhz(Alienware)

 * @since 2015年11月22日

 */

public interface Barista {

 String INPUT_CHANNEL = "input_channel";

 String OUTPUT_CHANNEL = "output_channel";

 //注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,它的类型是input,订阅的主题是position2处声明的mydest这个主题 

 @Input(Barista.INPUT_CHANNEL)

 SubscribableChannel loginput();

 //注解@Output声明了它是一个输出类型的通道,名字是output_channel。这一名字与app1中通道名一致,表明注入了一个名字为output_channel的通道,类型是output,发布的主题名为mydest。 

 @Output(Barista.OUTPUT_CHANNEL)

 MessageChannel logoutput();

} 

PS: 消费者消息获取


package bhz.spring.cloud.stream;

import java.io.IOException;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.amqp.rabbit.support.CorrelationData;

import org.springframework.amqp.support.AmqpHeaders;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.cloud.stream.annotation.EnableBinding;

import org.springframework.cloud.stream.annotation.StreamListener;

import org.springframework.cloud.stream.binding.ChannelBindingService;

import org.springframework.cloud.stream.config.ChannelBindingServiceConfiguration;

import org.springframework.cloud.stream.endpoint.ChannelsEndpoint;

import org.springframework.integration.channel.PublishSubscribeChannel;

import org.springframework.integration.channel.RendezvousChannel;

import org.springframework.messaging.Message;

import org.springframework.messaging.MessageChannel;

import org.springframework.messaging.SubscribableChannel;

import org.springframework.messaging.core.MessageReceivingOperations;

import org.springframework.messaging.core.MessageRequestReplyOperations;

import org.springframework.messaging.support.ChannelInterceptor;

import org.springframework.stereotype.Service;

import com.rabbitmq.client.Channel;

@EnableBinding(Barista.class)

@Service

public class RabbitmqReceiver { 

 @Autowired

 private Barista source;

 @StreamListener(Barista.INPUT_CHANNEL)

 public void receiver( Message message) {

 //广播通道

 //PublishSubscribeChannel psc = new PublishSubscribeChannel();

 //确认通道

 //RendezvousChannel rc = new RendezvousChannel();

 Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);

 Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);

 System.out.println("Input Stream 1 接受数据:" + message);

 try {

 channel.basicAck(deliveryTag, false);

 } catch (IOException e) {

 e.printStackTrace();

 }

 }

} 

PS: Spring Boot应用入口


package bhz.spring.cloud.stream;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.cloud.stream.annotation.EnableBinding;

import org.springframework.transaction.annotation.EnableTransactionManagement;

@SpringBootApplication 

@EnableBinding(Barista.class)

@EnableTransactionManagement

public class ConsumerApplication { 

 public static void main(String[] args) {

 SpringApplication.run(ConsumerApplication.class, args);

 }

} 

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。