stream这个项目让我们不必通过繁琐的自定义ampq来建立exchange,通道名称,以及队列名称和路由方式。只需要简单几步我们就轻松使用stream完成推送到rabbitmq和kafafa,并完成监听工作。
首先,stream提供了默认的输入和输出通过。如果我们不需要多个通道,可以通过@Enbalebing(Sink.Class)来绑定输入通道。对应的application里面的
# rabbitmq默认地址配置
rabbitmq:
host: asdf.me
port: 5672
username: guest
password: guest
cloud:
stream:
bindings:
input:
destination: push-exchange
output:
destination: push-exchange
这样会自动建立一个exchange为push-exchange名字的输出通道。同理@Enbalebing(Input.Class)是绑定输入通道的。下面创建一个生产者和消费者:
@EnableBinding(Source.class)
public class Producer {
@Autowired
@Output(Source.OUTPUT)
private MessageChannel channel;
public void send() {
channel.send(MessageBuilder.withPayload("22222222222" + UUID.randomUUID().toString()).build());
}
@EnableBinding(Sink.class)
public class Consumer {
@StreamListener(Sink.INPUT)
public void receive(Message<String> message) {
System.out.println("接收到MQ消息:" + JSONObject.toJSONString(message));
}
}
stream默认提供的消费者和生产者接口:
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
}
可以看出,会去找到我们在application.yaml里面定义的input,output下面的destination。分别作为输入和输出通道。我们也可以自己定义接口来实现:
String WS_INPUT = "ws-consumer";
String EMAIL_INPUT = "email-consumer";
String SMS_INPUT = "sms-consumer";
@Input(MqMessageInputConfig.EMAIL_INPUT)
SubscribableChannel emailChannel();
@Input(MqMessageInputConfig.WS_INPUT)
SubscribableChannel wsChannel();
@Input(MqMessageInputConfig.SMS_INPUT)
SubscribableChannel smChannel();
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface MqMessageOutputConfig {
String MESSAGE_OUTPUT = "message-producter";
@Output(MqMessageOutputConfig.MESSAGE_OUTPUT)
MessageChannel outPutChannel();
}
坑1.需要注意的是,最好不要自定义输入输出在同一个类里面。这样,如果我们只调用生产者发送消息。会导致提示Dispatcher has no subscribers for channel。并且会让我们发送消息的次数莫名减少几次。详细情况可以查看gihub官方issue,也给出的这种解决方法 官方解决方式
建立一个testjunit类,然后使用生产者发送消息。消费者监听队列获取消息。
接收到MQ消息:{"headers":{"amqp_receivedDeliveryMode":"PERSISTENT","amqp_receivedRoutingKey":"my-test-channel","amqp_receivedExchange":"my-test-channel","amqp_deliveryTag":1,"amqp_consumerQueue":"my-test-channel.anonymous.vYA2O6ZSQE-S9MOnE0ZoJQ","amqp_redelivered":false,"id":"805e7fc3-a046-e07a-edf5-def58d9c8eab","amqp_consumerTag":"amq.ctag-QwsmRKg5f0DGSp-7wbpYxQ","contentType":"text/plain","timestamp":1523930106483},"payload":"22222222222a7d24456-5b11-4c25-9270-876e7bbc556a"}
坑2.stream生成的exchang默认是topic模式。就是按照前缀匹配,发送消息给对应的队列。
*(星号):可以(只能)匹配一个单词
#(井号):可以匹配多个单词(或者零个)
fanout:广播模式,发送到所有的队列
direct:直传。完全匹配routingKey的队列可以收到消息。
坑3.默认消息异常之后,都会往死消息队列里面写,然而异常是放到一个header里面去的。默认消息队列支持的最大frame_max 是128kb,超过这个大小,服务器就主动给你关闭连接,然后把你的消息会不断的重试。
坑4.看到国内好多博客,使用@Input和@output都是用MessageChannel,这是不对的。@Output对MessageChannel,@Input对应SubscribableChannel 。切记!
坑5.我使用的stream版本是1.2.1,springboot版本时1.5.6。没有办法使用routingkey属性,即在spring.cloud.stream.rabbit这个属性无法显示。应该是我的stream版本偏低吧。遇到这种情况,大家果断换新版本,或者使用自带的ampq来实现吧。
坑6.stream的destination对应生成rabbitmq的exchange。加上了group后,例如destination:wx-consumer,group:queue。那么经过stream后队列名称会变成wx-consumer.queue。如果使用group对应的是持久化队列,不会被rabbitmq删除。