RocketMQ 自测

1、生产者
package com.example.demo.rocketmq;

/**
 * @Description:
 * @author: YuanTong-ZXY
 * @Date: 2019/2/22 9:34
 */
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;

    import java.nio.charset.StandardCharsets;

/**
 * RocketMQ producer self-test.
 *
 * <p>Starts a producer in group {@code "Producer"}, sends two messages to
 * {@code PushTopic} (tag {@code push}) and one to {@code PullTopic}
 * (tag {@code pull}), prints the id and status of each send, and then
 * shuts the producer down.
 */
public class Producer {

    /** Address of the name server the producer connects to. */
    private static final String NAMESRV_ADDR = "192.168.205.196:9876";

    /**
     * Entry point: sends the three test messages.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("Producer");
        producer.setNamesrvAddr(NAMESRV_ADDR);
        try {
            producer.start();

            sendAndReport(producer, "PushTopic", "push", "1", "Just for test.");
            sendAndReport(producer, "PushTopic", "push", "2", "Just for test.zhangxinyu111");
            sendAndReport(producer, "PullTopic", "pull", "1", "Just for test.");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Runs whether or not start()/send() succeeded.
            producer.shutdown();
        }
    }

    /**
     * Builds one message, sends it, and prints the message id and send status.
     *
     * @param producer a started producer
     * @param topic    destination topic
     * @param tags     message tag
     * @param keys     message key
     * @param body     message payload as text
     * @throws Exception if the send fails; the first failure aborts the remaining sends
     */
    private static void sendAndReport(DefaultMQProducer producer,
                                      String topic,
                                      String tags,
                                      String keys,
                                      String body) throws Exception {
        // Encode explicitly as UTF-8 so the bytes on the wire do not depend on
        // the platform default charset of the machine running the producer.
        Message msg = new Message(topic, tags, keys, body.getBytes(StandardCharsets.UTF_8));

        SendResult result = producer.send(msg);
        System.out.println("id:" + result.getMsgId() +
                " result:" + result.getSendStatus());
    }
}

2、消费者
package com.example.demo.rocketmq;

/**
 * @Description:
 * @author: YuanTong-ZXY
 * @Date: 2019/2/22 9:42
 */

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * RocketMQ push-consumer self-test.
 *
 * <p>Starts a consumer in group {@code "PushConsumer"} that subscribes to
 * {@code PushTopic} with tag {@code push} and prints every message it
 * receives.
 */
public class Consumer {

    /**
     * Entry point: configures the consumer, registers the listener and starts it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer =
                new DefaultMQPushConsumer("PushConsumer");
        consumer.setNamesrvAddr("192.168.205.196:9876");
        try {
            // Subscribe to messages in PushTopic whose tag is "push".
            consumer.subscribe("PushTopic", "push");
            // Start from the first offset of the queue (per the original
            // author's note, this applies to the first start of the program).
            consumer.setConsumeFromWhere(
                    ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(
                    new MessageListenerConcurrently() {
                        public ConsumeConcurrentlyStatus consumeMessage(
                                List<MessageExt> list,
                                ConsumeConcurrentlyContext context) {
                            // The callback receives a batch. Print every
                            // message in it, not just the first, because the
                            // status returned below applies to the whole batch.
                            for (Message msg : list) {
                                System.out.println(msg.toString());
                            }
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                    }
            );
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
            // Setup or start failed: shut the consumer down so it does not
            // linger in a half-initialised state.
            consumer.shutdown();
        }
    }
}

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容