KubeEdge Analysis - cloudcore - cloudhub

Overview

cloudhub handles communication between the cloud and the edge. It currently supports two transports: WebSocket and QUIC.

Register

Initializes the configuration and registers the module with beehive.

Start

func (a *cloudHub) Start() {
    messageq := channelq.NewChannelMessageQueue()

    // start dispatch message from the cloud to edge node
    go messageq.DispatchMessage()

    // start the cloudhub server
    if hubconfig.Get().ProtocolWebsocket {
        // TODO delete second param  @kadisi
        go servers.StartCloudHub(api.ProtocolTypeWS, hubconfig.Get(), messageq)
    }

    if hubconfig.Get().ProtocolQuic {
        // TODO delete second param  @kadisi
        go servers.StartCloudHub(api.ProtocolTypeQuic, hubconfig.Get(), messageq)
    }

    if hubconfig.Get().ProtocolUDS {
        // The uds server is only used to communicate with csi driver from kubeedge on cloud.
        // It is not used to communicate between cloud and edge.
        go udsserver.StartServer(hubconfig.Get())
    }
}

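Start does three things, each in its own goroutine: it runs DispatchMessage, calls StartCloudHub once for every enabled cloud-edge protocol (WebSocket and/or QUIC), and, if UDS is enabled, calls udsserver.StartServer. Start itself returns immediately.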
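To make the concurrency shape of Start concrete, here is a minimal sketch. hubConfig, enabledServers, start and the run callback are hypothetical stand-ins for hubconfig.Get(), servers.StartCloudHub and udsserver.StartServer; only the pattern (launch everything in goroutines, gate servers on config flags, return at once) is taken from the code above.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// hubConfig is a hypothetical stand-in for hubconfig.Get(); only the three
// protocol switches that Start checks are modeled.
type hubConfig struct {
	ProtocolWebsocket bool
	ProtocolQuic      bool
	ProtocolUDS       bool
}

// enabledServers lists the servers Start would launch for a given config.
func enabledServers(cfg hubConfig) []string {
	var names []string
	if cfg.ProtocolWebsocket {
		names = append(names, "websocket")
	}
	if cfg.ProtocolQuic {
		names = append(names, "quic")
	}
	if cfg.ProtocolUDS {
		names = append(names, "uds")
	}
	return names
}

// start launches the dispatcher and every enabled server in its own goroutine
// and returns immediately. run is a hypothetical callback for the real work.
func start(cfg hubConfig, run func(name string)) {
	go run("dispatch")
	for _, name := range enabledServers(cfg) {
		go run(name)
	}
}

func main() {
	cfg := hubConfig{ProtocolWebsocket: true, ProtocolUDS: true}
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		started []string
	)
	wg.Add(1 + len(enabledServers(cfg)))
	start(cfg, func(name string) {
		defer wg.Done()
		mu.Lock()
		started = append(started, name)
		mu.Unlock()
	})
	wg.Wait()
	sort.Strings(started)
	fmt.Println(started) // [dispatch uds websocket]
}
```

Because every server runs in a goroutine, a failure in one (for example a panic on bad certificates) is not reported back through Start's return value.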

DispatchMessage

messageq (a ChannelMessageQueue) holds nothing but a single concurrency-safe map from node ID to that node's message channel.
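A per-node channel pool kept in a concurrency-safe map can be sketched as below. The type and method names (channelQueue, getOrCreate, get) are hypothetical; get plays the role that getRChannel plays in DispatchMessage, and the assumption is that a node's channel is created when that node connects.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical, trimmed-down stand-in for model.Message.
type message struct {
	Resource string
}

// channelQueue keeps one channel per node in a sync.Map (node ID -> chan message).
type channelQueue struct {
	pool sync.Map
}

// getOrCreate returns the node's channel, creating it on first use.
// LoadOrStore guarantees concurrent callers end up sharing one channel.
func (q *channelQueue) getOrCreate(nodeID string, size int) chan message {
	ch, _ := q.pool.LoadOrStore(nodeID, make(chan message, size))
	return ch.(chan message)
}

// get returns the node's channel, or an error if none exists,
// similar in spirit to getRChannel.
func (q *channelQueue) get(nodeID string) (chan message, error) {
	ch, ok := q.pool.Load(nodeID)
	if !ok {
		return nil, fmt.Errorf("no channel for node %s", nodeID)
	}
	return ch.(chan message), nil
}

func main() {
	q := &channelQueue{}
	q.getOrCreate("edge-1", 10)
	ch, err := q.get("edge-1")
	fmt.Println(ch != nil, err) // true <nil>
	_, err = q.get("edge-2")
	fmt.Println(err) // no channel for node edge-2
}
```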

// DispatchMessage gets the message from the cloud, extracts the
// node id from it, gets the channel associated with the node
// and pushes the event on the channel
func (q *ChannelMessageQueue) DispatchMessage() {
    for {
        select {
        case <-beehiveContext.Done():
            klog.Warning("Cloudhub channel eventqueue dispatch message loop stoped")
            return
        default:
        }
        msg, err := beehiveContext.Receive(model.SrcCloudHub)
        if err != nil {
            klog.Info("receive not Message format message")
            continue
        }
        resource := msg.Router.Resource
        tokens := strings.Split(resource, "/")
        numOfTokens := len(tokens)
        var nodeID string
        for i, token := range tokens {
            if token == model.ResNode && i+1 < numOfTokens {
                nodeID = tokens[i+1]
                break
            }
        }
        if nodeID == "" {
            klog.Warning("node id is not found in the message")
            continue
        }
        rChannel, err := q.getRChannel(nodeID)
        if err != nil {
            klog.Infof("fail to get dispatch channel for %s", nodeID)
            continue
        }
        rChannel <- msg
    }
}

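It receives from beehiveContext the messages addressed to the "cloudhub" module (model.SrcCloudHub), i.e. the messages sent down by the downstream controllers of edgecontroller and devicecontroller covered in the earlier analysis. If Receive returns an error, the loop logs it and moves on.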

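Once a message is received, the target node ID is taken from its Router: the resource string is split on "/", and the token right after the first model.ResNode segment is the node ID. If no node ID is found, a warning is logged and the message is dropped. Otherwise getRChannel looks up the node's channel in the ChannelMessageQueue map and the message is pushed onto it; if the node has no channel, the message is likewise dropped after logging.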
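The node-ID extraction rule can be isolated into a small function. This is a sketch that assumes model.ResNode is the string "node"; the resource paths in the example are illustrative, not taken from KubeEdge.

```go
package main

import (
	"fmt"
	"strings"
)

// resNode plays the role of model.ResNode; the value "node" is assumed here.
const resNode = "node"

// extractNodeID returns the token that follows the first "node" segment of a
// resource path, or "" if there is none: the same rule DispatchMessage applies.
func extractNodeID(resource string) string {
	tokens := strings.Split(resource, "/")
	for i, token := range tokens {
		if token == resNode && i+1 < len(tokens) {
			return tokens[i+1]
		}
	}
	return ""
}

func main() {
	fmt.Println(extractNodeID("node/edge-1/default/pod/nginx")) // edge-1
	fmt.Println(extractNodeID("default/pod/nginx") == "")        // true
}
```

A path that ends in "node" yields an empty ID, because the i+1 bounds check fails, so such messages fall into the "node id is not found" branch.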

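The whole flow sits inside a for loop, so it repeats until beehiveContext is done. The Done check at the top of each iteration uses a select with an empty default, so it never blocks; shutdown is only noticed between messages.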

StartCloudHub

// StartCloudHub starts the cloud hub service
func StartCloudHub(protocolType string, config *hubconfig.Configure, messageq *channelq.ChannelMessageQueue) {
    // init certificate
    pool := x509.NewCertPool()
    ok := pool.AppendCertsFromPEM(config.Ca)
    if !ok {
        panic(fmt.Errorf("fail to load ca content"))
    }
    cert, err := tls.X509KeyPair(config.Cert, config.Key)
    if err != nil {
        panic(err)
    }
    tlsConfig := tls.Config{
        ClientCAs:    pool,
        ClientAuth:   tls.RequireAndVerifyClientCert,
        Certificates: []tls.Certificate{cert},
        MinVersion:   tls.VersionTLS12,
        CipherSuites: []uint16{tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256},
    }

    handler.InitHandler(config, messageq)

    svc := server.Server{
        Type:       protocolType,
        TLSConfig:  &tlsConfig,
        AutoRoute:  true,
        ConnNotify: handler.CloudhubHandler.OnRegister,
    }

    switch protocolType {
    case api.ProtocolTypeWS:
        svc.Addr = fmt.Sprintf("%s:%d", config.Address, config.Port)
        svc.ExOpts = api.WSServerOption{Path: "/"}
    case api.ProtocolTypeQuic:
        svc.Addr = fmt.Sprintf("%s:%d", config.Address, config.QuicPort)
        svc.ExOpts = api.QuicServerOption{MaxIncomingStreams: config.MaxIncomingStreams}
    default:
        panic(fmt.Errorf("invalid protocol, should be websocket or quic"))
    }

    klog.Infof("Start cloud hub %s server", protocolType)
    svc.ListenAndServeTLS("", "")
}

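The first few lines set up TLS. The CA pool is used to verify client certificates and ClientAuth is RequireAndVerifyClientCert, so this is mutual TLS: every edge node must present a certificate signed by the CA, while the server presents its own certificate built from config.Cert and config.Key. TLS 1.2 is the minimum version. After that, a handler is initialized.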
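The same TLS setup can be factored into a helper that returns errors instead of panicking. This is a sketch; newMutualTLSConfig is a hypothetical name, and caPEM, certPEM and keyPEM are assumed to be PEM-encoded bytes like config.Ca, config.Cert and config.Key.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// newMutualTLSConfig builds a server-side tls.Config equivalent to the one in
// StartCloudHub, reporting failures as errors rather than panics.
func newMutualTLSConfig(caPEM, certPEM, keyPEM []byte) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("fail to load ca content")
	}
	cert, err := tls.X509KeyPair(certPEM, keyPEM)
	if err != nil {
		return nil, fmt.Errorf("load server key pair: %w", err)
	}
	return &tls.Config{
		ClientCAs:    pool,                           // CAs trusted to sign edge (client) certificates
		ClientAuth:   tls.RequireAndVerifyClientCert, // every edge node must present a valid certificate
		Certificates: []tls.Certificate{cert},        // the cloud side's own certificate
		MinVersion:   tls.VersionTLS12,
		CipherSuites: []uint16{tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256},
	}, nil
}

func main() {
	_, err := newMutualTLSConfig([]byte("not a pem"), nil, nil)
	fmt.Println(err) // fail to load ca content
}
```

In Go's crypto/tls, the CipherSuites list only restricts TLS 1.0 to 1.2; if a TLS 1.3 connection is negotiated, its cipher suites are not configurable through this field.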

InitHandler

func InitHandler(config *hubconfig.Configure, eventq *channelq.ChannelMessageQueue) {
    once.Do(func() {
        CloudhubHandler = &MessageHandle{
            KeepaliveInterval: config.KeepaliveInterval,
            WriteTimeout:      config.WriteTimeout,
            MessageQueue:      eventq,
            NodeLimit:         config.NodeLimit,
        }

        CloudhubHandler.KeepaliveChannel = make(map[string]chan struct{})
        CloudhubHandler.Handlers = []HandleFunc{CloudhubHandler.KeepaliveCheckLoop, CloudhubHandler.MessageWriteLoop}

        CloudhubHandler.initServerEntries()
    })
}

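The eventq here is the same messageq that DispatchMessage fills. Two HandleFuncs are registered: KeepaliveCheckLoop and MessageWriteLoop. KeepaliveCheckLoop handles heartbeats and is not examined in detail here.
MessageWriteLoop processes the messages sent from the cloud to the edge.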
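Every handler in Handlers receives a stop channel, which suggests they run side by side for one connection and the first one to report an exit code ends it. The sketch below shows that pattern under this assumption; ServeConn itself is not shown in this article, and ExitCode values, HandleFunc's simplified signature and serveConn are all hypothetical.

```go
package main

import "fmt"

// ExitCode mirrors the idea of the exit codes in the text; values are hypothetical.
type ExitCode int

const (
	nodeStop ExitCode = iota
	hubioWriteFail
)

// HandleFunc is a simplified, hypothetical handler signature: each loop
// reports why it stopped on the shared stop channel.
type HandleFunc func(nodeID string, stop chan ExitCode)

// serveConn starts every handler in its own goroutine and returns the first
// exit code reported, which is when the connection would be torn down.
func serveConn(nodeID string, handlers []HandleFunc) ExitCode {
	stop := make(chan ExitCode, len(handlers)) // buffered so late reporters never block
	for _, h := range handlers {
		go h(nodeID, stop)
	}
	return <-stop
}

func main() {
	idle := func(nodeID string, stop chan ExitCode) { select {} } // never exits
	writer := func(nodeID string, stop chan ExitCode) { stop <- hubioWriteFail }
	code := serveConn("edge-1", []HandleFunc{idle, writer})
	fmt.Println(code == hubioWriteFail) // true
}
```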


// MessageWriteLoop processes all write requests
func (mh *MessageHandle) MessageWriteLoop(hi hubio.CloudHubIO, info *model.HubInfo, stop chan ExitCode) {
    messages, err := mh.MessageQueue.Consume(info)
    if err != nil {
        klog.Errorf("failed to consume event for node %s, reason: %s", info.NodeID, err.Error())
        stop <- messageQueueDisconnect
        return
    }
    for {
        msg, err := messages.Get()
        if err != nil {
            klog.Errorf("failed to consume event for node %s, reason: %s", info.NodeID, err.Error())
            if err.Error() == MsgFormatError {
                // error format message should not impact other message
                messages.Ack()
                continue
            }
            stop <- messageQueueDisconnect
            return
        }

        if model.IsNodeStopped(msg) {
            klog.Infof("node %s is stopped, will disconnect", info.NodeID)
            messages.Ack()
            stop <- nodeStop
            return
        }
        if !model.IsToEdge(msg) {
            klog.Infof("skip only to cloud event for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content)
            messages.Ack()
            continue
        }
        klog.Infof("event to send for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content)

        trimMessage(msg)
        err = hi.SetWriteDeadline(time.Now().Add(time.Duration(mh.WriteTimeout) * time.Second))
        if err != nil {
            klog.Errorf("SetWriteDeadline error, %s", err.Error())
            stop <- hubioWriteFail
            return
        }
        err = mh.hubIoWrite(hi, info.NodeID, msg)
        if err != nil {
            klog.Errorf("write error, connection for node %s will be closed, affected event %s, reason %s",
                info.NodeID, dumpMessageMetadata(msg), err.Error())
            stop <- hubioWriteFail
            return
        }
        messages.Ack()
    }
}

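MessageWriteLoop first calls Consume to get this node's message source, then repeatedly takes the next message from it. If the message indicates that the node has stopped, it is acked and the connection is closed. If the message is not meant for the edge, it is acked and skipped. Otherwise the loop sets a write deadline of WriteTimeout seconds and calls hubIoWrite to write the message to the edge node. A message is acked only after it has been handled; any consume or write error sends an exit code on stop and ends the loop, except that messages with a format error are acked and skipped so they do not hold up the rest.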

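InitHandler ends by calling initServerEntries, which builds on the viaduct module. viaduct is the low-level module that carries the actual cloud-edge communication; it is not analyzed in detail here.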

Server

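Back in StartCloudHub, a viaduct server is then created and ListenAndServeTLS is called to start listening for connections from edge nodes. The listening address depends on the protocol: config.Port for WebSocket and config.QuicPort for QUIC.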

When the server is created, OnRegister is set as its ConnNotify callback:

// OnRegister regist node on first connection
func (mh *MessageHandle) OnRegister(connection conn.Connection) {
    nodeID := connection.ConnectionState().Headers.Get("node_id")
    projectID := connection.ConnectionState().Headers.Get("project_id")

    if _, ok := mh.KeepaliveChannel[nodeID]; !ok {
        mh.KeepaliveChannel[nodeID] = make(chan struct{}, 1)
    }

    io := &hubio.JSONIO{Connection: connection}
    go mh.ServeConn(io, &model.HubInfo{ProjectID: projectID, NodeID: nodeID})
}

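OnRegister runs when an edge node first connects. It reads node_id and project_id from the connection headers, creates the node's keepalive channel (buffer size 1) if it does not exist yet, wraps the connection in a hubio.JSONIO, and starts ServeConn in a goroutine to process messages. The rest of this flow is mostly viaduct's processing, which is not covered in detail here.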

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.