RocketMQ 代码示例 - 延时发送

生产者:

package com.rocketmq.test;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ScheduledMessageProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        int totalMessagesToSend = 10;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
            // 10秒后传递给consumer
            message.setDelayTimeLevel(3);
            // 发送消息
            producer.send(message);
        }
        producer.shutdown();
    }
}

消费者:

package com.rocketmq.test;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class ScheduledMessageConsumer {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TestTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,
                    ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                            + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

运行:

mvn exec:java -Dexec.mainClass="com.rocketmq.test.ScheduledMessageConsumer"
mvn exec:java -Dexec.mainClass="com.rocketmq.test.ScheduledMessageProducer"

等待几秒后consumer控制台就会输出:

Receive message[msgId=AC1100016AB25DF7C6F220F1CCB90000] 2ms later
Receive message[msgId=AC1100016AB25DF7C6F220F1CCC50001] 3ms later
Receive message[msgId=AC1100016AB25DF7C6F220F1CCC60002] 3ms later
Receive message[msgId=AC1100016AB25DF7C6F220F1CCC80003] 12ms later
Receive message[msgId=AC1100016AB25DF7C6F220F1CCC90004] 11ms later
Receive message[msgId=AC1100016AB25DF7C6F220F1CCCB0005] 17ms later
Receive message[msgId=AC1100016AB25DF7C6F220F1CCCC0006] 15ms later
Receive message[msgId=AC1100016AB25DF7C6F220F1CCCD0007] 19ms later
Receive message[msgId=AC1100016AB25DF7C6F220F1CCCE0008] 19ms later
Receive message[msgId=AC1100016AB25DF7C6F220F1CCCF0009] 19ms later
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 136,562评论 19 139
  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,899评论 13 425
  • 石肖孤岛菩萨 小桥湖水清雅 芦苇亭台闲暇
    京九如邮币收藏阅读 252评论 0 0
  • 数据库基础 什么是数据库 数据库(database)是以某种有组织的方式存储的数据集合,保存有组织的数据的容器。 ...
    WeirdoSu阅读 325评论 0 0

友情链接更多精彩内容