Spring整合rabbitmq实践(一):基础

Spring整合rabbitmq实践(二):扩展
Spring整合rabbitmq实践(三):源码

1. Rabbitmq基本概念

image

1.1. Rabbitmq中的各个角色

producer:消息生产者;

consumer:消息消费者;

queue:消息队列;

exchange:接收producer发送的消息按照binding规则转发给相应的queue;

binding:exchange与queue之间的关系;

virtualHost:每个virtualHost持有自己的exchange、queue、binding,用户只能在virtualHost粒度控制权限。

1.2. exchange的几种类型

fanout:

群发到所有绑定的queue;

direct:

根据routing key routing到相应的queue,routing不到任何queue的消息扔掉;可以不同的key绑到同一个queue,也可以同一个key绑到不同的queue;

image
image

topic:

类似direct,区别是routing key是由一组以“.”分隔的单词组成,可以有通配符,“*”匹配一个单词,“#”匹配0个或多个单词;

image

headers:

根据arguments来routing。

arguments为一组key-value对,任意设置。

“x-match”是一个特殊的key,值为“all”时必须匹配所有argument,值为“any”时只需匹配任意一个argument,不设置默认为“all”。

2. spring整合rabbitmq

通过以下配置,可以获得最基础的发送消息到queue,以及从queue接收消息的功能。

2.1. maven依赖


<dependency>

    <groupId>org.springframework.amqp</groupId>

    <artifactId>spring-rabbit</artifactId>

    <version>${version}</version>

</dependency>

这个包同时包含了一些其它的包:spring-context、spring-tx、spring-web、spring-messaging、spring-retry、spring-amqp、amqp-client,如果想单纯一点,可以单独引入。

最主要的是以下几个包,

spring-amqp:

Spring AMQP Core.

spring-rabbit:

Spring RabbitMQ Support.

amqp-client:

The RabbitMQ Java client library allows Java applications to interface with RabbitMQ.

个人理解就是,spring-amqp是spring整合的amqp,spring-rabbit是spring整合的rabbitmq(rabbitmq是amqp的一个实现,所以可能spring-rabbit也是类似关系),amqp-client提供操作rabbitmq的java api。

目前最新的是2.0.5.RELEASE版本。如果编译报错,以下信息或许能有所帮助:

(1)


java.lang.ClassNotFoundException: org.springframework.amqp.support.converter.SmartMessageConverter

解决方案:spring-amqp版本改为2.0.5.RELEASE。

(2)


java.lang.IllegalStateException: LifecycleProcessor not initialized - call 'refresh' before invoking lifecycle methods via the context...

解决方案:spring-context版本改为5.0.7.RELEASE。

(3)


java.lang.NoSuchMethodError: org.springframework.util.ObjectUtils.unwrapOptional(Ljava/lang/Object;)Ljava/lang/Object

解决方案:spring-core版本改为5.0.7.RELEASE。

(4)


java.lang.NullPointerException: null

at org.springframework.core.BridgeMethodResolver.findBridgedMethod(BridgeMethodResolver.java:60)

at org.springframework.beans.GenericTypeAwarePropertyDescriptor.<init>(GenericTypeAwarePropertyDescriptor.java:70)

at org.springframework.beans.CachedIntrospectionResults.buildGenericTypeAwarePropertyDescriptor(CachedIntrospectionResults.java:366)

at org.springframework.beans.CachedIntrospectionResults.<init>(CachedIntrospectionResults.java:302)

解决方案:spring-beans版本改为5.0.7.RELEASE。

(5)


Caused by: java.lang.NoSuchMethodError: org.springframework.aop.framework.AopProxyUtils.getSingletonTarget(Ljava/lang/Object;)Ljava/lang/Object;

解决方案:spring-aop版本改为5.0.7.RELEASE。

总之,需要5.0.7.RELEASE版本的spring,及相匹配版本的amqp-client。

2.2. 配置ConnectionFactory

后面所讲的这些bean配置,spring-amqp中都有默认配置,如果不需要修改默认配置,则不用人为配置这些bean。后面这些配置也没有涉及到所有的属性。

这里的ConnectionFactory指的是spring-rabbit包下面的ConnectionFactory接口,不是amqp-client包下面的ConnectionFactory类。


@Configuration

public class MqProducerConfig {

    @Autowired

    @Bean

