1.添加依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.8.1.Final</version>
</dependency>
2.配置文件代码
application.yml
# 阿里云组件
aliyun:
accessId:
accessKey:
rocketmq:
namesrv-Addr:
#消费者组
groupName_record_downLoad:
groupName_record_asr:
#主题名称 主题一般是服务器设置好 而不能在代码里去新建topic( 如果没有创建好,生产者往该主题发送消息 会报找不到topic错误)
topic:
#超时时间
timeOut: 6000
#消费端线程数
consume_thread_nums: 20
3.连接配置文件的配置类RocketConfig.java
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Data
@Component
public class RocketConfig {
private String accessKey;
private String secretKey;
private String namesrvaAddr;
private String groupNameRecord;
private String groupNameRecordAsr;
private String topic;
private String timeOut;
private String consumeThreadNums;
public RocketConfig(@Value("${aliyun.accessId}")String accessKey,@Value("${aliyun.accessKey}")String secretKey,@Value("${aliyun.rocketmq.namesrv-Addr}")String namesrvaAddr, @Value("${aliyun.rocketmq.groupName_record_downLoad}")String groupNameRecord,@Value("${aliyun.rocketmq.groupName_record_asr}")String groupNameRecordAsr, @Value("${aliyun.rocketmq.topic}")String topic, @Value("${aliyun.rocketmq.timeOut}")String timeOut, @Value("${aliyun.rocketmq.consume_thread_nums}")String consumeThreadNums) {
this.accessKey = accessKey;
this.secretKey = secretKey;
this.namesrvaAddr = namesrvaAddr;
this.groupNameRecord = groupNameRecord;
this.groupNameRecordAsr = groupNameRecordAsr;
this.topic = topic;
this.timeOut = timeOut;
this.consumeThreadNums = consumeThreadNums;
}
}
4.发送消息的tag配置类
/**
* @Description tag名称
*/
public class RocketTag {
public static final String RECORD_ASR= "recordAsr";
public static final String RECORD = "record";
}
5.生产者代码
import com.aliyun.openservices.ons.api.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.util.Date;
import java.util.Properties;
@Slf4j
@Component
public class RocketMqProducer {
private String producerId = "test_producer";
private Producer producer;
private RocketConfig rocketConfig;
public RocketMqProducer(RocketConfig rocketConfig){
this.rocketConfig = rocketConfig;
log.info("初始化启动rocketMq生产者!");
// producer 实例配置初始化
Properties properties = new Properties();
//您在控制台创建的Producer ID
properties.setProperty(PropertyKeyConst.ProducerId, producerId);
// AccessKeyId 阿里云身份验证,在阿里云服务器管理控制台创建。
properties.put(PropertyKeyConst.AccessKey, rocketConfig.getAccessKey());
// AccessKeySecret 阿里云身份验证,在阿里云服务器管理控制台创建。
properties.put(PropertyKeyConst.SecretKey, rocketConfig.getSecretKey());
//设置发送超时时间,单位毫秒。
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis,rocketConfig.getTimeOut());
// 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
properties.put(PropertyKeyConst.NAMESRV_ADDR, rocketConfig.getNamesrvaAddr());
producer = ONSFactory.createProducer(properties);
// 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
producer.start();
}
/**
* 异步发送消息
* 可靠异步发送:发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式;
* 特点:速度快;有结果反馈;数据可靠;
* 应用场景:异步发送一般用于链路耗时较长,对 rt响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等;
* @param msg
* @return
*/
public boolean send(String tag,String msg){
Message message = new Message(this.rocketConfig.getTopic(), tag, msg.getBytes());
this.producer.sendAsync(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
///消息发送成功
log.info("send message success. msg:{}, topic:{},msgId:{}" ,msg,sendResult.getMessageId(), sendResult.getMessageId());
}
@Override
public void onException(OnExceptionContext context) {
//消息发送失败
log.info("send message failed. execption: {},topic:{},msgId:{}", context.getException(),context.getTopic(), context.getMessageId());
}
});
return true;
}
/**
* 同步发送实体对象消息
* 可靠同步发送:同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式;
* 特点:速度快;有结果反馈;数据可靠;
* 应用场景:应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等;
*/
public boolean sendMsg(String tag,String msg) {
Long startTime = System.currentTimeMillis();
Message message = new Message(rocketConfig.getTopic(), tag, msg.getBytes());
SendResult sendResult =this.producer.send(message);
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + message.getTopic() + " msgId is: " + sendResult.getMessageId());
} else {
log.warn(".sendResult is null.........");
}
Long endTime = System.currentTimeMillis();
System.out.println("单次生产耗时:"+(endTime-startTime)/1000);
return true;
}
/**
* 单向发送
* 单向发送:只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答;此方式发送消息的过程耗时非常短,一般在微秒级别;
* 特点:速度最快,耗时非常短,毫秒级别;无结果反馈;数据不可靠,可能会丢失;
* 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集;
* @return
*/
public boolean sendMsgOneway(String tag,String msg) {
Long startTime = System.currentTimeMillis();
Message message = new Message(rocketConfig.getTopic(), tag, msg.getBytes());
this.producer.sendOneway(message);
Long endTime = System.currentTimeMillis();
System.out.println("单次生产耗时:"+(endTime-startTime)/1000);
return true;
}
/**
* 一般在应用上下文,使用上下文监听器,进行关闭
*/
@PreDestroy
public void shutdown(){
this.producer.shutdown();
}
}
6.消费者代码
import com.aliyun.openservices.ons.api.*;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
@Slf4j
@Component
public class RecordUpLoadConsumer {
/**
* 消费者实体对象
*/
private Consumer consumer;
public RecordUpLoadConsumer(RocketConfig rocketConfig){
// consumer 实例配置初始化
Properties properties = new Properties();
//您在控制台创建的consumer ID
properties.setProperty(PropertyKeyConst.GROUP_ID, rocketConfig.getGroupNameRecord());
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.setProperty(PropertyKeyConst.AccessKey, rocketConfig.getAccessKey());
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.setProperty(PropertyKeyConst.SecretKey, rocketConfig.getSecretKey());
//设置发送超时时间,单位毫秒
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, rocketConfig.getTimeOut());
//设置消费端线程数
properties.put(PropertyKeyConst.ConsumeThreadNums,rocketConfig.getConsumeThreadNums());
// 设置 TCP 接入域名(此处以公共云生产环境为例),设置 TCP 接入域名,进入 MQ 控制台的消费者管理页面,在左侧操作栏单击获取接入点获取
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, rocketConfig.getNamesrvaAddr());
consumer = ONSFactory.createConsumer(properties);
consumer.subscribe(rocketConfig.getTopic(), RocketTag.RECORD, new MessageListener() {
@SneakyThrows
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
byte[] body = message.getBody();
String msg= new String(body);//获取到接收的消息,由于接收到的是byte数组,所以需要转换成字符串
log.info("Consumer-获取消息-主题topic为={}, 消费消息为={}", message.getTopic(), msg);
return Action.CommitMessage;
}
});//监听第一个topic,new对应的监听器
// consumer.subscribe(topic2, tag, new RocketmqTest2Listener());//监听另外一个topic,new对应的监听器
// 在发送消息前,必须调用start方法来启动consumer,只需调用一次即可,当项目关闭时,自动shutdown
consumer.start();
}
@PreDestroy
public void stop() {
if (consumer != null) {
consumer.shutdown();
}
}
}