k8s watch rest api

k8s rest api对rc、svc、ingress、pod、deployment等都提供的watch接口,可以实时的监听应用部署状态。

在此之前简单先说一下http长连接

分块传输编码(Chunked transfer encoding)

超文本传输协议(HTTP)中的一种数据传输机制,允许HTTP由应用服务器发送给客户端应用( 通常是网页浏览器)的数据可以分成多个部分。分块传输编码只在HTTP协议1.1版本(HTTP/1.1)中提供。
通常,HTTP应答消息中发送的数据是整个发送的,Content-Length消息头字段表示数据的长度。数据的长度很重要,因为客户端需要知道哪里是应答消息的结束,以及后续应答消息的开始。然而,使用分块传输编码,数据分解成一系列数据块,并以一个或多个块发送,这样服务器可以发送数据而不需要预先知道发送内容的总大小。通常数据块的大小是一致的,但也不总是这种情况。

Transfer-Encoding

消息首部指明了将 entity 安全传递给用户所采用的编码形式。

Transfer-Encoding 是一个逐跳传输消息首部,即仅应用于两个节点之间的消息传递,而不是所请求的资源本身。一个多节点连接中的每一段都可以应用不同的Transfer-Encoding 值。如果你想要将压缩后的数据应用于整个连接,那么请使用端到端传输消息首部 Content-Encoding 。

当这个消息首部出现在 HEAD 请求的响应中,而这样的响应没有消息体,那么它其实指的是应用在相应的 GET 请求的应答的值。

Header type Response header
Forbidden header name   yes

语法

Transfer-Encoding: chunked
Transfer-Encoding: compress
Transfer-Encoding: deflate
Transfer-Encoding: gzip
Transfer-Encoding: identity

// Several values can be listed, separated by a comma
Transfer-Encoding: gzip, chunked

指令

chunked

数据以一系列分块的形式进行发送。 Content-Length 首部在这种情况下不被发送。。在每一个分块的开头需要添加当前分块的长度,以十六进制的形式表示,后面紧跟着 '\r\n' ,之后是分块本身,后面也是'\r\n' 。终止块是一个常规的分块,不同之处在于其长度为0。终止块后面是一个挂载(trailer),由一系列(或者为空)的实体消息首部构成。

compress

采用 Lempel-Ziv-Welch (LZW) 压缩算法。这个名称来自UNIX系统的 compress 程序,该程序实现了前述算法。
与其同名程序已经在大部分UNIX发行版中消失一样,这种内容编码方式已经被大部分浏览器弃用,部分因为专利问题(这项专利在2003年到期)。

deflate

采用 zlib 结构 (在 RFC 1950 中规定),和 deflate 压缩算法(在 RFC 1951 中规定)。

gzip

表示采用 Lempel-Ziv coding (LZ77) 压缩算法,以及32位CRC校验的编码方式。这个编码方式最初由 UNIX 平台上的 gzip 程序采用。处于兼容性的考虑, HTTP/1.1 标准提议支持这种编码方式的服务器应该识别作为别名的 x-gzip 指令。
identity
用于指代自身(例如:未经过压缩和修改)。除非特别指明,这个标记始终可以被接受。
示例

分块编码

分块编码主要应用于如下场景,即要传输大量的数据,但是在请求在没有被处理完之前响应的长度是无法获得的。例如,当需要用从数据库中查询获得的数据生成一个大的HTML表格的时候,或者需要传输大量的图片的时候。一个分块响应形式如下:

HTTP/1.1 200 OK 
Content-Type: text/plain 
Transfer-Encoding: chunked

7\r\n
Mozilla\r\n 
9\r\n
Developer\r\n
7\r\n
Network\r\n
0\r\n 
\r\n

HTTP 1.1引入分块传输编码提供了以下几点好处:

  • HTTP分块传输编码允许服务器为动态生成的内容维持HTTP持久连接。通常,持久链接需要服务器在开始发送消息体前发送Content-Length消息头字段,但是对于动态生成的内容来说,在内容创建完之前是不可知的。[动态内容,content-length无法预知]
  • 分块传输编码允许服务器在最后发送消息头字段。对于那些头字段值在内容被生成之前无法知道的情形非常重要,例如消息的内容要使用散列进行签名,散列的结果通过HTTP消息头字段进行传输。没有分块传输编码时,服务器必须缓冲内容直到完成后计算头字段的值并在发送内容前发送这些头字段的值。[散列签名,需缓冲完成才能计算]
  • HTTP服务器有时使用压缩 (gzip或deflate)以缩短传输花费的时间。分块传输编码可以用来分隔压缩对象的多个部分。在这种情况下,块不是分别压缩的,而是整个负载进行压缩,压缩的输出使用本文描述的方案进行分块传输。在压缩的情形中,分块编码有利于一边进行压缩一边发送数据,而不是先完成压缩过程以得知压缩后数据的大小。[gzip压缩,压缩与传输同时进行]

