消息发送方式
同步发送消息
同步发送消息是指,Producer发送一条消息后,会在收到MQ返回的ack后才发送下一条消息,该方式的消息可靠性最高,但是消息发送效率太低

同步发送消息.png
public class SyncProducer {
public static void main(String[] args) throws Exception{
//创建一个producer, 参数为producer group
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定namesever地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 设置当发送失败时重试发送的次数,默认两次
producer.setRetryTimesWhenSendFailed(3);
// 设置发送超时时间
producer.setSendMsgTimeout(5000);
// 开启生产者
producer.start();
// 发送消息
for(int i =0;i<10;i++){
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("topic","tag",body);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
// 关闭生产者
producer.shutdown();
}
}
异步发送消息
异步发送消息是指,Producer发出消息后无需等待MQ返回ack,直接发送下一条消息,该方式的消息可靠性可以得到保障,消息发送效率也可以

异步发送消息.png
public class AsyncProducer {
public static void main(String[] args) throws Exception{
//创建一个producer, 参数为producer group
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定namesever地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 设置发送超时时间
producer.setSendMsgTimeout(5000);
// 开启生产者
producer.start();
for(int i=0;i<10;i++){
byte[] body = ("Hi," + i).getBytes();
try{
Message msg = new Message("async-topic","async-tag",body);
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
});
}catch (Exception e){
e.printStackTrace();
}
}
// 因为是异步的,所以需要主线程休眠一会等待异步任务
TimeUnit.SECONDS.sleep(3);
producer.shutdown();
}
}
单向发送消息
单向发送消息是指,Producer仅发负责发送消息,不等待,不处理MQ的ack,该发送方式MQ也不返回ack,该方式消息发送效率最高,但是消息可靠性差。

单向发送.png
public class OnewayProducer {
public static void main(String[] args) throws Exception{
//创建一个producer, 参数为producer group
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定namesever地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 设置发送超时时间
producer.setSendMsgTimeout(5000);
// 开启生产者
producer.start();
// 发送消息
for(int i =0;i<10;i++){
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("oneway-topic","oneway-tag",body);
// 方法没有返回值
producer.sendOneway(msg);
}
// 关闭生产者
producer.shutdown();
}
}
消息消费
public class SomeConsumer {
public static void main(String[] args) throws Exception{
// 定义一个push消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("someTopic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
// 一但broker中有了其订阅的消息就会触发该方法的执行
// 方法返回值为当前consumer消费的状态
// 这里虽然为一个列表,但是每次默认只能消费一条消息,通过 consumer.getConsumeMessageBatchMaxSize();可以得到默认值,也可以改成批量消费
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 逐条消费消息
for(MessageExt msg:list){
System.out.println(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
Consumer的pullBatchSize属性与consumeMessageBatchMaxSize属性是否设置越大越好,当然不是
- pullBatchSize值设置的越大,Consumer每拉取一次需要的时间就会越长,且在网络上传输问题的可能性就越高,若在拉取过程中出现问题,那么本批次所有的消息都需要全部重新拉取。
- consumerMessageBatchMaxSize值设置的越大,Consumer的消息并发消费能力越低,且这批被消费的消息具有相同的消费结果,因为consumerMessageBatchSize指定的一批消息只会使用一个线程进行处理,且在处理过程中只要有一个消息处理异常,则这批消息需要全部重新再次消费处理。
有序性分类
根据有序范围的不同,Rocketmq可以严格的保证消息的有序性:分区有序性与全局有序性。
-
当发送和消费参与的queue只有一个时所保证的有序性是整个Topic中的消息顺序,称为全局有序。
image.png -
如果有多个queue参与,其仅可保证在该queue分区队列上的消息顺序,则称为分区有序。
image.png
public class SyncProducer {
public static void main(String[] args) throws Exception{
//创建一个producer, 参数为producer group
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定namesever地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 开启生产者
producer.start();
// 发送消息
for(int i =0;i<10;i++){
Integer orderId = i;
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("topic","tag",body);
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
Integer id = (Integer) o;
int index = id % list.size();
return list.get(index);
}
},orderId);
System.out.println(sendResult);
}
// 关闭生产者
producer.shutdown();
}
}
延迟消息
当消息写入到Broker后,在指定的时长后才可以被消费处理,称为延迟消息
采用rocketmq的延迟消息可以实现定时任务的功能,而不用使用定时器,典型的应用场景是,电商交易中超时未支付关闭订单的场景

image.png
事务消息
代码举例
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
/**
* 回调方法
* @param msg
* @param arg
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
/**
* 消息回查方法:
* 1、回调操作返回UNKNOW
* 2、TC没有收到TM的最终全局事务确认指令
* @param msg
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
事务消息场景举例
工行用户A向建行用户B转账1万元

image.png
问题点:这里 1,2,3 没有实现原子性,那么A账号没有扣款成功,但是消息已经发送成功了,这时候就会导致B的账号增加了1万元,就会出现问题,这时候就需要事务消息来解决这个问题。

image.png
该分布式事务的解决方案是依赖于XA模式的,上图中的第三步与TC向Broker发送预提交消息,这里的预提交消息(半事务消息)就是消费者还不能消费的消息。当执行到图中第9步骤的时候,才会真正的写入消息到Broker中,简单理解TC就是管理各个分支事务的状态,这里可以看到工行系统,Broker系统是两个分支事务。TM是事务管理者,一般由Producer担任。