    public ConnectionFactory amqpConnectionFactory(ConnectionListener connectionListener,

                                                  RecoveryListener recoveryListener,

                                                  ChannelListener channelListener) {

        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();

        connectionFactory.setAddresses("localhost:5672");

        connectionFactory.setUsername("guest");

        connectionFactory.setPassword("guest");

        connectionFactory.setVirtualHost("/");

        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);

        connectionFactory.setChannelCacheSize(25);

        connectionFactory.setChannelCheckoutTimeout(0);

        connectionFactory.setPublisherReturns(false);

        connectionFactory.setPublisherConfirms(false);

        connectionFactory.addConnectionListener(connectionListener);

        connectionFactory.addChannelListener(channelListener);

        connectionFactory.setRecoveryListener(recoveryListener);

        //connectionFactory.setConnectionCacheSize(1);

        //connectionFactory.setConnectionLimit(Integer.MAX_VALUE);

        return connectionFactory;

    }

}

上面这个bean是spring-amqp的核心,不论是发送消息还是接收消息都需要这个bean,下面描述一下里面这些配置的含义。

setAddresses:设置了rabbitmq的地址、端口,集群部署的情况下可填写多个,“,”分隔。

setUsername:设置rabbitmq的用户名。

setPassword:设置rabbitmq的用户密码。

setVirtualHost:设置virtualHost。

setCacheMode:设置缓存模式,共有两种,CHANNELCONNECTION模式。

CHANNEL模式,程序运行期间ConnectionFactory会维护着一个Connection,所有的操作都会使用这个Connection,但一个Connection中可以有多个Channel,操作rabbitmq之前都必须先获取到一个Channel,否则就会阻塞(可以通过setChannelCheckoutTimeout()设置等待时间),这些Channel会被缓存(缓存的数量可以通过setChannelCacheSize()设置);

CONNECTION模式,这个模式下允许创建多个Connection,会缓存一定数量的Connection,每个Connection中同样会缓存一些Channel,除了可以有多个Connection,其它都跟CHANNEL模式一样。

这里的Connection和Channel是spring-amqp中的概念,并非rabbitmq中的概念,官方文档对Connection和Channel有这样的描述:

Sharing of the connection is possible since the "unit of work" for messaging with AMQP is actually a "channel" (in some ways, this is similar to the relationship between a Connection and a Session in JMS).

关于CONNECTION模式中,可以存在多个Connection的使用场景,官方文档的描述:

The use of separate connections might be useful in some environments, such as consuming from an HA cluster, in conjunction with a load balancer, to connect to different cluster members.

setChannelCacheSize:设置每个Connection中(注意是每个Connection)可以缓存的Channel数量,注意只是缓存的Channel数量,不是Channel的数量上限,操作rabbitmq之前(send/receive message等)要先获取到一个Channel,获取Channel时会先从缓存中找闲置的Channel,如果没有则创建新的Channel,当Channel数量大于缓存数量时,多出来没法放进缓存的会被关闭。

注意,改变这个值不会影响已经存在的Connection,只影响之后创建的Connection。

setChannelCheckoutTimeout:当这个值大于0时,channelCacheSize不仅是缓存数量,同时也会变成数量上限,从缓存获取不到可用的Channel时,不会创建新的Channel,会等待这个值设置的毫秒数,到时间仍然获取不到可用的Channel会抛出AmqpTimeoutException异常。

同时,在CONNECTION模式,这个值也会影响获取Connection的等待时间,超时获取不到Connection也会抛出AmqpTimeoutException异常。

setPublisherReturns、setPublisherConfirms:producer端的消息确认机制(confirm和return),设为true后开启相应的机制,后文详述。

官方文档描述publisherReturns设为true打开return机制,publisherComfirms设为true打开confirm机制,但测试结果(2.0.5.RELEASE版本)是,任意一个设为true,两个都会打开。

addConnectionListener、addChannelListener、setRecoveryListener:添加或设置相应的Listener,后文详述。

setConnectionCacheSize:仅在CONNECTION模式使用,设置Connection的缓存数量。

setConnectionLimit:仅在CONNECTION模式使用,设置Connection的数量上限。

上面的bean配置,除了需要注入的几个listener bean以外,其它设置的都是其默认值(2.0.5.RELEASE版本),后面的bean示例配置也是一样,部分属性不同版本的默认值可能有所不同。

2.3. 配置com.rabbitmq.client.ConnectionFactory

一般不用配置这个bean,这里简单提一下。

