MQ应用

消息发送方式

同步发送消息

同步发送消息是指,Producer发送一条消息后,会在收到MQ返回的ack后才发送下一条消息,该方式的消息可靠性最高,但是消息发送效率太低


同步发送消息.png
public class SyncProducer {
    public static void main(String[] args) throws Exception{
        //创建一个producer, 参数为producer group
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        // 指定namesever地址
        producer.setNamesrvAddr("127.0.0.1:9876");

        // 设置当发送失败时重试发送的次数,默认两次
        producer.setRetryTimesWhenSendFailed(3);

        // 设置发送超时时间
        producer.setSendMsgTimeout(5000);
        // 开启生产者
        producer.start();

        // 发送消息
        for(int i =0;i<10;i++){
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("topic","tag",body);
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        }
        // 关闭生产者
        producer.shutdown();
    }
}

异步发送消息

异步发送消息是指,Producer发出消息后无需等待MQ返回ack,直接发送下一条消息,该方式的消息可靠性可以得到保障,消息发送效率也可以


异步发送消息.png
public class AsyncProducer {
    public static void main(String[] args) throws Exception{
        //创建一个producer, 参数为producer group
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        // 指定namesever地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 设置发送超时时间
        producer.setSendMsgTimeout(5000);
        // 开启生产者
        producer.start();

        for(int i=0;i<10;i++){
            byte[] body = ("Hi," + i).getBytes();
            try{
                Message msg = new Message("async-topic","async-tag",body);
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println(sendResult);
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        throwable.printStackTrace();
                    }
                });
            }catch (Exception e){
                e.printStackTrace();
            }
        }
        // 因为是异步的,所以需要主线程休眠一会等待异步任务
        TimeUnit.SECONDS.sleep(3);
        producer.shutdown();
    }
}

单向发送消息

单向发送消息是指,Producer仅发负责发送消息,不等待,不处理MQ的ack,该发送方式MQ也不返回ack,该方式消息发送效率最高,但是消息可靠性差。


单向发送.png
public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        //创建一个producer, 参数为producer group
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        // 指定namesever地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 设置发送超时时间
        producer.setSendMsgTimeout(5000);
        // 开启生产者
        producer.start();

        // 发送消息
        for(int i =0;i<10;i++){
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("oneway-topic","oneway-tag",body);
            // 方法没有返回值
            producer.sendOneway(msg);
        }
        // 关闭生产者
        producer.shutdown();
    }
}

消息消费

public class SomeConsumer {
    public static void main(String[] args) throws Exception{
        // 定义一个push消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("someTopic","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            // 一但broker中有了其订阅的消息就会触发该方法的执行
            // 方法返回值为当前consumer消费的状态
            // 这里虽然为一个列表,但是每次默认只能消费一条消息,通过 consumer.getConsumeMessageBatchMaxSize();可以得到默认值,也可以改成批量消费
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // 逐条消费消息
                for(MessageExt msg:list){
                    System.out.println(msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

        });
        consumer.start();
    }
}
Consumer的pullBatchSize属性与consumeMessageBatchMaxSize属性是否设置越大越好,当然不是
- pullBatchSize值设置的越大,Consumer每拉取一次需要的时间就会越长,且在网络上传输问题的可能性就越高,若在拉取过程中出现问题,那么本批次所有的消息都需要全部重新拉取。
- consumerMessageBatchMaxSize值设置的越大,Consumer的消息并发消费能力越低,且这批被消费的消息具有相同的消费结果,因为consumerMessageBatchSize指定的一批消息只会使用一个线程进行处理,且在处理过程中只要有一个消息处理异常,则这批消息需要全部重新再次消费处理。

有序性分类

根据有序范围的不同,Rocketmq可以严格的保证消息的有序性:分区有序性与全局有序性。

  • 当发送和消费参与的queue只有一个时所保证的有序性是整个Topic中的消息顺序,称为全局有序。


    image.png
  • 如果有多个queue参与,其仅可保证在该queue分区队列上的消息顺序,则称为分区有序。


    image.png
public class SyncProducer {
    public static void main(String[] args) throws Exception{
        //创建一个producer, 参数为producer group
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        // 指定namesever地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 开启生产者
        producer.start();

        // 发送消息
        for(int i =0;i<10;i++){
            Integer orderId = i;
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("topic","tag",body);
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                    Integer id = (Integer) o;
                    int index = id % list.size();
                    return list.get(index);
                }
            },orderId);
            System.out.println(sendResult);
        }
        // 关闭生产者
        producer.shutdown();
    }
}

延迟消息

当消息写入到Broker后,在指定的时长后才可以被消费处理,称为延迟消息
采用rocketmq的延迟消息可以实现定时任务的功能,而不用使用定时器,典型的应用场景是,电商交易中超时未支付关闭订单的场景


image.png

事务消息

代码举例

public class TransactionProducer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                        new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}
public class TransactionListenerImpl implements TransactionListener {

    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();


    /**
     * 回调方法
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    /**
     * 消息回查方法:
     * 1、回调操作返回UNKNOW
     * 2、TC没有收到TM的最终全局事务确认指令
     * @param msg
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

事务消息场景举例

工行用户A向建行用户B转账1万元


image.png

问题点:这里 1,2,3 没有实现原子性,那么A账号没有扣款成功,但是消息已经发送成功了,这时候就会导致B的账号增加了1万元,就会出现问题,这时候就需要事务消息来解决这个问题。


image.png

该分布式事务的解决方案是依赖于XA模式的,上图中的第三步与TC向Broker发送预提交消息,这里的预提交消息(半事务消息)就是消费者还不能消费的消息。当执行到图中第9步骤的时候,才会真正的写入消息到Broker中,简单理解TC就是管理各个分支事务的状态,这里可以看到工行系统,Broker系统是两个分支事务。TM是事务管理者,一般由Producer担任。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容