阿里云rocketmq连接

1.添加依赖

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.5.2</version>
        </dependency>

        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.8.1.Final</version>
        </dependency>

2.配置文件代码

application.yml

# 阿里云组件
aliyun:
  accessId: 
  accessKey: 
  rocketmq:
    namesrv-Addr: 
    #消费者组
    groupName_record_downLoad: 
    groupName_record_asr: 
    #主题名称 主题一般是服务器设置好 而不能在代码里去新建topic( 如果没有创建好,生产者往该主题发送消息 会报找不到topic错误)
    topic: 
    #超时时间
    timeOut: 6000
    #消费端线程数
    consume_thread_nums: 20

3.连接配置文件的配置类RocketConfig.java

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Data
@Component
public class RocketConfig {
    private String accessKey;
    private String secretKey;
    private String namesrvaAddr;
    private String groupNameRecord;
    private String groupNameRecordAsr;
    private String topic;
    private String timeOut;
    private String consumeThreadNums;

    public RocketConfig(@Value("${aliyun.accessId}")String accessKey,@Value("${aliyun.accessKey}")String secretKey,@Value("${aliyun.rocketmq.namesrv-Addr}")String namesrvaAddr, @Value("${aliyun.rocketmq.groupName_record_downLoad}")String groupNameRecord,@Value("${aliyun.rocketmq.groupName_record_asr}")String groupNameRecordAsr, @Value("${aliyun.rocketmq.topic}")String topic, @Value("${aliyun.rocketmq.timeOut}")String timeOut, @Value("${aliyun.rocketmq.consume_thread_nums}")String consumeThreadNums) {
        this.accessKey = accessKey;
        this.secretKey = secretKey;
        this.namesrvaAddr = namesrvaAddr;
        this.groupNameRecord = groupNameRecord;
        this.groupNameRecordAsr = groupNameRecordAsr;
        this.topic = topic;
        this.timeOut = timeOut;
        this.consumeThreadNums = consumeThreadNums;
    }
}

4.发送消息的tag配置类

/**
 * @Description tag名称
 */
public class RocketTag {
     public static final String RECORD_ASR= "recordAsr";
     public static final String RECORD = "record";
}

5.生产者代码

import com.aliyun.openservices.ons.api.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import java.util.Date;
import java.util.Properties;

@Slf4j
@Component
public class RocketMqProducer {
    private String producerId = "test_producer";
    private Producer producer;

    private RocketConfig rocketConfig;

    public RocketMqProducer(RocketConfig rocketConfig){
        this.rocketConfig = rocketConfig;

        log.info("初始化启动rocketMq生产者!");
        // producer 实例配置初始化
        Properties properties = new Properties();
        //您在控制台创建的Producer ID
        properties.setProperty(PropertyKeyConst.ProducerId, producerId);
        // AccessKeyId 阿里云身份验证,在阿里云服务器管理控制台创建。
        properties.put(PropertyKeyConst.AccessKey, rocketConfig.getAccessKey());
        // AccessKeySecret 阿里云身份验证,在阿里云服务器管理控制台创建。
        properties.put(PropertyKeyConst.SecretKey, rocketConfig.getSecretKey());
        //设置发送超时时间,单位毫秒。
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis,rocketConfig.getTimeOut());
        // 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, rocketConfig.getNamesrvaAddr());

