RocketMQ实战

PS:我这里使用的是自定义的RoketMQ进行消息的发送和消费的,原理都差不多,万变不离其宗。


创建配置文件类

  • 首先创建RocketMqConfig、RocketMqProducerConfig、RocketMqConsumerConfig类,里面包含RocketMQ所需的所有配置,等到创建Consumer和Producer的时候可以一键配置,文中所有代码块均省略getter\setter方法

    public class RocketMqConfig {
      private String namesrvAddr = "127.0.0.1:9876";
      private RocketMqProducerConfig producerConfig = new RocketMqProducerConfig();
      private RocketMqConsumerConfig consumerConfig = new RocketMqConsumerConfig();
    }
    
    public class RocketMqProducerConfig {
      private String groupName = "producer";
      private String instanceName = "producer_instance";
      private String topic = "topic";
    }
    
    public class RocketMqConsumerConfig {
      //组名
      private String groupName = "consumer";
      //实例名
      private String instanceName = "consumer_instance";
      // 订阅主题和标签Map
      private Map<String, String> subscriptions = new HashMap<>();
      //设置批量消费,以提升消费吞吐量,默认是1
      private int consumeMessageBatchMaxSize = 1;
    }
    

创建Consumer和Producer

  • 创建RocketMqProducer、RocketMqConsumer类,里面包括构造方法(有参、无参)、start、stop等方法

    public class RocketMqProducer implements MqProducer {
    
      private DefaultMQProducer producer;
      private RocketMqConfig config;
      private boolean isStarted = false;
    
      public RocketMqProducer(RocketMqConfig config) {
          this.config = config;
          producer = new DefaultMQProducer(config.getProducerConfig().getGroupName());
          producer.setInstanceName(config.getProducerConfig().getInstanceName());
          producer.setVipChannelEnabled(false);
          producer.setNamesrvAddr(config.getNamesrvAddr());
      }
        public boolean isStarted() {
          return isStarted;
      }
        public void start() {
          producer.start();
          isStarted = true;
      }
        public void stop() {
          producer.shutdown();
          isStarted = false;
      }
        public MqSendResult send(String tag, AbstractMessage t) {
            t.setTopic(config.getProducerConfig().getTopic());
            t.setTag(tag);
            Message msg = new Message(t.getTopic(), 
                                      t.getTag(), 
                                      t.getKey(),
                                      t.getMessageType().getType(),
                                      t.getBody(),
                                      true);
            SendResult sendResult = producer.send(msg);
            t.setMessageId(sendResult.getMsgId());
    
            // 返回结果
            MqSendResult mqSendResult = new MqSendResult();
            mqSendResult.setSuccess(true);
            mqSendResult.setCode(sendResult.getSendStatus().toString());
            mqSendResult.setMsgId(sendResult.getMsgId());
            return mqSendResult;
        }
    }
    
    public class RocketMqConsumer implements MqConsumer {
      private MqMessageHandler handler = null;
      private DefaultMQPushConsumer consumer;
      private RocketMqConfig config;
      private boolean isStarted = false;
      public RocketMqConsumer(RocketMqConfig config) {
          this.config = config;
          consumer = new DefaultMQPushConsumer(config.getConsumerConfig().getGroupName());
          consumer.setInstanceName(config.getConsumerConfig().getInstanceName());
          consumer.setVipChannelEnabled(false);
          consumer.setConsumeMessageBatchMaxSize(config.getConsumerConfig().getConsumeMessageBatchMaxSize());
          consumer.setNamesrvAddr(config.getNamesrvAddr());
          // 从队列头开始消费
          consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
      }
    
      @Override
      public boolean isStarted() {
          return isStarted;
      }
    
      @Override
      public void start() {
          consumer.registerMessageListener(new MessageListenerConcurrently() {
              @Override
              public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext Context) {
                    for (MessageExt msg : msgs) {
                        System.out.println(msg.getbody());
                    }
                    if (handler != null) {
                        handler.handle(t);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
              }
          });
    
          try {
              for(Entry<String, String> sub : config.getConsumerConfig().getSubscriptions().entrySet()) {
                  consumer.subscribe(sub.getKey(), sub.getValue());
              }
              consumer.start();
              isStarted = true;
          } catch (Exception ex) {
              isStarted = false;
              throw new Exception(MqLibConstants.LIB_MQ_ROCKETMQ, MqLibExceptionEnum.MQ_CLIENT_EXCEPTION, ex);
          }
      }
    
      @Override
      public void stop() {
          for(Entry<String, String> sub : config.getConsumerConfig().getSubscriptions().entrySet()) {
              consumer.unsubscribe(sub.getKey());
          }
          consumer.shutdown();
          isStarted = false;
      }
    
      @Override
      public void setHandler(MqMessageHandler handler){
          this.handler = handler;
      }
    }
    

消息发送与消费

  • 创建Producer类,在其中创建RocketMqConfig、RocketMqProducer对象;

    public class Producer {
        public static void main(String[] args) {
            RocketMqConfig config = new RocketMqConfig();
            MqProducerClient producer = new MqProducerClient(config);
            producer.start();//启动生产者
            JsonMessage jsonMessage = new JsonMessage();//自定义消息类
            jsonMessage.setData("JsonMessage!");
            producer.send("json", jsonMessage);
        }
    }
    
  • 创建Consumer类,在其中创建RocketMqConfig、RocketMqConsumer对象;

    public class Consumer {
        public static void main(String[] args) {
            RocketMqConfig config = new RocketMqConfig();
            Map<String,String> map = new HashMap();
            map.put(config.getProducerConfig().getTopic(),"json");
            config.getConsumerConfig().setSubscriptions(map);
            MqConsumerClient consumer = new MqConsumerClient(config);
            consumer.start();
        }
    }
    

启动两个类,同时别忘了启动RocketMQ的服务。可以在RocketMQ可视化界面看到生产者、消费者以及消息等信息

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 1. Apache Kafka是一个分布式流平台 1.1 流平台有三个关键功能: 发布和订阅流记录,类似于一个消息...
    程序员日常填坑阅读 1,469评论 0 0
  • RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点: 能够保证严格的消息顺序 提供丰富的消息拉取模式...
    AI乔治阅读 6,261评论 2 5
  • RocketMQ4.X JMS Java消息服务(Java Message Service),Java平台中关于面...
    方穹轩阅读 4,010评论 0 1
  • Kafka入门经典教程-Kafka-about云开发 http://www.aboutyun.com/threa...
    葡萄喃喃呓语阅读 13,691评论 4 54
  • 表情是什么,我认为表情就是表现出来的情绪。表情可以传达很多信息。高兴了当然就笑了,难过就哭了。两者是相互影响密不可...
    Persistenc_6aea阅读 126,902评论 2 7