概述
cloudhub负责云端与边缘端之间的通信,目前支持WebSocket和QUIC两种协议。
Register
初始化配置,并在beehive中注册。
Start
// Start launches the cloudhub components. The message dispatcher always
// runs in its own goroutine; the WebSocket, QUIC and UDS servers are each
// started in a separate goroutine only when the matching protocol flag is
// enabled in the hub configuration.
func (a *cloudHub) Start() {
	messageq := channelq.NewChannelMessageQueue()

	// Dispatch messages coming from the cloud towards the edge nodes.
	go messageq.DispatchMessage()

	cfg := hubconfig.Get()

	// TODO delete second param @kadisi
	if cfg.ProtocolWebsocket {
		go servers.StartCloudHub(api.ProtocolTypeWS, cfg, messageq)
	}
	// TODO delete second param @kadisi
	if cfg.ProtocolQuic {
		go servers.StartCloudHub(api.ProtocolTypeQuic, cfg, messageq)
	}

	// The UDS server only talks to the CSI driver from kubeedge on the cloud
	// side; it plays no part in cloud <-> edge communication.
	if cfg.ProtocolUDS {
		go udsserver.StartServer(cfg)
	}
}
这里主要做了3件事情:启动DispatchMessage分发消息、按配置启动StartCloudHub(WebSocket和/或QUIC),以及按配置启动udsserver.StartServer。
DispatchMessage
messageq(ChannelMessageQueue)中只有一个同步的map
// DispatchMessage is the cloud-to-edge dispatch loop.
//
// It repeatedly receives messages addressed to the cloudhub module
// (model.SrcCloudHub) from beehive. For each message it extracts the target
// node ID from the router resource, which is the "/"-separated segment
// immediately following model.ResNode. It then looks up that node's channel
// and pushes the message onto it.
//
// Messages that cannot be received, that carry no node ID, or whose node has
// no channel are logged and dropped. The loop returns once the beehive
// context is done. This also applies while it is blocked handing a message
// to a node channel, so a full channel cannot keep it alive after shutdown.
func (q *ChannelMessageQueue) DispatchMessage() {
	for {
		select {
		case <-beehiveContext.Done():
			klog.Warning("Cloudhub channel eventqueue dispatch message loop stoped")
			return
		default:
		}

		msg, err := beehiveContext.Receive(model.SrcCloudHub)
		if err != nil {
			klog.Infof("receive not Message format message: %v", err)
			continue
		}

		// Take the first segment that directly follows a model.ResNode segment.
		tokens := strings.Split(msg.Router.Resource, "/")
		var nodeID string
		for i := 0; i+1 < len(tokens); i++ {
			if tokens[i] == model.ResNode {
				nodeID = tokens[i+1]
				break
			}
		}
		if nodeID == "" {
			klog.Warning("node id is not found in the message")
			continue
		}

		rChannel, err := q.getRChannel(nodeID)
		if err != nil {
			klog.Infof("fail to get dispatch channel for %s: %v", nodeID, err)
			continue
		}

		// A plain send would block forever on a full node channel, stalling
		// dispatch for every node and preventing shutdown. Abandon the message
		// once beehive stops.
		select {
		case rChannel <- msg:
		case <-beehiveContext.Done():
			klog.Warning("Cloudhub channel eventqueue dispatch message loop stoped")
			return
		}
	}
}
从beehiveContext中接收发往“cloudhub”模块(model.SrcCloudHub)的message,也就是之前分析中edgecontroller和devicecontroller的downstreamcontroller发下来的消息。
取出消息后,将消息Router中的Resource按“/”切分,取紧跟在model.ResNode那一段之后的值作为目标nodeID;如果nodeID没取到,则只记录日志并丢弃该消息。然后根据nodeID从ChannelMessageQueue的map中取出对应的通道,把消息放到通道中;如果取不到通道,同样只记录日志并跳过。
整个流程是在一个for循环中,因此这个流程会不断重复执行,直到beehiveContext被关闭。
StartCloudHub
// StartCloudHub starts the cloud hub service for a single protocol.
//
// It builds a TLS configuration that presents config.Cert/config.Key and
// requires client certificates verified against config.Ca. It initializes
// the shared message handler and then serves either on config.Port
// (WebSocket) or on config.QuicPort (QUIC).
//
// It panics if the CA, the key pair or protocolType is invalid. It blocks
// while the server runs; if serving returns an error, that error is logged
// instead of being silently discarded.
func StartCloudHub(protocolType string, config *hubconfig.Configure, messageq *channelq.ChannelMessageQueue) {
	// init certificate
	pool := x509.NewCertPool()
	if ok := pool.AppendCertsFromPEM(config.Ca); !ok {
		panic(fmt.Errorf("fail to load ca content"))
	}
	cert, err := tls.X509KeyPair(config.Cert, config.Key)
	if err != nil {
		panic(err)
	}
	tlsConfig := tls.Config{
		ClientCAs:    pool,
		ClientAuth:   tls.RequireAndVerifyClientCert,
		Certificates: []tls.Certificate{cert},
		MinVersion:   tls.VersionTLS12,
		// NOTE(review): this single ECDHE_RSA suite requires an RSA server
		// certificate and only restricts TLS 1.2 handshakes; TLS 1.3 suites
		// are not configurable via CipherSuites.
		CipherSuites: []uint16{tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256},
	}

	handler.InitHandler(config, messageq)

	svc := server.Server{
		Type:       protocolType,
		TLSConfig:  &tlsConfig,
		AutoRoute:  true,
		ConnNotify: handler.CloudhubHandler.OnRegister,
	}

	// NOTE(review): "%s:%d" does not bracket IPv6 literal addresses;
	// net.JoinHostPort would handle them.
	switch protocolType {
	case api.ProtocolTypeWS:
		svc.Addr = fmt.Sprintf("%s:%d", config.Address, config.Port)
		svc.ExOpts = api.WSServerOption{Path: "/"}
	case api.ProtocolTypeQuic:
		svc.Addr = fmt.Sprintf("%s:%d", config.Address, config.QuicPort)
		svc.ExOpts = api.QuicServerOption{MaxIncomingStreams: config.MaxIncomingStreams}
	default:
		panic(fmt.Errorf("invalid protocol, should be websocket or quic"))
	}

	klog.Infof("Start cloud hub %s server", protocolType)
	// Previously the returned error was dropped, so a listen failure (e.g. the
	// port already in use) left the hub silently not serving.
	if err := svc.ListenAndServeTLS("", ""); err != nil {
		klog.Errorf("cloud hub %s server stopped: %v", protocolType, err)
	}
}
前几行是初始化证书的相关配置(用于TLS双向认证:服务端出示自己的证书,并要求、校验客户端证书)。然后初始化了一个handler。
InitHandler
// InitHandler builds the package-level CloudhubHandler exactly once.
//
// The handler copies the keepalive interval, write timeout and node limit
// from config and uses eventq as its message queue. It starts with an empty
// keepalive channel map and two per-connection handlers: KeepaliveCheckLoop
// and MessageWriteLoop. Calls after the first one have no effect.
func InitHandler(config *hubconfig.Configure, eventq *channelq.ChannelMessageQueue) {
	once.Do(func() {
		mh := &MessageHandle{
			KeepaliveInterval: config.KeepaliveInterval,
			WriteTimeout:      config.WriteTimeout,
			MessageQueue:      eventq,
			NodeLimit:         config.NodeLimit,
			KeepaliveChannel:  make(map[string]chan struct{}),
		}
		mh.Handlers = []HandleFunc{mh.KeepaliveCheckLoop, mh.MessageWriteLoop}

		CloudhubHandler = mh
		CloudhubHandler.initServerEntries()
	})
}
这个eventq就是之前DispatchMessage的messageq, 这里设置了2个HandleFunc,分别是KeepaliveCheckLoop和MessageWriteLoop。KeepaliveCheckLoop是维持心跳的,这里就不仔细看了。
MessageWriteLoop是处理云端发给边端消息的。
// MessageWriteLoop processes all write requests
func (mh *MessageHandle) MessageWriteLoop(hi hubio.CloudHubIO, info *model.HubInfo, stop chan ExitCode) {
messages, err := mh.MessageQueue.Consume(info)
if err != nil {
klog.Errorf("failed to consume event for node %s, reason: %s", info.NodeID, err.Error())
stop <- messageQueueDisconnect
return
}
for {
msg, err := messages.Get()
if err != nil {
klog.Errorf("failed to consume event for node %s, reason: %s", info.NodeID, err.Error())
if err.Error() == MsgFormatError {
// error format message should not impact other message
messages.Ack()
continue
}
stop <- messageQueueDisconnect
return
}
if model.IsNodeStopped(msg) {
klog.Infof("node %s is stopped, will disconnect", info.NodeID)
messages.Ack()
stop <- nodeStop
return
}
if !model.IsToEdge(msg) {
klog.Infof("skip only to cloud event for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content)
messages.Ack()
continue
}
klog.Infof("event to send for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content)
trimMessage(msg)
err = hi.SetWriteDeadline(time.Now().Add(time.Duration(mh.WriteTimeout) * time.Second))
if err != nil {
klog.Errorf("SetWriteDeadline error, %s", err.Error())
stop <- hubioWriteFail
return
}
err = mh.hubIoWrite(hi, info.NodeID, msg)
if err != nil {
klog.Errorf("write error, connection for node %s will be closed, affected event %s, reason %s",
info.NodeID, dumpMessageMetadata(msg), err.Error())
stop <- hubioWriteFail
return
}
messages.Ack()
}
从MessageQueue取出消息,然后判断该消息是否表示节点已停止(是则通知断开连接并退出)、是否是发往边缘节点的消息(不是则确认后跳过)。然后调用hubIoWrite把消息写到该边缘节点的连接上,写成功后才Ack。
InitHandler的最后调用了initServerEntries,这个涉及viaduct模块中的功能。viaduct是云端和边缘端进行通信的底层模块,这里先不仔细分析了。
Server
回到StartCloudHub中,接下来就起了一个viaduct的server,并执行ListenAndServeTLS开始监听和边缘节点的交互请求。
server在创建的时候,通过ConnNotify字段注册了一个OnRegister回调方法:
// OnRegister is the connection-notify callback for a new edge connection.
//
// It reads the "node_id" and "project_id" headers from the connection state.
// If the node has no keepalive channel yet, it creates one with a buffer of
// 1. It then serves the connection in a new goroutine through a JSON-framed
// hub IO wrapper.
//
// NOTE(review): KeepaliveChannel is a plain map written here without a lock;
// if ConnNotify can fire concurrently for several connections this is a data
// race — confirm against the viaduct server.
func (mh *MessageHandle) OnRegister(connection conn.Connection) {
	headers := connection.ConnectionState().Headers
	info := &model.HubInfo{
		NodeID:    headers.Get("node_id"),
		ProjectID: headers.Get("project_id"),
	}

	if _, registered := mh.KeepaliveChannel[info.NodeID]; !registered {
		mh.KeepaliveChannel[info.NodeID] = make(chan struct{}, 1)
	}

	go mh.ServeConn(&hubio.JSONIO{Connection: connection}, info)
}
在边缘侧建立连接后,它从请求头中取出node_id和project_id,若该节点还没有keepalive通道则创建一个,然后启动goroutine调用ServeConn处理该连接上的消息。这里主要涉及viaduct这个中间件的处理流程,就不仔细看了。