要开始创建Spring Cloud Stream应用程序

要开始创建Spring Cloud Stream应用程序,请访问Spring Initializr并创建一个名为“GreetingSource”的新Maven项目。在下拉菜单中选择Spring Boot {supported-spring-boot-version}。在“ 搜索依赖关系”文本框中键入Stream Rabbit或Stream Kafka,具体取决于您要使用的binder。

接下来,在与GreetingSourceApplication类相同的包中创建一个新类GreetingSource。给它以下代码:

import org.springframework.cloud.stream.annotation.EnableBinding;

import org.springframework.cloud.stream.messaging.Source;

import org.springframework.integration.annotation.InboundChannelAdapter;

@EnableBinding(Source.class)

public class GreetingSource {

    @InboundChannelAdapter(Source.OUTPUT)

    public String greet() {

        return "hello world " + System.currentTimeMillis();

    }

}

@EnableBinding注释是触发Spring Integration基础架构组件的创建。具体来说,它将创建一个Kafka连接工厂,一个Kafka出站通道适配器,并在Source界面中定义消息通道:

public interface Source {

  String OUTPUT = "output";

  @Output(Source.OUTPUT)

  MessageChannel output();

}

自动配置还创建一个默认轮询器,以便每秒调用greet()方法一次。标准的Spring Integration@InboundChannelAdapter注释使用返回值作为消息的有效内容向源的输出通道发送消息。

要测试驱动此设置,请运行Kafka消息代理。一个简单的方法是使用Docker镜像:

# On OS X

$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 spotify/kafka

# On Linux

$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=localhost --env ADVERTISED_PORT=9092 spotify/kafka

构建应用程序:

./mvnw clean package

消费者应用程序以类似的方式进行编码。返回Initializr并创建另一个名为LoggingSink的项目。然后在与类LoggingSinkApplication相同的包中创建一个新类LoggingSink,并使用以下代码:

import org.springframework.cloud.stream.annotation.EnableBinding;

import org.springframework.cloud.stream.annotation.StreamListener;

import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)

public class LoggingSink {

    @StreamListener(Sink.INPUT)

    public void log(String message) {

        System.out.println(message);

    }

}

构建应用程序:

./mvnw clean package

要将GreetingSource应用程序连接到LoggingSink应用程序,每个应用程序必须共享相同的目标名称。启动这两个应用程序如下所示,您将看到消费者应用程序打印“hello world”和时间戳到控制台:

cd GreetingSource

java -jar target/GreetingSource-0.0.1-SNAPSHOT.jar --spring.cloud.stream.bindings.output.destination=mydest

cd LoggingSink

java -jar target/LoggingSink-0.0.1-SNAPSHOT.jar --server.port=8090 --spring.cloud.stream.bindings.input.destination=mydest

(不同的服务器端口可以防止两个应用程序中用于维护Spring Boot执行器端点的HTTP端口的冲突。)

LoggingSink应用程序的输出将如下所示:

[          main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8090 (http)

[          main] com.example.LoggingSinkApplication      : Started LoggingSinkApplication in 6.828 seconds (JVM running for 7.371)

hello world 1458595076731

hello world 1458595077732

hello world 1458595078733

hello world 1458595079734

hello world 1458595080735

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容