个人专题目录
七、API简介
1 Producer API简介
1.1 发送消息
MessageProducer.
send(Message message);发送消息到默认目的地,就是创建Producer时指定的目的地。
send(Destination destination, Message message); 发送消息到指定目的地,Producer不建议绑定目的地。也就是创建Producer的时候,不绑定目的地。session.createProducer(null)。
send(Message message, int deliveryMode, int priority, long timeToLive);发送消息到默认目的地,且设置相关参数。deliveryMode-持久化方式(DeliveryMode.PERSISTENT| DeliveryMode.NON_PERSISTENT)。priority-优先级。timeToLive-消息有效期(单位毫秒)。
send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive); 发送消息到指定目的地,且设置相关参数。
1.2 消息有效期
消息过期后,默认会将失效消息保存到“死信队列(ActiveMQ.DLQ)”。
不持久化的消息,在超时后直接丢弃,不会保存到死信队列中。
死信队列名称可配置,死信队列中的消息不能恢复。
死信队列是在activemq.xml中配置的。
1.3 消息优先级
不需特殊关注。
我们可以在发送消息时,指定消息的权重,broker可以建议权重较高的消息将会优先发送给Consumer。在某些场景下,我们通常希望权重较高的消息优先传送;不过因为各种原因,priority并不能决定消息传送的严格顺序(order)。
JMS标准中约定priority可以为09的整数数值,值越大表示权重越高,默认值为4。不过activeMQ中各个存储器对priority的支持并非完全一样。比如JDBC存储器可以支持09,因为JDBC存储器可以基于priority对消息进行排序和索引化;但是对于kahadb/levelDB等这种基于日志文件的存储器而言,priority支持相对较弱,只能识别三种优先级(LOW: < 4,NORMAL: =4,HIGH: > 4)。
1.3.1 开启
在broker端,默认是不存储priority信息的,我们需要手动开启,修改activemq.xml配置文件,在broker标签的子标签policyEntries中增加下述配置:
<policyEntry queue=">" prioritizedMessages="true"/>
不过对于“非持久化”类型的消息(如果没有被swap到临时文件),它们被保存在内存中,它们不存在从文件Paged in到内存的过程,因为可以保证优先级较高的消息,总是在prefetch的时候被优先获取,这也是“非持久化”消息可以担保消息发送顺序的优点。
Broker在收到Producer的消息之后,将会把消息cache到内存,如果消息需要持久化,那么同时也会把消息写入文件;如果通道中Consumer的消费速度足够快(即积压的消息很少,尚未超过内存限制,我们通过上文能够知道,每个通道都可以有一定的内存用来cache消息),那么消息几乎不需要从存储文件中Paged In,直接就能从内存的cache中获取即可,这种情况下,priority可以担保“全局顺序”;不过,如果消费者滞后太多,cache已满,就会触发新接收的消息直接保存在磁盘中,那么此时,priority就没有那么有效了。
在Queue中,prefetch的消息列表默认将会采用“轮询”的方式(roundRobin,注意并不是roundRobinDispatch)[备注:因为Queue不支持任何DispatchPolicy],依次添加到每个consumer的pending buffer中,比如有m1-m2-m3-m4四条消息,有C1-C2两个消费者,那么: m1->C1,m2->C2,m3->C1,m4->C2。这种轮序方式,会对基于权重的消息发送有些额外的影响,假如四条消息的权重都不同,但是(m1,m3)->C1,事实上m2的权重>m3,对于C1而言,它似乎丢失了“顺序性”。
1.3.2 强顺序
<policyEntry queue=">" strictOrderDispatch="true"/>
strictOrderDispatch“严格顺序转发”,这是区别于“轮询”的一种消息转发手段;不过不要误解它为“全局严格顺序”,它只不过是将prefetch的消息依次填满每个consumer的pending buffer。比如上述例子中,如果C1-C2两个消费者的buffer尺寸为3,那么(m1,m2,m3)->C1,(m4)->C2;当C1填充完毕之后,才会填充C2。由此这种策略可以保证buffer中所有的消息都是“权重临近的”、有序的。(需要注意:strictOrderDispatch并非是解决priority消息顺序的问题而生,只是在使用priority时需要关注它)。
1.3.3 严格顺序
policyEntry queue=">" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1"/>
useCache=false来关闭内存,强制将所有的消息都立即写入文件(索引化,但是会降低消息的转发效率);queuePrefetch=1来约束每个consumer任何时刻只有一个消息正在处理,那些消息消费之后,将会从文件中重新获取,这大大增加了消息文件操作的次数,不过每次读取肯定都是priority最高的消息。
2 Consumer API简介
2.1 消息的确认
Consumer拉取消息后,如果没有做确认acknowledge,此消息不会从MQ中删除。
消息的如果拉去到consumer后,未确认,那么消息被锁定。如果consumer关闭的时候仍旧没有确认消息,则释放消息锁定信息。消息将发送给其他的consumer处理。
消息一旦处理,应该必须确认。类似数据库中的事务管理机制。
2.2 消息的过滤
对消息消费者处理的消息数据进行过滤。这种处理可以明确消费者的角色,细分消费者的功能。
设置过滤:
Session.createConsumer(Destination destination, String messageSelector);
过滤信息为字符串,语法类似SQL92中的where子句条件信息。可以使用诸如AND、OR、IN、NOT IN等关键字。详细内容可以查看javax.jms.Message的帮助文档。
注意:消息的生产者在发送消息的的时候,必须设置可过滤的属性信息,所有的属性信息设置方法格式为:setXxxxProperty(String name, T value)。 其中方法名中的Xxxx是类型,如setObjectProperty/setStringProperty等。