这个ConnectionFactory是rabbit api中的ConnectionFactory类,这里面是连接rabbitmq节点的Connection配置。


    /** Default user name */

    public static final String DEFAULT_USER = "guest";

    /** Default password */

    public static final String DEFAULT_PASS = "guest";

    /** Default virtual host */

    public static final String DEFAULT_VHOST = "/";

    /** Default maximum channel number;

    *  zero for unlimited */

    public static final int    DEFAULT_CHANNEL_MAX = 0;

    /** Default maximum frame size;

    *  zero means no limit */

    public static final int    DEFAULT_FRAME_MAX = 0;

    /** Default heart-beat interval;

    *  60 seconds */

    public static final int    DEFAULT_HEARTBEAT = 60;

    /** The default host */

    public static final String DEFAULT_HOST = "localhost";

    /** 'Use the default port' port */

    public static final int    USE_DEFAULT_PORT = -1;

    /** The default non-ssl port */

    public static final int    DEFAULT_AMQP_PORT = AMQP.PROTOCOL.PORT;

    /** The default ssl port */

    public static final int    DEFAULT_AMQP_OVER_SSL_PORT = 5671;

    /** The default TCP connection timeout: 60 seconds */

    public static final int    DEFAULT_CONNECTION_TIMEOUT = 60000;

    /**

    * The default AMQP 0-9-1 connection handshake timeout. See DEFAULT_CONNECTION_TIMEOUT

    * for TCP (socket) connection timeout.

    */

    public static final int    DEFAULT_HANDSHAKE_TIMEOUT = 10000;

    /** The default shutdown timeout;

    *  zero means wait indefinitely */

    public static final int    DEFAULT_SHUTDOWN_TIMEOUT = 10000;

    /** The default continuation timeout for RPC calls in channels: 10 minutes */

    public static final int    DEFAULT_CHANNEL_RPC_TIMEOUT = (int) MINUTES.toMillis(10);

    /** The default network recovery interval: 5000 millis */

    public static final long  DEFAULT_NETWORK_RECOVERY_INTERVAL = 5000;

    private static final String PREFERRED_TLS_PROTOCOL = "TLSv1.2";

    private static final String FALLBACK_TLS_PROTOCOL = "TLSv1";

    ...

如果想修改这些配置,可以按如下方式配置:


    @Autowired

    @Bean

    public ConnectionFactory connectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory) {

        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitConnectionFactory);

        return connectionFactory;

    }

    @Bean

    public com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory() {

        com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();

        connectionFactory.setAutomaticRecoveryEnabled(false);

        connectionFactory.setUsername("guest");

        connectionFactory.setPassword("guest");

        connectionFactory.setVirtualHost("/");

        return connectionFactory;

    }

2.4. 配置AmqpTemplate

consumer端如果通过@RabbitListener注解的方式接收消息,不需要这个bean。

不建议直接通过ConnectionFactory获取Channel操作rabbitmq,建议通过amqpTemplate操作。


    @Autowired

    @Bean

    public AmqpTemplate amqpTemplate(ConnectionFactory amqpConnectionFactory,

                                    RabbitTemplate.ReturnCallback returnCallback,

                                    RabbitTemplate.ConfirmCallback confirmCallback,

                                    RetryTemplate retryTemplate,

                                    MessageConverter messageConverter

                                    ){

        RabbitTemplate rabbitTemplate = new RabbitTemplate();

        rabbitTemplate.setConnectionFactory(amqpConnectionFactory);

        rabbitTemplate.setRetryTemplate(retryTemplate);

        rabbitTemplate.setMessageConverter(messageConverter);

        rabbitTemplate.setChannelTransacted(false);

        rabbitTemplate.setReturnCallback(returnCallback);

        rabbitTemplate.setConfirmCallback(confirmCallback);

        rabbitTemplate.setMandatory(false);

        return rabbitTemplate;

    }

setConnectionFactory:设置spring-amqp的ConnectionFactory。

setRetryTemplate:设置重试机制,详情见后文。

setMessageConverter:设置MessageConverter,用于java对象与Message对象(实际发送和接收的消息对象)之间的相互转换,详情见后文。

setChannelTransacted:打开或关闭Channel的事务,关于amqp的事务后文描述。

setReturnCallback、setConfirmCallback:return和confirm机制的回调接口,后文详述。

setMandatory:设为true使ReturnCallback生效。

2.5. 配置RabbitListenerContainerFactory