一般情况HTTP的Header包含Content-Length域来指明报文体的长度。有时候服务生成HTTP回应是无法确定消息大小的,比如大文件的下载,或者后台需要复杂的逻辑才能全部处理页面的请求,这时用需要实时生成消息长度,服务器一般使用chunked编码

原理

k8s提供的watch功能是建立在对etcd的watch之上的,当etcd的key-value出现变化时,会通知kube-apiserver,这里的Key-vlaue其实就是k8s资源的持久化。

早期的k8s架构中,kube-apiserver、kube-controller-manager、kube-scheduler、kubelet、kube-proxy,都是直接去watch etcd的,这样就造成etcd的连接数太大(节点成千上万时),对etcd压力太大,浪费资源,因此到了后面,只有kube-apiserver去watch etcd,而kube-apiserver对外提供watch api,也就是kube-controller-manager、kube-scheduler、kubelet、kube-proxy去watch kube-apiserver,这样大大减小了etcd的压力

Watch API

通过k8s 官网 rest api的描述,可以看到,Watch API实际上一个标准的HTTP GET请求,我们以Pod的Watch API为例

HTTP Request

GET /api/v1/watch/namespaces/{namespace}/pods
Path Parameters

Parameter   Description
namespace   object name and auth scope, such as for teams and projects
Query Parameters

Parameter   Description
fieldSelector   A selector to restrict the list of returned objects by their fields. Defaults to everything.
labelSelector   A selector to restrict the list of returned objects by their labels. Defaults to everything.
pretty  If ‘true’, then the output is pretty printed.
resourceVersion When specified with a watch call, shows changes that occur after that particular version of a resource. Defaults to changes from the beginning of history. When specified for list: - if unset, then the result is returned from remote storage based on quorum-read flag; - if it’s 0, then we simply return what we currently have in cache, no guarantee; - if set to non zero, then the result is at least as fresh as given rv.
timeoutSeconds  Timeout for the list/watch call.
watch   Watch for changes to the described resources and return them as a stream of add, update, and remove notifications. Specify resourceVersion.
Response

Code    Description
200 WatchEvent  OK

从上面可以看出Watch其实就是一个GET请求,和一般请求不同的是,它有一个watch的query parameter,也就是kube-apiserver接到这个请求,当发现query parameter里面包含watch,就知道这是一个Watch API,watch参数默认为true。

==返回值是200和WatchEvent。apiserver首先会返回一个200的状态码,建立长连接,然后不断的返回watch event==

服务器端机制

通过watch api涉及到的http源码分析,可以看到,watch支持http长连接和websocket两种方式

func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    w = httplog.Unlogged(w)

    if wsstream.IsWebSocketRequest(req) {
        w.Header().Set("Content-Type", s.MediaType)
        websocket.Handler(s.HandleWS).ServeHTTP(w, req)
        return
    }
    ...
    framer := s.Framer.NewFrameWriter(w)
    ...
    e := streaming.NewEncoder(framer, s.Encoder)
    ...
    // begin the stream
    w.Header().Set("Content-Type", s.MediaType)
    w.Header().Set("Transfer-Encoding", "chunked")
    w.WriteHeader(http.StatusOK)
    flusher.Flush()

    var unknown runtime.Unknown
    internalEvent := &metav1.InternalEvent{}
    buf := &bytes.Buffer{}
    ch := s.Watching.ResultChan()
    for {
        select {
        case <-cn.CloseNotify():
            return
        case <-timeoutCh:
            return
        case event, ok := <-ch:
            if !ok {
                // End of results.
                return
            }

            obj := event.Object
            s.Fixup(obj)
            if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
                // unexpected error
                utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
                return
            }

            // ContentType is not required here because we are defaulting to the serializer
            // type
            unknown.Raw = buf.Bytes()
            event.Object = &unknown

            // the internal event will be versioned by the encoder
            *internalEvent = metav1.InternalEvent(event)
            if err := e.Encode(internalEvent); err != nil {
                utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v (%#v)", err, e))
                // client disconnect.
                return
            }
            if len(ch) == 0 {
                flusher.Flush()
            }

            buf.Reset()
        }
    }
}

每次调用watch API,kube-apiserver都会建立一个WatchServer,WatchServer通过channel会从etcd里面获取资源的watch event,中间经过一系列的处理(事件的广播)。然后WatchServer通过ServeHTTP将事件发送给client,我们就详细看看ServerHTTP的处理逻辑

首先会查看发送来的请求是不是要求使用websockt,即wsstream.IsWebSocketRequest(req),假如是的话就通过websocket向client发送watch event,也就是说kube-apiserver是支持通过websocket向客户端发送watch event的。

假如不是的话,则首先设置http返回头,Content-Type设置为s.MediaType,一般为json,同时设置Transfer-Encoding为chunked,设置返回码为200(StatusOK),和我们从API分析那一节获取的信息一样,首先会返回一个200的状态吗。

