Rocket MQ系列五 - 发送消息实践

发送普通消息

    普通消息,也叫并发消息,是发送效率最高、使用场景最多的一类消息。发送普通消息的代码如下:
    public static void main(String[] args) throws MQClientException, InterruptedException {

        // 创建默认的生产者
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.start();

        for (int i = 0; i < 128; i++)
            try {
                {
                    // 创建消息对象
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }
        // 关闭
        producer.shutdown();
    }
image.gif

发送顺序消息

    同步发送消息是,根据HashKey将消息发送到指定的分区中,每个分区中的消息都是按照发送顺序保存的,即分区有序。如果Topic的分区被设置为1,这个Topic的消息就是全局有序的。注意,顺序消息的发送必须是单线程,多线程将不再有序。顺序消息的消费和普通消息的消费方式不同。代码如下:
        // 第一步 初始化生产者,配置生产者参数,启动生产者
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        producer.start();

        // 初始化消息体
        Message msg = new Message("TopicTest",
                "TagA",
                "Hello RocketMQ ".getBytes(RemotingHelper.DEFAULT_CHARSET));

        Integer hashKey = 123;

        // 核心操作 MessageQueueSelector ,根据 hashKey选择当前消息发送到那个分区中
        SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                return mqs.get(index);
            }
        }, hashKey);

        producer.shutdown();
image.gif

发送延迟消息

    生产者发送消息后,消费者在指定时间才能消费消息,这类消息被称为延迟消息或定时消息。生产者发送延迟消息前需要设置延迟级别,目前开源版本支持18个延迟级别。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
image.gif
    Broker 在接收用户发送的消息后,首先将消息保存到名为 CHEDULE_TOPIC_XXXX的Topic中。此时,消费者无法消费该延迟消息。然后,由Broker端的定时投递任务定时投递给消费者。

    保存延迟消息的实现逻辑 见类chduleMessageService。按照配置的延迟级别初始化多个任务,每秒执行一次。如果消息投递满足时间条件,那么将消息投递到原始Topic中。消费者此时可以消费该延迟消息。

发送事务消息

    事务消息的发送、消费流程和延迟消息类似,都是先发送到对消费者不可见的Topic中。当事务消息被生产者提交后,会被二次投递到原始Topic中,此时消费者正常消费。事务消息的发送具体分为以下两步:

    第一步:用户发送一个half消息到Broker,Broker设置 queueOffset = 0,即对消费者不可见。

    第二步:用户本地事务处理成功,发送一个commit,消息到Broker,Broker修改queueOffset为正常值,达到重新投递的目的,此时消费者可以正常消费;如果本地事务处理失败,那么将发送一个RollBack消息给Broker,Broker删除half消息。

        // 第一步 初始化事务消息生产者
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        // 第二步 配置生产者的各个参数和Broker回调,检查本地事务处理并启动生产者
        producer.setCheckThreadPoolMaxSize(2);
        producer.setCheckThreadPoolMinSize(2);
        producer.setCheckRequestHoldMax(2000);
        producer.setTransactionCheckListener(new TransactionCheckListener() {
            private AtomicInteger transactionIndex = new AtomicInteger(0);

            @Override
            public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
                int value = transactionIndex.getAndIncrement();
                if (value % 6 == 0) {
                    throw new RuntimeException("Cloud not find db");
                } else if (value % 5 == 0) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else if (value % 4 == 0) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
                return LocalTransactionState.UNKNOW;
            }
        });

        producer.start();

        // 第三步 设置本地事务处理器,发送消息
        Message msg = new Message("TopicTest",
                "tags",
                "KEY",
                ("Hello EocketMQ " + System.currentTimeMillis()).getBytes(RemotingHelper.DEFAULT_CHARSET));

        SendResult sendResult = producer.sendMessageInTransaction(msg, new LocalTransactionExecuter() {
            private AtomicInteger transactionIndex = new AtomicInteger(1);

            @Override
            public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                int value = transactionIndex.getAndIncrement();
                if (value == 0) {
                    throw new RuntimeException("Cloud not find db");
                } else if (value % 5 == 0) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else if (value % 4 == 0) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
                return LocalTransactionState.UNKNOW;
            }
        }, null);

        System.out.printf("%s%n", sendResult);
image.gif

发送单向消息

    单向消息的生产者只管发送过程,不管发送结果。单向消息主要用于日志传输等消息允许丢失的场景。
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("127.0.0.1:9876;127.0.0.2:9876;");
        producer.setInstanceName("instanceName");
        producer.start();

        Message msg = new Message("TopicTest", "TagA", "OrderID1", "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
        producer.sendOneway(msg);

image.gif

批量发送消息

    批量发送消息能提高发送效率,提升系统吞吐量。批量消息发送有一以下3点注意事项:

    (1)消息最好小于1MB

    (2)同一批批量消息的Topic、waitStoreMsgOK属性必须一致

    (3)批量消息不支持延迟消息
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        producer.start();

        //If you just send messages of no more than 1MiB at a time, it is easy to use batch
        //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));

        producer.send(messages);
    }
image.gif
    相对消费者而言,生产者的使用更加简单,一般主要关注消息类型、消息发送方法和方法参数。

    常用的消息类型如下:

| 消息类型 | 优点 | 缺点 | 备注 |
| 普通消息(并发消息) | 性能最好,单机TPS级别为100 000 | 消息的生产和消费都是无序的 | 大部分场景使用 |
| 分区有序消息 | 单分区中消息有序,单机发送TPS万级别 | 单点问题,如果Broker宕机,则会导致发送失败 | 大部分有序消息场景适用 |
| 全局有序消息 | 类似传统的Queue,全部消息有序,单机发送TPS千级别 | 单点问题,如果Broker宕机,则会导致发送失败 | 极少场景使用 |
| 延迟消息 | RocketMQ自身支持,不需要额外使用组件,支持延迟特性 | 不能根据任意时间延迟,使用范围受限。Broker随着延迟级别增大支持越多,CPU压力越大,延迟时间不准确 | 非精确,延迟级别不多的场景,非常方便使用 |
| 事务消息 | RocketMQ自身支持,不需要额外使用组件支持事务特性 | RocketMQ事务是生产者事务,只有生产者参与,如果消费者处理失败则事务失效 | 简单事务处理可以使用 |

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,366评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,521评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,689评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,925评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,942评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,727评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,447评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,349评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,820评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,990评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,127评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,812评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,471评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,017评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,142评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,388评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,066评论 2 355

推荐阅读更多精彩内容