为啥会有这个代理服务?
主要是为了出Demo演示用的。因为时间紧, 想偷懒, 就直接写了一个服务来做数据转发。
实现什么功能?
1:客户端1从源MQTT Server 订阅消息, 然后将消息统一丢到Channel
2:客户端2从Channel 消费消息, 再将消息publish 到目标服务器
为什么不直接从源MQTT Server 订阅?
现有物联网平台的MQTT 客户端网络组件只支持通过TCP 协议连接MQTT Server。这次遇到一个比较特殊的设备商, 必须走WebSocket 协议连接他们的MQTT Server。因为赶时间出Demo演示, 平台对WebSocket 的支持推迟到了下个版本, 所以先用这个代理服务临时解决Demo演示的问题。
需要注意哪些问题?
设备商的MQTT Server 是个不稳定因素, 数据转发服务要规避这种不稳定, 需要实时监控连接状态, 做到断线后能自动重连。
代码不多就全贴了
package main
import (
"flag"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
log "github.com/sirupsen/logrus"
"time"
)
// CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o zhaoguanproxy main.go
// MqttMsg is one message received from the source broker, held on msgChan
// until it is republished to the target broker.
//
// The fields are unexported and the struct is never JSON-encoded in this
// file, so no struct tags are attached: encoding/json ignores unexported
// fields and go vet reports tags placed on them.
type MqttMsg struct {
	topic   string // topic the message was received on
	payload []byte // raw message body
}
// Runtime configuration (filled in from command-line flags by init) and the
// queue that hands messages from the source client to the target client.
var (
	// Source broker URL and credentials (flags -s, -su, -sp).
	proxySourceAddr, sourceUserName, sourcePassword string

	// Target broker URL and credentials (flags -t, -tu, -tp).
	proxyTargetAddr, targetUserName, targetPassword string

	// msgChan buffers up to 16 messages between the source subscriber and
	// the target publisher.
	msgChan = make(chan MqttMsg, 16)
)
// init registers the command-line flags that configure the source and target
// brokers and parses them into the package-level variables.
//
// Every flag carries a usage string so that -h prints meaningful help.
// NOTE(review): flags are parsed here, before main runs, because nothing else
// in this file calls flag.Parse.
func init() {
	const (
		defaultHost       = "127.0.0.1"
		defaultSourcePort = 1889
		defaultTargetPort = 1883
	)

	flag.StringVar(&proxySourceAddr, "s", fmt.Sprintf("wss://%s:%d/mqtt", defaultHost, defaultSourcePort), "source MQTT broker URL to subscribe from")
	flag.StringVar(&sourceUserName, "su", "user", "user name for the source broker")
	flag.StringVar(&sourcePassword, "sp", "123456", "password for the source broker")
	flag.StringVar(&proxyTargetAddr, "t", fmt.Sprintf("tcp://%s:%d", defaultHost, defaultTargetPort), "target MQTT broker URL to publish to")
	flag.StringVar(&targetUserName, "tu", "admin", "user name for the target broker")
	flag.StringVar(&targetPassword, "tp", "admin", "password for the target broker")
	flag.Parse()
}
// messagePubHandler wraps each message delivered to it in an MqttMsg and
// queues it on msgChan. The send blocks while msgChan is full.
var messagePubHandler mqtt.MessageHandler = func(_ mqtt.Client, incoming mqtt.Message) {
	msgChan <- MqttMsg{topic: incoming.Topic(), payload: incoming.Payload()}
}
// messageTargetPubHandler logs the payload and topic of every message
// delivered to it; it does not forward anything.
var messageTargetPubHandler mqtt.MessageHandler = func(_ mqtt.Client, incoming mqtt.Message) {
	body, topic := incoming.Payload(), incoming.Topic()
	log.Infof("Received message: %s from topic: %s\n", body, topic)
}
// connectSourceHandler logs that the source connection is up and then
// (re)issues the topic subscriptions through reSub.
var connectSourceHandler mqtt.OnConnectHandler = func(source mqtt.Client) {
	log.Info("Source Connected")
	reSub(source)
}
// connectTargetHandler only logs that the target connection is up.
var connectTargetHandler mqtt.OnConnectHandler = func(_ mqtt.Client) {
	log.Info("Target Connected")
}
// connectSourceLostHandler logs the cause and local time of a lost source
// connection. It takes no recovery action itself.
var connectSourceLostHandler mqtt.ConnectionLostHandler = func(_ mqtt.Client, cause error) {
	const layout = "2006-01-02 15:04:05 MST Mon"
	lostAt := time.Now().Format(layout)
	log.Infof("Source Connect lost: %v at %s", cause, lostAt)
}
// connectTargetLostHandler logs the cause and local time of a lost target
// connection. It takes no recovery action itself.
var connectTargetLostHandler mqtt.ConnectionLostHandler = func(_ mqtt.Client, cause error) {
	const layout = "2006-01-02 15:04:05 MST Mon"
	lostAt := time.Now().Format(layout)
	log.Infof("Target Connect lost: %v at %s", cause, lostAt)
}
func main() {
err, client := subscrible()
if err != nil {
for {
err, c := subscrible()
if err == nil {
client = c
break
}
time.Sleep(10 * time.Second)
}
}
go monitor(true, client, 10 *time.Second, "监控到和源broker断开, 并重连")
go proxyMsg()
select {}
}
// monitor watches a client's connection and tries to restore it when it drops.
//
// Every d it checks the client; when the client is not connected it logs msg
// and calls Connect, waiting at most d for the attempt to finish. Failed or
// still-pending attempts are logged and retried on the next tick. When
// isSource is true and the connection is open again, the source topics are
// resubscribed through reSub. monitor never returns.
func monitor(isSource bool, client mqtt.Client, d time.Duration, msg string) {
	tick := time.NewTicker(d)
	defer tick.Stop()

	for range tick.C {
		if client.IsConnected() && client.IsConnectionOpen() {
			continue
		}
		log.Info(msg)

		token := client.Connect()
		if !token.WaitTimeout(d) {
			log.Warnf("reconnect still pending after %s", d)
			continue
		}
		if err := token.Error(); err != nil {
			log.Errorf("reconnect failed: %v", err)
			continue
		}

		// Subscribing only makes sense on a live connection; a completed
		// token alone does not guarantee the link is open.
		if isSource && client.IsConnectionOpen() {
			reSub(client)
		}
	}
}
// subscrible builds the source-broker client from the package-level settings
// and connects it, blocking until the attempt completes.
//
// It returns (nil, client) on success and (err, nil) when the connection
// attempt fails. The error-first result order is kept for existing callers.
func subscrible() (error, mqtt.Client) {
	startedAt := time.Now().Format("2006-01-02 15:04:05 MST Mon")
	log.Infof("proxy form %s to %s at %s", proxySourceAddr, proxyTargetAddr, startedAt)

	opts := mqtt.NewClientOptions().
		AddBroker(proxySourceAddr).
		SetClientID("clientId").
		SetUsername(sourceUserName).
		SetPassword(sourcePassword).
		SetDefaultPublishHandler(messagePubHandler).
		SetOnConnectHandler(connectSourceHandler).
		SetConnectionLostHandler(connectSourceLostHandler).
		SetCleanSession(true).
		SetKeepAlive(30 * time.Second)

	client := mqtt.NewClient(opts)
	token := client.Connect()
	token.Wait()
	if err := token.Error(); err != nil {
		return err, nil
	}
	return nil, client
}
// reSub subscribes client to the fixed list of source topics at QoS 1.
//
// Each subscription runs in its own goroutine so reSub returns immediately.
// The goroutine waits a bounded time for the request to complete and logs a
// failure or timeout, so a rejected subscription no longer goes unnoticed.
func reSub(client mqtt.Client) {
	const subscribeTimeout = 10 * time.Second

	log.Infof("订阅:%v", client.IsConnected())

	topics := []string{
		"test",
		"web/180/point",
		"web/180/fall",
		"web/test/upline",
		"web/test/downline",
	}
	for _, topic := range topics {
		go func(topic string) {
			token := client.Subscribe(topic, 1, nil)
			if !token.WaitTimeout(subscribeTimeout) {
				log.Warnf("subscribe %q not acknowledged within %s", topic, subscribeTimeout)
				return
			}
			if err := token.Error(); err != nil {
				log.Errorf("subscribe %q: %v", topic, err)
			}
		}(topic)
	}
}
// proxyMsg connects a client to the target broker, starts a monitor for that
// connection, and then forwards every message queued on msgChan through
// publish. It never returns.
//
// A failed initial connection is logged with its cause but is not fatal: the
// monitor goroutine keeps trying to establish the connection.
func proxyMsg() {
	opts := mqtt.NewClientOptions()
	opts.AddBroker(proxyTargetAddr)
	opts.SetClientID("J01MT05B2112000126")
	opts.SetUsername(targetUserName)
	opts.SetPassword(targetPassword)
	opts.SetDefaultPublishHandler(messageTargetPubHandler)
	opts.OnConnect = connectTargetHandler
	opts.OnConnectionLost = connectTargetLostHandler
	opts.SetCleanSession(true)
	opts.SetKeepAlive(30 * time.Second)
	opts.AutoReconnect = true

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		log.Errorf("目的borker 连接失败: %v", token.Error())
	}
	go monitor(false, client, 10*time.Second, "监控到和目的broker断开, 并开始重连")

	// Block on the channel instead of polling it: the goroutine sleeps while
	// the queue is empty rather than spinning a CPU core.
	for data := range msgChan {
		publish(client, data)
	}
}
// publish sends one queued message to the target broker.
//
// The message goes to the original topic prefixed with "/proxy/", at QoS 0
// and without the retained flag. publish waits a bounded time for the send to
// complete and logs a failure or timeout; the message is not retried.
func publish(client mqtt.Client, data MqttMsg) {
	const sendTimeout = 5 * time.Second

	topic := "/proxy/" + data.topic
	token := client.Publish(topic, 0, false, data.payload)
	if !token.WaitTimeout(sendTimeout) {
		log.Warnf("publish to %s not completed within %s", topic, sendTimeout)
		return
	}
	if err := token.Error(); err != nil {
		log.Errorf("publish to %s: %v", topic, err)
	}
}