stream这个项目让我们不必通过繁琐的自定义ampq来建立exchange,通道名称,以及队列名称和路由方式。只需要简单几步我们就轻松使用stream完成推送到rabbitmq和kafafa,并完成监听工作。
首先,stream提供了默认的输入和输出通过。如果我们不需要多个通道,可以通过@Enbalebing(Sink.Class)来绑定输入通道。对应的application里面的
# rabbitmq默认地址配置
rabbitmq:
host: asdf.me
port: 5672
username: guest
password: guest
cloud:
stream:
bindings:
input:
destination: push-exchange
output:
destination: push-exchange
这样会自动建立一个exchange为push-exchange名字的输出通道。同理@Enbalebing(Input.Class)是绑定输入通道的。下面创建一个生产者和消费者:
@EnableBinding(Source.class)
public class Producer {
@Autowired
@Output(Source.OUTPUT)
private MessageChannel channel;
public void send() {
channel.send(MessageBuilder.withPayload("22222222222" + UUID.randomUUID().toString()).build());
}
@EnableBinding(Sink.class)
public class Consumer {
@StreamListener(Sink.INPUT)
public void receive(Message<String> message) {
System.out.println("接收到MQ消息:" + JSONObject.toJSONString(message));
}
}
stream默认提供的消费者和生产者接口:
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
}
可以看出,会去找到我们在application.yaml里面定义的input,output下面的destination。分别作为输入和输出通道。我们也可以自己定义接口来实现:
String WS_INPUT = "ws-consumer";
String EMAIL_INPUT = "email-consumer";
String SMS_INPUT = "sms-consumer";
@Input(MqMessageInputConfig.EMAIL_INPUT)
SubscribableChannel emailChannel();
@Input(MqMessageInputConfig.WS_INPUT)
SubscribableChannel wsChannel();
@Input(MqMessageInputConfig.SMS_INPUT)
SubscribableChannel smChannel();
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface MqMessageOutputConfig {
String MESSAGE_OUTPUT = "message-producter";
@Output(MqMessageOutputConfig.MESSAGE_OUTPUT)
MessageChannel outPutChannel();
}
坑1.需要注意的是,最好不要自定义输入输出在同一个类里面。这样,如果我们只调用生产者发送消息。会导致提示Dispatcher has no subscribers for channel。并且会让我们发送消息的次数莫名减少几次。详细情况可以查看gihub官方issue,也给出的这种解决方法 官方解决方式
建立一个testjunit类,然后使用生产者发送消息。消费者监听队列获取消息。
接收到MQ消息:{"headers":{"amqp_receivedDeliveryMode":"PERSISTENT","amqp_receivedRoutingKey":"my-test-channel","amqp_receivedExchange":"my-test-channel","amqp_deliveryTag":1,"amqp_consumerQueue":"my-test-channel.anonymous.vYA2O6ZSQE-S9MOnE0ZoJQ","amqp_redelivered":false,"id":"805e7fc3-a046-e07a-edf5-def58d9c8eab","amqp_consumerTag":"amq.ctag-QwsmRKg5f0DGSp-7wbpYxQ","contentType":"text/plain","timestamp":1523930106483},"payload":"22222222222a7d24456-5b11-4c25-9270-876e7bbc556a"}
坑2.stream生成的exchang默认是topic模式。就是按照前缀匹配,发送消息给对应的队列。
image
image