rocketmq 5.x版本分别通过proxy和nameserver端口发送消息

注意1:rocketmq5.x 的v5版本已不支持通过nameserve发送消息,必须使用v2版本的sdk才可以。虽然可以通过v2版本的sdk发送消息,但是不推荐
注意2:不要抱残守缺,要与时俱进。本来不想写的这篇文章,已用了最新版本的rocketmq,那就用最新的sdk和方法,可以总是有些蠢开发,非要用以前的方式。
注意3:用v5版本调用nameserver的方式会报错:java.net.UnknownHostException: rocketmq-b-broker-1.rocketmq-b-broker.test.svc):no such host.
注意4:哪怕用v5的sdk,当并发较大的时候,也会报错java.net.UnknownHostException: rocketmq-b-broker-1.rocketmq-b-broker.test.svc):no such host. 个人感觉不像是rocketmq问题,有点像是k8s解析dns太慢导致的。一笔糊涂账。
注意5:个人对于阿里系的开源软件一向不感冒,这种公司只是一家为了赚钱而赚钱的公司,哪怕富可敌国,也没有抬头看星星的理念。当一个东西没利润的时候,他们说抛弃就抛弃。真心不如kafka,可惜人微言轻,不是架构师。说不上话

v5版本sdk通过proxy发送消息

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "strconv"
    "time"

    rmq_client "github.com/apache/rocketmq-clients/golang/v5"
    "github.com/apache/rocketmq-clients/golang/v5/credentials"
)

const (
    Topic    = "testtopic"
    Endpoint = "192.168.228.130:8081"
)

func producer_v5() {
    os.Setenv("mq.consoleAppender.enabled", "true")
    rmq_client.ResetLogger()
    producer, err := rmq_client.NewProducer(&rmq_client.Config{
        Endpoint:    Endpoint,
        Credentials: &credentials.SessionCredentials{},
    },
        rmq_client.WithTopics(Topic),
    )
    if err != nil {
        log.Fatal(err)
    }
    // start producer
    err = producer.Start()
    if err != nil {
        log.Fatal(err)
    }
    // graceful stop producer
    defer producer.GracefulStop()

    for i := 0; i < 10; i++ {
        // new a message
        msg := &rmq_client.Message{
            Topic: Topic,
            Body:  []byte("this is a message : " + strconv.Itoa(i)),
        }
        // set keys and tag
        msg.SetKeys("a", "b")
        msg.SetTag("ab")
        // send message in sync
        resp, err := producer.Send(context.TODO(), msg)
        if err != nil {
            log.Fatal(err)
        }
        for i := 0; i < len(resp); i++ {
            fmt.Printf("%#v\n", resp[i])
        }
        // wait a moment
        time.Sleep(time.Second * 1)
    }
}

v2版本sdk通过proxy发送消息

package  main
import  (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
func  producer_v2()  {
p,  err  := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.31.162:9876"}))
if err !=  nil  {
panic("生成 producer 失败")
}
if  err  = p.Start(); err !=  nil  {
panic("启动 producer 失败")
}
res,  err  := p.SendSync(context.Background(), primitive.NewMessage("RocketMQ",  []byte("this is RocketMQ")))
if err !=  nil  {
fmt.Printf("发送失败: %s\n", err)
}  else  {
fmt.Printf("发送成功: %s\n", res.String())
}
if  err  = p.Shutdown(); err !=  nil  {
panic("关闭 producer 失败")
}
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容