RocketMQ学习教程:05.Message消息对象【云图智联】

在RocketMQ中,生产者使用Message对象表示一条消息,本文对Message的属性和构造方法进行详细的讲解。

1 Message属性

Message的定义如下所示:

org.apache.rocketmq.common.message.Message

public class Message implements Serializable {

    private String topic;

    private int flag;

    private Map<String, String> properties;

    private byte[] body;

    private String transactionId;

其中:

topic:表示消息要到的发送主题,必填

 flag:选填,消息的标记,完全由应用设置,RocketMQ不做任何处理,类似于memcached中flag的作用。

 properties:消息属性,主要存储一些消息的元数据信息

body:消息的内容,这是一个字节数组,序列化方式由应用决定,例如你可以将一个json转为字节数组,也可以通过protol buffer、hessian编码转为字节数组。

transactionId:事务id,仅在事务消息中使用到

Message数据结构中各个字段都可以通过get、set方式访问,例如访问topic

msg.setTopic("TopicTest”)

msg.getTopic()

2 构造方法

Message提供了以下构造方法:

public Message()

public Message(String topic, byte[] body)

public Message(String topic, String tags, byte[] body)

public Message(String topic, String tags, String keys, byte[] body)

public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK)

上述构造方法中的参数,会被设置到Message的成员变量中。可以看到部分参数在Message成员变量中并没有定义,这些参数都可以可选的,会被设置到properties中:

tags:表示消息的标签,消费者在消费时,可以根据标签进行过滤,需要注意的是,一个生产者,只能指定一个tag

keys:用于建立索引,之后可以通过命令工具/API/或者管理平台查询key,可以为一个消息设置多个key,用空格""进行分割

waitStoreMsgOK:表示发送消息后,是否需要等待消息同步刷新到磁盘上。如果broker配置为ASYNC_MASTER,那么只需要消息在master上刷新到磁盘即可;如果配置为SYNC_MASTER,那么还需要等待slave也刷新到磁盘。需要注意的是,waitStoreMsgOK默认为false,只有将设置为true的情况下,才会等待刷盘成功再返回。

除了通过构造方法给Message成员变量赋值,也可以通过相关set方法进行赋值,通过对应的get方法取值。除了可以设置上述构造方法中出现的参数,还可以设置:

public void setBuyerId(String buyerId)

public void setDelayTimeLevel(int level)

public void putUserProperty(final String name, final String value)

其中:

BuyerId:

DelayTimeLevel:设置消息的延迟级别,0表示不延迟,大于0会延迟特定时间才被消费

putUserProperty:自定义消息属性。前面提到tags、keys、waitStoreMsgOK等,都会设置到properties中。如果开发者有自定义消息属性的需求,可以通过此方法进行设置。

3 消息属性保留关键字

properties字段有一些保留的关键字,这些保留关键字定义在MessageConst类中,用户在自定义消息属性时,需要避开这些关键字。注意,rocketmq还提供了putProperty和setProperties两个方法,但是这里不建议用户使用,因为可能会与rocketmq保留的属性名冲突。 

在MessageConst类中,定义了这些属性的key,例如生产者可能会使用到:

org.apache.rocketmq.common.message.MessageConst

public class MessageConst {

    //消息的key

    public static final String PROPERTY_KEYS = "KEYS";

    //消息的tag

    public static final String PROPERTY_TAGS = "TAGS";

    //是否waitStoreMsgOK

    public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";

    //消息延迟级别

    public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";

    //BUYER_ID

    public static final String PROPERTY_BUYER_ID = "BUYER_ID";

    ...

免费学习视频欢迎关注云图智联:https://e.yuntuzhilian.com/

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。