这个bean仅在consumer端通过@RabbitListener注解的方式接收消息时使用,每一个@RabbitListener注解的方法都会由这个RabbitListenerContainerFactory创建一个MessageListenerContainer,负责接收消息。


    @Autowired

    @Bean

    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory,

                                                                              ErrorHandler errorHandler,

                                                                              MessageConverter messageConverter) {

        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

        factory.setConnectionFactory(cachingConnectionFactory);

        factory.setMessageConverter(messageConverter);

        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);

        factory.setConcurrentConsumers(1);

        factory.setMaxConcurrentConsumers(1);

        factory.setPrefetchCount(250);

        factory.setChannelTransacted(false);

        factory.setTxSize(1);

        factory.setDefaultRequeueRejected(true);

        factory.setErrorHandler(errorHandler);

        return factory;

    }

setConnectionFactory:设置spring-amqp的ConnectionFactory。

setMessageConverter:对于consumer端,MessageConverter也可以在这里配置。

setAcknowledgeMode:设置consumer端的应答模式,共有三种:NONE、AUTO、MANUAL。

NONE,无应答,这种模式下rabbitmq默认consumer能正确处理所有发出的消息,所以不管消息有没有被consumer收到,有没有正确处理都不会恢复;

AUTO,由Container自动应答,正确处理发出ack信息,处理失败发出nack信息,rabbitmq发出消息后将会等待consumer端的应答,只有收到ack确认信息才会把消息清除掉,收到nack信息的处理办法由setDefaultRequeueRejected()方法设置,所以在这种模式下,发生错误的消息是可以恢复的。

MANUAL,基本同AUTO模式,区别是需要人为调用方法给应答。

setConcurrentConsumers:设置每个MessageListenerContainer将会创建的Consumer的最小数量,默认是1个。

setMaxConcurrentConsumers:设置每个MessageListenerContainer将会创建的Consumer的最大数量,默认等于最小数量。

setPrefetchCount:设置每次请求发送给每个Consumer的消息数量。

setChannelTransacted:设置Channel的事务。

setTxSize:设置事务当中可以处理的消息数量。

setDefaultRequeueRejected:设置当rabbitmq收到nack/reject确认信息时的处理方式,设为true,扔回queue头部,设为false,丢弃。

setErrorHandler:实现ErrorHandler接口设置进去,所有未catch的异常都会由ErrorHandler处理。

2.6. sending messages

AmqpTamplate里面有下面几个方法可以向queue发送消息:


void send(Message message) throws AmqpException;

void send(String routingKey, Message message) throws AmqpException;

void send(String exchange, String routingKey, Message message) throws AmqpException;

这里,exchange必须存在,否则消息发不出去,会看到错误日志,但不影响程序运行:


03:57:38.700 [AMQP Connection 127.0.0.1:5672] ERROR [CachingConnectionFactory.java:956] - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'xxx' in vhost 'test', class-id=60, method-id=40)

Message是org.springframework.amqp.core.Message类,spring-amqp发送和接收的都是这个Message。


package org.springframework.amqp.core;

public class Message implements Serializable {

private static final long serialVersionUID = -7177590352110605597L;

private static final String ENCODING = Charset.defaultCharset().name();

private final MessageProperties messageProperties;

private final byte[] body;

public Message(byte[] body, MessageProperties messageProperties) { //NOSONAR

this.body = body; //NOSONAR

this.messageProperties = messageProperties;

}

...

}

从Message类源码可以看到消息内容放在byte[]里面,MessageProperties对象包含了非常多的一些其它信息,如Header、exchange、routing key等。

这种方式,需要将消息内容(String,或其它Object)转换为byte[],示例:


amqpTemplate.send(exchange, routingKey, new Message(JSON.toJSONString(event).getBytes(), MessagePropertiesBuilder.newInstance().build()));

也可以直接调用下面几个方法,Object将会自动转为Message对象发送:


void convertAndSend(Object message) throws AmqpException;

void convertAndSend(String routingKey, Object message) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message)

    throws AmqpException;

2.7. receiving messages

有两种方法接收消息:

1.polling consumer,轮询调用方法一次获取一条;

2.asynchronous consumer,listener异步接收消息。

polling consumer

直接通过AmqpTemplate的方法从queue获取消息,有如下方法:


Message receive() throws AmqpException;

Message receive(String queueName) throws AmqpException;

