K8S Python API 调用三方API的统一接入方式

使用 functools.partialmethod 统一接入第三方 API 调用,目前包括 kruise-rollout 和 Alibaba Cgroups。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date    : 2023-09-25 19:10:37
import datetime
import yaml
import time, arrow
from kubernetes import client, utils
from kubernetes.stream import stream
from functools import partialmethod


class MyK8sError(Exception):
    """Error raised by the MyK8s wrapper itself.

    Used for an unsupported object kind and for lookups that match no
    object or more than one object.
    """


class MyK8s:
    """Convenience wrapper around the official Kubernetes Python client.

    Besides holding clients for the built-in API groups (core, apps,
    networking, batch), the class exposes shortcut methods for third-party
    custom resources.  For every entry in ``CUSTOM_OBJECTS`` the methods
    ``get_<name>``, ``create_<name>``, ``patch_<name>``, ``delete_<name>``
    and ``list_<name>s`` are generated with ``functools.partialmethod`` by
    pre-binding ``group``, ``api_version`` and ``plural`` of the matching
    ``*_custom_object`` method.

    The shortcuts are attached once, when the class is defined, so they are
    available before any instance exists and are not re-created on every
    instantiation.

    Because the three pre-bound values are passed by keyword, the remaining
    arguments of a generated method (``namespace``, ``name``, ``json_data``)
    must be passed by keyword too, e.g.
    ``k8s.get_cgroup(namespace="ns", name="x")``.
    """

    # Register third-party (custom resource) objects here:
    #   short name -> (API group, API version, plural resource name)
    CUSTOM_OBJECTS = {
        "cgroup": ("resources.alibabacloud.com", "v1alpha1", "cgroups"),
        "rollout": ("rollouts.kruise.io", "v1alpha1", "rollouts"),
    }
    # Operations for which a shortcut is generated per custom object.
    CUSTOM_OBJECT_OPS = ("get", "create", "patch", "delete", "list")

    def __init__(
        self,
        host,
        token,
        verify_ssl=False,
    ):
        """Build the API clients for a remote cluster using a bearer token.

        See https://github.com/kubernetes-client/python/blob/release-24.0/examples/remote_cluster.py

        Args:
            host: Address of the Kubernetes API server.
            token: Token stored as the ``authorization`` API key with a
                ``"Bearer "`` prefix.
            verify_ssl: Value assigned to ``Configuration.verify_ssl``.
                NOTE(review): the default ``False`` disables certificate
                verification; pass ``True`` outside of test clusters.
        """
        configuration = client.Configuration()
        configuration.host = host
        configuration.verify_ssl = verify_ssl
        configuration.api_key = {"authorization": "Bearer " + token}
        api_client = client.ApiClient(configuration)
        self.api_client = api_client
        self.corev1 = client.CoreV1Api(api_client)
        self.appv1 = client.AppsV1Api(api_client)
        self.netv1 = client.NetworkingV1Api(api_client)
        self.co_api = client.CustomObjectsApi(api_client)
        self.batchv1 = client.BatchV1Api(api_client)

    @classmethod
    def _register_custom_objects(cls):
        """Attach the generated shortcut methods for ``CUSTOM_OBJECTS``."""
        for object_name, (group, api_version, plural) in cls.CUSTOM_OBJECTS.items():
            for op in cls.CUSTOM_OBJECT_OPS:
                # Only the "list" shortcut gets a plural name (list_cgroups).
                suffix = "s" if op == "list" else ""
                setattr(
                    cls,
                    "{0}_{1}{2}".format(op, object_name, suffix),
                    partialmethod(
                        getattr(cls, "{}_custom_object".format(op)),
                        group=group,
                        api_version=api_version,
                        plural=plural,
                    ),
                )

    def _client_api(self, kind):
        """Return the API client that serves objects of the given kind.

        Args:
            kind: Snake-case object kind such as ``"deployment"`` or ``"pod"``.

        Raises:
            MyK8sError: If ``kind`` is not one of the supported kinds.
        """
        # NOTE(review): "rollout" is mapped to the custom-objects client but
        # "cgroup" is not -- confirm whether that omission is intentional.
        if kind in (
            "controller_revision",
            "daemon_set",
            "deployment",
            "replica_set",
            "stateful_set",
        ):
            return self.appv1
        elif kind in ("pod", "service"):
            return self.corev1
        elif kind == "ingress":
            return self.netv1
        elif kind == "rollout":
            return self.co_api
        elif kind == "job":
            return self.batchv1
        else:
            raise MyK8sError("不支持操作类型为{}的对象".format(kind))

    def get_custom_object(self, group, api_version, plural, namespace, name, **kw):
        """Fetch one namespaced custom object by name.

        Extra keyword arguments are forwarded to
        ``CustomObjectsApi.get_namespaced_custom_object``.
        """
        return self.co_api.get_namespaced_custom_object(
            group, api_version, namespace, plural, name, **kw
        )

    def delete_custom_object(self, group, api_version, plural, namespace, name, **kw):
        """Delete one namespaced custom object by name.

        Extra keyword arguments are forwarded to
        ``CustomObjectsApi.delete_namespaced_custom_object``.
        """
        return self.co_api.delete_namespaced_custom_object(
            group, api_version, namespace, plural, name, **kw
        )

    def list_custom_object(
        self, group, api_version, plural, namespace, name, null=False, **kw
    ):
        """Look up a single custom object through the list API.

        The list call is filtered with the field selector
        ``metadata.name=<name>`` and exactly one match is expected.

        Args:
            group: API group of the custom resource.
            api_version: API version of the custom resource.
            plural: Plural resource name of the custom resource.
            namespace: Namespace to search.
            name: Object name used in the field selector.
            null: When true, return ``None`` instead of raising if nothing
                matches.
            **kw: Forwarded to
                ``CustomObjectsApi.list_namespaced_custom_object``.

        Returns:
            The single matching item, or ``None`` when nothing matches and
            ``null`` is true.

        Raises:
            MyK8sError: If nothing matches and ``null`` is false, or if more
                than one item matches.
        """
        selector = "metadata.name={}".format(name)
        items = self.co_api.list_namespaced_custom_object(
            group, api_version, namespace, plural, field_selector=selector, **kw
        )["items"]
        if not items:
            if null:
                return None
            raise MyK8sError("{0} namespace has not found {1} {2}".format(namespace, name, plural))
        if len(items) > 1:
            raise MyK8sError("{0} namespace has found multi {1} {2}".format(namespace, name, plural))
        return items[0]

    def patch_custom_object(
        self, group, api_version, plural, namespace, name, json_data, **kw
    ):
        """Patch one namespaced custom object with ``json_data`` as the body.

        Extra keyword arguments are forwarded to
        ``CustomObjectsApi.patch_namespaced_custom_object``.
        """
        return self.co_api.patch_namespaced_custom_object(
            group, api_version, namespace, plural, name, json_data, **kw
        )

    def create_custom_object(
        self, group, api_version, plural, namespace, json_data, **kw
    ):
        """Create a namespaced custom object with ``json_data`` as the body.

        Extra keyword arguments are forwarded to
        ``CustomObjectsApi.create_namespaced_custom_object``.
        """
        return self.co_api.create_namespaced_custom_object(
            group, api_version, namespace, plural, json_data, **kw
        )


