注意1:rocketmq5.x 的v5版本已不支持通过nameserve发送消息,必须使用v2版本的sdk才可以。虽然可以通过v2版本的sdk发送消息,但是不推荐
注意2:不要抱残守缺,要与时俱进。本来不想写的这篇文章,已用了最新版本的rocketmq,那就用最新的sdk和方法,可以总是有些蠢开发,非要用以前的方式。
注意3:用v5版本调用nameserver的方式会报错:java.net.UnknownHostException: rocketmq-b-broker-1.rocketmq-b-broker.test.svc):no such host.
注意4:哪怕用v5的sdk,当并发较大的时候,也会报错java.net.UnknownHostException: rocketmq-b-broker-1.rocketmq-b-broker.test.svc):no such host. 个人感觉不像是rocketmq问题,有点像是k8s解析dns太慢导致的。一笔糊涂账。
注意5:个人对于阿里系的开源软件一向不感冒,这种公司只是一家为了赚钱而赚钱的公司,哪怕富可敌国,也没有抬头看星星的理念。当一个东西没利润的时候,他们说抛弃就抛弃。真心不如kafka,可惜人微言轻,不是架构师。说不上话
v5版本sdk通过proxy发送消息
package main
import (
"context"
"fmt"
"log"
"os"
"strconv"
"time"
rmq_client "github.com/apache/rocketmq-clients/golang/v5"
"github.com/apache/rocketmq-clients/golang/v5/credentials"
)
const (
Topic = "testtopic"
Endpoint = "192.168.228.130:8081"
)
func producer_v5() {
os.Setenv("mq.consoleAppender.enabled", "true")
rmq_client.ResetLogger()
producer, err := rmq_client.NewProducer(&rmq_client.Config{
Endpoint: Endpoint,
Credentials: &credentials.SessionCredentials{},
},
rmq_client.WithTopics(Topic),
)
if err != nil {
log.Fatal(err)
}
// start producer
err = producer.Start()
if err != nil {
log.Fatal(err)
}
// graceful stop producer
defer producer.GracefulStop()
for i := 0; i < 10; i++ {
// new a message
msg := &rmq_client.Message{
Topic: Topic,
Body: []byte("this is a message : " + strconv.Itoa(i)),
}
// set keys and tag
msg.SetKeys("a", "b")
msg.SetTag("ab")
// send message in sync
resp, err := producer.Send(context.TODO(), msg)
if err != nil {
log.Fatal(err)
}
for i := 0; i < len(resp); i++ {
fmt.Printf("%#v\n", resp[i])
}
// wait a moment
time.Sleep(time.Second * 1)
}
}
v2版本sdk通过proxy发送消息
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
func producer_v2() {
p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.31.162:9876"}))
if err != nil {
panic("生成 producer 失败")
}
if err = p.Start(); err != nil {
panic("启动 producer 失败")
}
res, err := p.SendSync(context.Background(), primitive.NewMessage("RocketMQ", []byte("this is RocketMQ")))
if err != nil {
fmt.Printf("发送失败: %s\n", err)
} else {
fmt.Printf("发送成功: %s\n", res.String())
}
if err = p.Shutdown(); err != nil {
panic("关闭 producer 失败")
}
}