rabbitmq入门一简单队列(Hello World!、Work queues)

@[toc]
docker搭建rabbitm

mq官网

"Hello World!":

官网教程
点对点,一个生产者,一个消费者,一个队列。
特点:

  • 没有交换机概念,生产者和消费者直接通过队列进行交流
在这里插入图片描述

1. mq创建一个队列

  • 安装完rabbitm直接访问 118.25.188.37:15672
  • 进入登入界面:默认密码都guest
  • ==这里不创建队列也行,java中绑定队列如果队列没有会创建==


    在这里插入图片描述

    在这里插入图片描述

    完成后点击add queue


    在这里插入图片描述

2. 创建生产者消费者

  1. 引入依赖
     dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>5.73</version>
        </dependency>
    </dependencies>
  1. 获取mq连接工具类(类似jdbc连接)
  • MQConnectionUtils
public class MQConnectionUtils {
    private static final String IP = "118.25.188.37";
    private static final Integer PORT = 5672;
    private static final String USERNAME = "guest";
    private static final String PASSWORD = "guest";

    public static Connection newConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost(IP);
        //设置端口号
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        //创建连接
        Connection connection = factory.newConnection();
        return connection;

    }
}

  1. Producer 生产者
  • Producer
public class Producer {
    private static final String QUEUE_NAME = "mq";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.获取连接
        Connection newConnection = MQConnectionUtils.newConnection();
        // 2.创建通道
        Channel channel = newConnection.createChannel();
        // 3.创建队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String msg = "直接模式消息发送";
        System.out.println("生产者发送消息:" + msg);
        // 4.发送消息
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        channel.close();
        newConnection.close();
    }

}

发送消息后mq会出现待消费的消息


在这里插入图片描述
  1. Customer 消费者
  • Customer
public class Customer {
    private static final String QUEUE_NAME = "mq";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.获取连接
        Connection newConnection = MQConnectionUtils.newConnection();
        // 2.获取通道
        Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消费者获取消息:" + msgString);
            }
        };
        // 3.监听队列  true表示自动应答,false表示手动应答
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }

}

消费完后mq消息就没有了

工作队列 Work queues

在这里插入图片描述

与点对点不同的是,消费者由1个变成了两个,消费者集群了
我们这里启动两个消费者


在这里插入图片描述

在这里插入图片描述

然后发送10条消息
看看结果:


在这里插入图片描述
在这里插入图片描述

可以看到实现的是均摊消费

应答模式

channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
这里第二个参数表示应答模式为true,表示自动签收

  • 自动应答:不会在乎消费者对这个消息处理是否成功,都会告诉队列删除该消息,如果消息获取失败的情况,实现自动补偿
  • 手动应答:消费者处理完业务逻辑,手动返回一个ack(通知)告诉队列服务器是否删除该消息

这里我们将 应答模式设置为false
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
然后向消费者发送10个消息

在这里插入图片描述

可以看到消费者接收到了10个消息,但是我现在如果停止消费者
在这里插入图片描述

发现队列中还是有10个消息未消费,原因我我们没有手动返回ask
这里我们需要加上这个channel.basicAck(envelope.getDeliveryTag(), false);

public class Customer {
    private static final String QUEUE_NAME = "mq";

    public static void main(String[] args) throws  Exception {
        System.out.println("消费者2启动");
        // 1.获取连接
        Connection newConnection = MQConnectionUtils.newConnection();
        /* 2.获取通道 */
        Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //监听队列
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消费者获取消息:" + msgString);
                //手动应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 3.监听队列  true表示自动应答,false表示手动应答
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);

    }

}

这样就表示消费者接受消息成功了
实现:添加如下代码channel.basicQos(1);

public class Customer {
    private static final String QUEUE_NAME = "mq";

    public static void main(String[] args) throws  Exception {
        System.out.println("消费者2启动");
        // 1.获取连接
        Connection newConnection = MQConnectionUtils.newConnection();
        /* 2.获取通道 */
        Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //表示一次只消费一个消息
        channel.basicQos(1);
        //监听队列
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消费者获取消息:" + msgString);
                //手动应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 3.监听队列  true表示自动应答,false表示手动应答
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);

    }

}

公平队列

在上面我们消费者如果集群,消费者接受采用的均摊消费,但每个消费者处理业务时间不同,这样就不能让性能更好的消费者消费更多的消息(能者多劳)

  • 解决方案:消费者都采用应答模式实现公平队列,即谁消费快,消费的消息多

集成springboot

demo 路径结构:


在这里插入图片描述

2.代码测试

创建一个springboot项目,然后加入mq依赖

dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

配置 application.properties

# web端口
server.port=8089
# mq地址
spring.rabbitmq.host=118.25.188.37

测试生产者

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class)
public class ProductTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void senMes() {
        rabbitTemplate.convertAndSend("mq","直接模式消息发送");

    }
}

运行sendMes
无错误后访问mq web管理页面发现多了一条待消费的消息


在这里插入图片描述

编写消费者

@Component
@RabbitListener(queues = "mq")  //指定消费队列
public class Customer1 {

    @RabbitHandler
    public void getMsg(String msg) {
        System.out.println("直接模式消费消息" + msg);
    }
}

然后直接运行main
可以看到效果


在这里插入图片描述

在这里插入图片描述

也可以启动多个消费者等待消息,具体idea启动多个实例请看这里

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 16,066评论 2 11
  • 关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时...
    中v中阅读 1,986评论 0 20
  • 关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时...
    Java机械师阅读 553评论 0 2
  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 1,376评论 0 1
  • 文章首次整理自 个人博客:RabbitMQ在iOS中的应用 引言 随着公司业务的需要,原来的推送,不能满足现有的要...
    阿木摄影阅读 6,684评论 27 10