# Generate get_cgroup / create_rollout / list_cgroups ... once, at import time.
MyK8s._register_custom_objects()

# Module-level client used by the demo under ``__main__``; the endpoint and
# token are placeholders and must be replaced with real values.
myk8s = MyK8s(host="https://xxxxxx", token="tokenxxxxxxxxxxx")


if __name__ == "__main__":
    # Demo: create a Cgroups custom object for one container of a workload,
    # read it back, then patch new CPU and memory values.
    object_name = "deploy_name"
    namespace = "basic"
    object_type = "deployment"
    container_name = "app_name"
    cpu_limit = "1000m"
    memory_limit = "1024Mi"
    data = {
        "apiVersion": "resources.alibabacloud.com/v1alpha1",
        "kind": "Cgroups",
        "metadata": {
            "name": object_name,
            "namespace": namespace,
        },
        "spec": {
            object_type: {
                "containers": [
                    {
                        "name": container_name,
                        "cpu": cpu_limit,
                        "memory": memory_limit,
                    }
                ],
                "name": object_name,
                "namespace": namespace,
            }
        },
    }

    res = myk8s.create_cgroup(namespace=namespace, json_data=data)
    print(res)
    # Deleting works the same way:
    #   myk8s.delete_cgroup(namespace=namespace, name="scm-dubbodemo-2022.1000.233")
    try:
        res = myk8s.get_cgroup(namespace=namespace, name=object_name)
    except client.rest.ApiException as e:
        # Tolerate only a missing object; in that case ``res`` still holds the
        # response of the create call above.  Any other API error propagates.
        if e.status != 404:
            raise
    res["spec"][object_type]["containers"][0]["cpu"] = "1200m"
    res["spec"][object_type]["containers"][0]["memory"] = "1500Mi"
    res = myk8s.patch_cgroup(namespace=namespace, name=object_name, json_data=res)
    print(res)


Ref: CustomObjectsApi

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容