rocketmq入门的demo

最简单的demo,编写一个消息监听和消息发送。namesrv,broker,producer,consuer都是一个。本文的意思在于初学者可以根据文章的代码,操作复制出一个入门例子出来。

  • producer。 该节点是用于发送消息。
  • consumer。该节点用于接受发送的消息。
  • namesrv。 rocketmq的生产者和消费者都不会记录broker的实际地址,所以broker的地址会放在namesrv节点。broker启动的时候,把自己的地址写进namesrv,producer和consumer启动的时候会从namesrv中读取broker的地址。
  • broker。 该节点主要是接受生产者的消息,然后发送给消费者,并且还会存储记录消息。

一,下载

http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.4.0/rocketmq-all-4.4.0-bin-release.zip

二,启动rocketmq服务

  • 2.1 先启动nameserver
>>bin/nameserver
  • 2.2 启动broker
>>bin/mqbroker -n localhost:9876 
>>nohup sh mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true > broker_log.log 2>&1 &

三,编写简单java代码

  • 3.1 maven 依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>
  • 3.2 java代码生产者和消费者
public class ConsumerMain {
    public static void main(String[] args) throws  Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("topicName","*");
        consumer.registerMessageListener(new MessageListenerConcurrently(){
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println(list);
                System.out.println(consumeConcurrentlyContext);
                return null;
            }
        });
        consumer.start();
        System.out.println("消费者启动");
    }
}
public class ProducerMain {
    public static void main( String[] args ) throws     Exception {
         DefaultMQProducer producer = new DefaultMQProducer("ConsumerGroup");
         producer.setNamesrvAddr("127.0.0.1:9876");
         producer.start();
         producer.setSendMsgTimeout(30000);
         for (int i = 0; i < 50000000; i++) {
             Message msg = new Message("topicName" ,("Hello_RocketMQ " + i).getBytes("UTF-8"));
             SendResult sendResult = producer.send(msg);
             System.out.printf("%s%n", sendResult);
             Thread.sleep(3000);
         }
         System.out.println("生产者发送了");
         producer.shutdown();
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容