这里比较有意思的是,将Transfer-Encoding设置为chunked,这是http1.1中支持的协议,它会建立一个长连接,同时可以不停的发送数据块,发送数据块的格式是,它首先会发送一个数据块的长度,加上回车符(/r/n),接着发送相应的数据块内容。假如数据块长度为0,则代表数据发送完成,连接断开。显然这里的watch event就是一个个数据块。

w.Header().Set("Content-Type", s.MediaType)
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)

看看接下里的for循环,首先从channel里面获取event

event, ok := <-ch

然后序列化数据

obj := event.Object
s.EmbeddedEncoder.Encode(obj, buf)
unknown.Raw = buf.Bytes()
event.Object = &unknown

最后将数据发送出去

*internalEvent = metav1.InternalEvent(event)
e.Encode(internalEvent)

具体实现

考虑的http长连接的资源消耗与性能问题,实现pod监听是采用的ws协议

同时在服务多实例时涉及到pod状态合并的问题

例如:通过deployment部署3个pod示例,在3个pod都起来时,deployment才算正常,在3个pod都删除是,deployment才算下线。并且一些java应用启动的时间可能较长,采用livenessprobe探针健康检查是需要探测多次才可探测到启动成功,这个会收到多条modify的消息,诸如此类的情况都要考虑,可根据监控需求具体实现

//定义自己msg消息结构体
type msg struct {
    
}

//存储pod状态
var cache *CacheStatus

var msgChan = make(chan msg, 1024)

//ws协议监听
func WatchPod() {
    
    //ReconnWs为自己封装websocket工具包
    reconnWs := new(ReconnWs)

    u, _ := url.Parse("ws://" + k8surl + "/api/v1/watch/pods?watch=true&pretty=true")

    reconnWs.Dial(u, http.Header{
        "Origin": []string{"http://" + k8surl + "/"},
    })

    done := make(chan struct{})

    //监听信息处理函数
    go func() {
        for {
            m := <-msgChan
            //消息处理函数
            go handler()
        }
    }()

    //pod监听
    go func() {

        if cache == nil {
            cache = NewCache()
        }

        defer func() {
            reconnWs.Close()
            close(done)
        }()
        for {

            var event PodStatus

            _, message, err := reconnWs.ReadMessage()
            if err != nil {
                log.Println("read:", err)
                continue
            }

            if err := jsoniter.Unmarshal(message, &event); err != nil {
                log.Println(err)
                continue
            }

            switch event.Type {

            case Added:
            

            case Deleted:

                

            case Modified:

                

            case Error:


            }

            m := msg{
                //填入获取的状态信息
            }

            msgChan <- m

        }
    }()

    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, os.Interrupt)
    for {

        defer func() {
            Close()
        }()

        select {
        case <-interrupt:
            select {
            case <-done:
            case <-time.After(time.Second):
            }
            return
        }
    }
}


//资源回收
func Close() {
    
}

cache采用的是map结构存储pod状态,getStatuInfo()函数是监控程序启动的时候初始化服务状态数据的函数,可以选择每次启动都从k8s查询一遍,根据自己定义的监听规则填入pod状态,也可以在每次接受到消息时都存入持久化存储(mysql、redis等),每次启动再从持久化存储中查询以及初始化已存在的cache数据

type CacheStatus struct {
    lock *sync.RWMutex
    bm   map[string]Status
}

type Status struct {
    ReadyReplicas    int
    Replicas         int
    Event            string
    Phase            string
    Errmsg           string
    CreatingReplicas int
}

func NewCache() *CacheStatus {

    return &CacheStatus{
        lock: new(sync.RWMutex),
        bm:   getStatuInfo(),
    }

}

func (m *CacheStatus) IsExist(k string) (isExist bool) {

    m.lock.Lock()
    defer m.lock.Unlock()

    if _, ok := m.bm[k]; ok {
        isExist = true
    }

    return
}

func (m *CacheStatus) Get(k string) (status Status) {
    m.lock.RLock()
    defer m.lock.RUnlock()
    if status, ok := m.bm[k]; ok {
        return status
    }
    return
}

func (m *CacheStatus) Set(k string, v Status) {
    m.lock.Lock()
    defer m.lock.Unlock()

    m.bm[k] = v

}

func (m *CacheStatus) Check(k string) bool {
    m.lock.RLock()
    defer m.lock.RUnlock()
    if _, ok := m.bm[k]; !ok {
        return false
    }
    return true
}
func (m *CacheStatus) Delete(k string) {
    m.lock.Lock()
    defer m.lock.Unlock()
    delete(m.bm, k)
}

//range map
func (m *CacheStatus) Each(cb func(string, Status)) {
    m.lock.RLock()
    defer m.lock.RUnlock()
    for k, v := range m.bm {
        cb(k, v)
    }
}

func (m *CacheStatus) String() string {
    str, _ := jsoniter.MarshalToString(m.bm)
    return str
}


func getStatuInfo() (cacheInfo map[string]Status) {

    ....
    
    return
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,335评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,895评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,766评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,918评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,042评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,169评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,219评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,976评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,393评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,711评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,876评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,562评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,193评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,903评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,699评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,764评论 2 351

推荐阅读更多精彩内容