第三章: Spring整合RabbitMQ

导入依赖

        <!-- Maven dependency on the spring-rabbit artifact (group org.springframework.amqp), pinned to 2.1.8.RELEASE. -->
        <!-- NOTE(review): presumably this is what supplies the rabbit XML namespace and RabbitTemplate used later; confirm. -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>

配置生产者(rabbitmq-producer.xml,放在 classpath 下)

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Producer-side Spring context: declares the broker connection, three queues,
    one topic exchange with its bindings, and the RabbitTemplate used to publish.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- Broker connection settings (local broker, default guest account). -->
    <rabbit:connection-factory
            id="connectionFactory"
            host="127.0.0.1"
            port="5672"
            virtual-host="/"
            username="guest"
            password="guest"/>

    <!-- Admin bean tied to the connection factory above. -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue definitions. -->
    <rabbit:queue
            id="test_queue_1"
            name="test_queue_1"
            auto-declare="true"/>
    <rabbit:queue
            id="test_queue_2"
            name="test_queue_2"
            auto-declare="true"/>
    <rabbit:queue
            id="test_queue_3"
            name="test_queue_3"
            auto-declare="true"/>

    <!-- Topic exchange and the routing patterns that bind each queue to it. -->
    <rabbit:topic-exchange
            id="test_exchange"
            name="test_exchange"
            auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding
                    queue="test_queue_1"
                    pattern="#.world"/>
            <rabbit:binding
                    queue="test_queue_2"
                    pattern="hello.#"/>
            <rabbit:binding
                    queue="test_queue_3"
                    pattern="hello.world"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- RabbitTemplate bean so application code can send messages conveniently. -->
    <rabbit:template
            id="rabbitTemplate"
            connection-factory="connectionFactory"/>
</beans>

配置消费者(rabbitmq-consumer.xml,放在 classpath 下)

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Consumer-side Spring context: declares the broker connection and attaches
    one listener bean to each of the three test queues.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- Broker connection settings (local broker, default guest account). -->
    <rabbit:connection-factory
            id="connectionFactory"
            host="127.0.0.1"
            port="5672"
            virtual-host="/"
            username="guest"
            password="guest"/>

    <!-- Admin bean tied to the connection factory above. -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Listener beans, one per queue. -->
    <bean id="testListener1" class="TestQueueListener1"/>
    <bean id="testListener2" class="TestQueueListener2"/>
    <bean id="testListener3" class="TestQueueListener3"/>

    <!--
        All three listeners share the same container settings, so they are grouped
        under a single listener-container element. Each listener element still gets
        its own container instance.
    -->
    <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
        <rabbit:listener ref="testListener1" queue-names="test_queue_1"/>
        <rabbit:listener ref="testListener2" queue-names="test_queue_2"/>
        <rabbit:listener ref="testListener3" queue-names="test_queue_3"/>
    </rabbit:listener-container>

</beans>

编写监听器

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class TestQueueListener1 implements MessageListener {
    public void onMessage(Message message) {
        String consumerQueue = message.getMessageProperties().getConsumerQueue();
        System.out.println(consumerQueue + ":" + new String(message.getBody()));
    }
}
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class TestQueueListener2 implements MessageListener {
    public void onMessage(Message message) {
        String consumerQueue = message.getMessageProperties().getConsumerQueue();
        System.out.println(consumerQueue + ":" + new String(message.getBody()));
    }
}
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class TestQueueListener3 implements MessageListener {
    public void onMessage(Message message) {
        String consumerQueue = message.getMessageProperties().getConsumerQueue();
        System.out.println(consumerQueue + ":" + new String(message.getBody()));
    }
}

测试代码

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Publishes a handful of test messages using the Spring context defined in
 * {@code rabbitmq-producer.xml}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:rabbitmq-producer.xml")
public class ProducerTest {

    /** Template injected from the producer context; used to publish messages. */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends the same text message five times to {@code test_exchange} with the
     * routing key {@code hello.world}.
     */
    @Test
    public void test() {
        final String exchange = "test_exchange";
        final String routingKey = "hello.world";
        final String payload = "hello, world";

        // Count down from five; one publish per iteration.
        int remaining = 5;
        while (remaining-- > 0) {
            rabbitTemplate.convertAndSend(exchange, routingKey, payload);
        }
    }
}
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Loads the Spring context defined in {@code rabbitmq-consumer.xml} and then keeps
 * the test alive so the listeners configured there can go on receiving messages.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:rabbitmq-consumer.xml")
public class ConsumerTest {

    /**
     * Prints a start-up notice and then blocks indefinitely; stop the run manually
     * when finished.
     *
     * @throws InterruptedException if the waiting test thread is interrupted
     */
    @Test
    public void test() throws InterruptedException {
        System.out.println("消费端启动...");
        // Wait without spinning: joining the current thread never completes on its own,
        // so the test stays alive at no CPU cost. The previous empty while(true) loop
        // kept one core at 100% for as long as the consumer ran.
        Thread.currentThread().join();
    }

}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容