Message receive(long timeoutMillis) throws AmqpException;

Message receive(String queueName, long timeoutMillis) throws AmqpException;

如果queue里面没有消息,会立刻返回null;传入timeoutMillis参数后可阻塞等待一段时间。

如果想直接从queue获取想要的java对象,可调用下面这一组方法:


    Object receiveAndConvert() throws AmqpException;

Object receiveAndConvert(String queueName) throws AmqpException;

Object receiveAndConvert(long timeoutMillis) throws AmqpException;

Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;

<T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws AmqpException;

<T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) throws AmqpException;

<T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;

<T> T receiveAndConvert(String queueName, long timeoutMillis, ParameterizedTypeReference<T> type)

throws AmqpException;

后面4个方法是带泛型的,示例如下:


Foo<Bar<Baz, Qux>> foo =

    rabbitTemplate.receiveAndConvert(new ParameterizedTypeReference<Foo<Bar<Baz, Qux>>>() { });

使用这四个方法需要配置org.springframework.amqp.support.converter.SmartMessageConverter,这是一个接口,Jackson2JsonMessageConverter已经实现了这个接口,所以只要将Jackson2JsonMessageConverter设置到RabbitTemplate中即可。

asynchronous consumer

有多种方式可以实现,详情参考官方文档。

最简单的实现方式是@RabbitListener注解,示例:


@Component

public class RabbitMqListener {

    @RabbitListener(queues = "queueName")

    public void listen(Message message) {

        JSON.parseObject(new String(message.getBody()), typeReference);

    }

}

这里接收消息的对象用的是Message,也可以是自定义的java对象,但调用Converter转换失败会报错。

注解上指定的queue必须是已经存在并且绑定到某个exchange的,否则会报错:


03:46:59.746 [SimpleAsyncTaskExecutor-1] WARN  o.s.a.r.l.BlockingQueueConsumer [BlockingQueueConsumer.java:565] - Failed to declare queue:xxx

03:46:59.747 [SimpleAsyncTaskExecutor-1] WARN  o.s.a.r.l.BlockingQueueConsumer [BlockingQueueConsumer.java:479] - Queue declaration failed; retries left=3

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[xxx]

如果在@RabbitListener注解中指明binding信息,就能自动创建queue、exchange并建立binding关系。

direct和topic类型的exchange需要routingKey,示例:


@RabbitListener(bindings = @QueueBinding(

        value = @Queue(value = "myQueue", durable = "true"),

        exchange = @Exchange(value = "auto.exch", durable = "true"),

        key = "orderRoutingKey.#")

  )

fanout类型的exchange,示例:


@RabbitListener(bindings = @QueueBinding(

        value = @Queue(value = "myQueue", durable = "true"),

        exchange = @Exchange(value = "auto.exch", durable = "true", type = ExchangeTypes.FANOUT)

        )

  )

2.0版本之后,可以指定多个routingKey,示例:


key = { "red", "yellow" }

并且支持arguments属性,可用于headers类型的exchange,示例:


@RabbitListener(bindings = @QueueBinding(

        value = @Queue(value = "auto.headers", autoDelete = "true",

                        arguments = @Argument(name = "x-message-ttl", value = "10000",

                                                type = "java.lang.Integer")),

        exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"),

        arguments = {

                @Argument(name = "x-match", value = "all"),

                @Argument(name = "foo", value = "bar"),

                @Argument(name = "baz")

        })

)

@Queue有两个参数exclusive和autoDelete顺便解释一下:

exclusive,排他队列,只对创建这个queue的Connection可见,Connection关闭queue删除;

autoDelete,没有consumer对这个queue消费时删除。

对于这两种队列,durable=true是不起作用的。

另外,如果注解申明的queue和exchange及binding关系都已经存在,但与已存在的设置不同,比如,已存在的exchange的是direct类型,这里尝试改为fanout类型,结果是不会有任何影响,不论是修改或者新增参数都不会生效。

如果queue存在,exchange存在,但没有binding,那么程序启动后会自动建立起binding关系。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,637评论 18 139
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,892评论 2 11
  • Spring整合rabbitmq实践(一):基础Spring整合rabbitmq实践(三):源码 3. 扩展实践 ...
    jinchaolv阅读 10,042评论 1 7
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,351评论 2 34
  • 前言 在微服务架构的系统中,我们通常会使用轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务实例都连接上来...
    Chandler_珏瑜阅读 6,572评论 2 39