        producer = ONSFactory.createProducer(properties);
        // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
        producer.start();
    }

    /**
     * 异步发送消息
     * 可靠异步发送:发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式;
     * 特点:速度快;有结果反馈;数据可靠;
     * 应用场景:异步发送一般用于链路耗时较长,对 rt响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等;
     * @param msg
     * @return
     */
    public boolean send(String tag,String msg){
        Message message = new Message(this.rocketConfig.getTopic(), tag, msg.getBytes());
        this.producer.sendAsync(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                ///消息发送成功
                log.info("send message success. msg:{}, topic:{},msgId:{}" ,msg,sendResult.getMessageId(), sendResult.getMessageId());
            }

            @Override
            public void onException(OnExceptionContext context) {
                //消息发送失败
                log.info("send message failed. execption: {},topic:{},msgId:{}", context.getException(),context.getTopic(), context.getMessageId());
            }
        });
        return true;
    }

    /**
     * 同步发送实体对象消息
     * 可靠同步发送:同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式;
     * 特点:速度快;有结果反馈;数据可靠;
     * 应用场景:应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等;
     */
    public boolean sendMsg(String tag,String msg) {
        Long startTime = System.currentTimeMillis();
        Message message = new Message(rocketConfig.getTopic(), tag, msg.getBytes());
        SendResult sendResult =this.producer.send(message);
        if (sendResult != null) {
            System.out.println(new Date() + " Send mq message success. Topic is:" + message.getTopic() + " msgId is: " + sendResult.getMessageId());
        } else {
            log.warn(".sendResult is null.........");
        }
        Long endTime = System.currentTimeMillis();
        System.out.println("单次生产耗时:"+(endTime-startTime)/1000);
        return true;
    }

    /**
     * 单向发送
     * 单向发送:只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答;此方式发送消息的过程耗时非常短,一般在微秒级别;
     * 特点:速度最快,耗时非常短,毫秒级别;无结果反馈;数据不可靠,可能会丢失;
     * 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集;
     * @return
     */
    public boolean sendMsgOneway(String tag,String msg) {
        Long startTime = System.currentTimeMillis();
        Message message = new Message(rocketConfig.getTopic(), tag, msg.getBytes());
        this.producer.sendOneway(message);
        Long endTime = System.currentTimeMillis();
        System.out.println("单次生产耗时:"+(endTime-startTime)/1000);
        return true;
    }

    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    @PreDestroy
    public void shutdown(){
        this.producer.shutdown();
    }
}

6.消费者代码

import com.aliyun.openservices.ons.api.*;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

@Slf4j
@Component
public class RecordUpLoadConsumer {
    /**
     * 消费者实体对象
     */
    private Consumer consumer;

    public RecordUpLoadConsumer(RocketConfig rocketConfig){
        // consumer 实例配置初始化
        Properties properties = new Properties();
        //您在控制台创建的consumer ID
        properties.setProperty(PropertyKeyConst.GROUP_ID, rocketConfig.getGroupNameRecord());
        // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
        properties.setProperty(PropertyKeyConst.AccessKey, rocketConfig.getAccessKey());
        // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
        properties.setProperty(PropertyKeyConst.SecretKey, rocketConfig.getSecretKey());
        //设置发送超时时间,单位毫秒
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, rocketConfig.getTimeOut());
        //设置消费端线程数
        properties.put(PropertyKeyConst.ConsumeThreadNums,rocketConfig.getConsumeThreadNums());
        // 设置 TCP 接入域名(此处以公共云生产环境为例),设置 TCP 接入域名,进入 MQ 控制台的消费者管理页面,在左侧操作栏单击获取接入点获取
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, rocketConfig.getNamesrvaAddr());
        consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe(rocketConfig.getTopic(), RocketTag.RECORD, new MessageListener() {
            @SneakyThrows
            @Override
            public Action consume(Message message, ConsumeContext consumeContext) {
                byte[] body = message.getBody();
                String msg= new String(body);//获取到接收的消息,由于接收到的是byte数组,所以需要转换成字符串
                log.info("Consumer-获取消息-主题topic为={}, 消费消息为={}", message.getTopic(), msg);
                return Action.CommitMessage;
            }
        });//监听第一个topic,new对应的监听器
//      consumer.subscribe(topic2, tag, new RocketmqTest2Listener());//监听另外一个topic,new对应的监听器
        // 在发送消息前,必须调用start方法来启动consumer,只需调用一次即可,当项目关闭时,自动shutdown
        consumer.start();
    }

    @PreDestroy
    public void stop() {
        if (consumer != null) {
            consumer.shutdown();
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容