生产者:
package com.rocketmq.test;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
/**
 * Example producer that sends a fixed number of delayed (scheduled) messages to
 * {@code TestTopic} through the name server at {@code localhost:9876}.
 */
public class ScheduledMessageProducer {

    /**
     * Starts the producer, sends ten delayed messages and shuts the producer down.
     *
     * @param args unused
     * @throws Exception if the producer fails to start or a send fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        try {
            int totalMessagesToSend = 10;
            for (int i = 0; i < totalMessagesToSend; i++) {
                // Encode the payload explicitly as UTF-8 so the bytes do not depend on the
                // platform default charset of the machine running the producer.
                Message message = new Message(
                        "TestTopic",
                        ("Hello scheduled message " + i).getBytes(StandardCharsets.UTF_8));
                // Delay level 3: delivered to the consumer 10 seconds later
                // (with the broker's default delay-level table).
                message.setDelayTimeLevel(3);
                // Send the message.
                producer.send(message);
            }
        } finally {
            // Always release the client, even when a send throws.
            producer.shutdown();
        }
    }
}
消费者:
package com.rocketmq.test;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
 * Example push consumer that subscribes to {@code TestTopic} with the subscription
 * expression {@code "*"} and prints, for every message it receives, the time elapsed
 * since that message's store timestamp.
 */
public class ScheduledMessageConsumer {

    /**
     * Configures the consumer, registers the message listener and starts consuming.
     *
     * @param args unused
     * @throws Exception if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TestTopic", "*");

        MessageListenerConcurrently listener = new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt received : messages) {
                    printReceipt(received);
                }
                // Every message in the batch is acknowledged as consumed.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        };
        consumer.registerMessageListener(listener);

        consumer.start();
    }

    /**
     * Prints the message id together with the number of milliseconds between the
     * message's store timestamp and the current time.
     */
    private static void printReceipt(MessageExt received) {
        String msgId = received.getMsgId();
        long elapsedMillis = System.currentTimeMillis() - received.getStoreTimestamp();
        System.out.println("Receive message[msgId=" + msgId + "] " + elapsedMillis + "ms later");
    }
}
运行:
mvn exec:java -Dexec.mainClass="com.rocketmq.test.ScheduledMessageConsumer"
mvn exec:java -Dexec.mainClass="com.rocketmq.test.ScheduledMessageProducer"
等待约10秒后(延迟级别3对应10秒),consumer控制台就会输出:
Receive message[msgId=AC1100016AB25DF7C6F220F1CCB90000] 2ms later
Receive message[msgId=AC1100016AB25DF7C6F220F1CCC50001] 3ms later
Receive message[msgId=AC1100016AB25DF7C6F220F1CCC60002] 3ms later
Receive message[msgId=AC1100016AB25DF7C6F220F1CCC80003] 12ms later
Receive message[msgId=AC1100016AB25DF7C6F220F1CCC90004] 11ms later
Receive message[msgId=AC1100016AB25DF7C6F220F1CCCB0005] 17ms later
Receive message[msgId=AC1100016AB25DF7C6F220F1CCCC0006] 15ms later
Receive message[msgId=AC1100016AB25DF7C6F220F1CCCD0007] 19ms later
Receive message[msgId=AC1100016AB25DF7C6F220F1CCCE0008] 19ms later
Receive message[msgId=AC1100016AB25DF7C6F220F1CCCF0009] 19ms later