A Complete Guide to Managing Kubernetes Clusters with Python

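Contents

  1. Environment setup
  2. Introduction to the Python Kubernetes client
  3. Connecting to a Kubernetes cluster
  4. Working with Pods
  5. Managing Deployments
  6. Working with Services
  7. Managing ConfigMaps and Secrets
  8. Working with custom resources (CRDs)
  9. Watching events
  10. Advanced scenarios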

Environment Setup

1. Install the required packages

First, install the Kubernetes client library for Python:

```bash
pip install kubernetes
pip install openshift  # optional, only needed for OpenShift clusters
```

2. Load the cluster configuration

```python
from kubernetes import client, config

# Load the kubeconfig file ($KUBECONFIG if set, otherwise ~/.kube/config)
config.load_kube_config()
```
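A script that may run either on a workstation or inside a Pod has to choose between `config.load_kube_config()` and `config.load_incluster_config()`. A minimal sketch of that decision, assuming in-cluster execution is detected through the `KUBERNETES_SERVICE_HOST` and `KUBERNETES_SERVICE_PORT` environment variables that Kubernetes normally sets in containers. `choose_config_source` is a hypothetical helper that only returns a label, so it runs without a cluster:

```python
import os
from typing import Mapping, Optional


def choose_config_source(env: Optional[Mapping[str, str]] = None) -> str:
    """Return "incluster" or "kubeconfig" (hypothetical helper).

    Kubernetes normally injects KUBERNETES_SERVICE_HOST and
    KUBERNETES_SERVICE_PORT into containers; outside a cluster
    they are usually absent.
    """
    if env is None:
        env = os.environ
    if env.get("KUBERNETES_SERVICE_HOST") and env.get("KUBERNETES_SERVICE_PORT"):
        return "incluster"
    return "kubeconfig"


# Offline demo: simulate the variables a Pod would see
print(choose_config_source({"KUBERNETES_SERVICE_HOST": "10.96.0.1",
                            "KUBERNETES_SERVICE_PORT": "443"}))  # incluster

# With the kubernetes package, the label selects the real loader:
# if choose_config_source() == "incluster":
#     config.load_incluster_config()
# else:
#     config.load_kube_config()
```

An equally common pattern is to call `config.load_incluster_config()` first and fall back to `config.load_kube_config()` when it raises `config.ConfigException`.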

Introduction to the Python Kubernetes Client

1. Main modules

```python
from kubernetes import client, config, watch
from kubernetes.client import ApiClient
from kubernetes.client.rest import ApiException
```

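What each module provides:

  • client: API classes for each API group (CoreV1Api, AppsV1Api, CustomObjectsApi, ...)
  • config: loads the cluster configuration (kubeconfig or in-cluster)
  • watch: streams change events for resources
  • ApiClient: the low-level HTTP client that the API classes use
  • ApiException: raised when an API call fails; carries the HTTP status and response body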

Connecting to a Kubernetes Cluster

Example 1: Basic connection

```python
from kubernetes import client, config


def connect_kubernetes():
    try:
        # Load the local kubeconfig
        config.load_kube_config()
        # Create a client for the core/v1 API group
        v1 = client.CoreV1Api()
        # Test the connection with a cheap request (returns at most one Pod)
        v1.list_pod_for_all_namespaces(limit=1)
        print("Connected to the cluster")
        return v1
    except Exception as e:
        print(f"Connection failed: {e}")
        return None


# Test the connection
api = connect_kubernetes()
```

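Example 2: Multiple clusters

```python
from kubernetes import client, config


def connect_multiple_clusters():
    clusters = {
        'prod': '/path/to/prod-kubeconfig',
        'dev': '/path/to/dev-kubeconfig',
    }
    apis = {}
    for cluster_name, config_file in clusters.items():
        try:
            # new_client_from_config returns a dedicated ApiClient per cluster,
            # instead of overwriting the global default configuration each time
            api_client = config.new_client_from_config(config_file=config_file)
            apis[cluster_name] = client.CoreV1Api(api_client=api_client)
            print(f"Connected to cluster {cluster_name}")
        except Exception as e:
            print(f"Failed to connect to cluster {cluster_name}: {e}")
    return apis
```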

Working with Pods

Example 3: Create a Pod

```python
from kubernetes import client, config
from kubernetes.client.rest import ApiException


def create_pod(name, image, namespace="default"):
    # Build the Pod object
    pod = client.V1Pod(
        metadata=client.V1ObjectMeta(name=name),
        spec=client.V1PodSpec(
            containers=[
                client.V1Container(
                    name=name,
                    image=image,
                    ports=[client.V1ContainerPort(container_port=80)],
                )
            ]
        ),
    )
    v1 = client.CoreV1Api()
    try:
        api_response = v1.create_namespaced_pod(namespace=namespace, body=pod)
        print(f"Pod {name} created")
        return api_response
    except ApiException as e:
        print(f"Failed to create Pod: {e}")
        return None


# Usage
config.load_kube_config()
create_pod("nginx-pod", "nginx:latest")
```
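The typed model classes map onto manifest fields, with snake_case attributes such as `container_port` becoming camelCase keys such as `containerPort`. The client's `create_*` methods also accept a plain dict as `body`, which is convenient when the manifest comes from a YAML file. A sketch of the same Pod as a dict; `pod_manifest` is a hypothetical helper:

```python
from typing import Any, Dict


def pod_manifest(name: str, image: str, port: int = 80) -> Dict[str, Any]:
    """Plain-dict equivalent of the V1Pod built in create_pod (hypothetical helper)."""
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"name": name},
        "spec": {
            "containers": [
                {
                    "name": name,
                    "image": image,
                    # camelCase key in the raw manifest
                    "ports": [{"containerPort": port}],
                }
            ]
        },
    }


manifest = pod_manifest("nginx-pod", "nginx:latest")
print(manifest["spec"]["containers"][0]["ports"])  # [{'containerPort': 80}]
# With a live cluster the dict can be passed directly:
# client.CoreV1Api().create_namespaced_pod(namespace="default", body=manifest)
```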

Example 4: Query Pod status

```python
from kubernetes import client
from kubernetes.client.rest import ApiException


def get_pod_status(name, namespace="default"):
    v1 = client.CoreV1Api()
    try:
        pod = v1.read_namespaced_pod(name=name, namespace=namespace)
        return {
            "name": pod.metadata.name,
            "status": pod.status.phase,
            "pod_ip": pod.status.pod_ip,
            "host_ip": pod.status.host_ip,
            "start_time": pod.status.start_time,
            "conditions": [
                {"type": condition.type, "status": condition.status}
                # conditions can be None shortly after creation
                for condition in pod.status.conditions or []
            ],
        }
    except ApiException as e:
        print(f"Failed to get Pod status: {e}")
        return None


# Usage
status = get_pod_status("nginx-pod")
print(status)
```
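A freshly created Pod usually starts in `Pending`, so scripts often need to wait until it is `Running`. A minimal polling sketch, assuming the caller supplies a `get_phase` callable (for example one that wraps `read_namespaced_pod`); `wait_for_phase` is hypothetical, and the injectable `sleep` only exists so the demo runs offline:

```python
import time
from typing import Callable


def wait_for_phase(get_phase: Callable[[], str],
                   target: str = "Running",
                   timeout: float = 60.0,
                   interval: float = 2.0,
                   sleep: Callable[[float], None] = time.sleep) -> bool:
    """Poll get_phase() until it returns `target` (True), reaches a terminal
    phase other than `target` (False), or `timeout` seconds pass (False)."""
    deadline = time.monotonic() + timeout
    while True:
        phase = get_phase()
        if phase == target:
            return True
        if phase in ("Succeeded", "Failed"):
            return False  # terminal phase: waiting longer will not help
        if time.monotonic() >= deadline:
            return False
        sleep(interval)


# Offline demo with a scripted sequence of phases and a no-op sleep
phases = iter(["Pending", "Pending", "Running"])
print(wait_for_phase(lambda: next(phases), sleep=lambda s: None))  # True

# Against a live cluster (v1 = client.CoreV1Api()):
# wait_for_phase(lambda: v1.read_namespaced_pod("nginx-pod", "default").status.phase)
```

Polling is the simplest option; the Watch API shown later avoids repeated requests.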

Managing Deployments

Example 5: Create a Deployment

```python
from kubernetes import client
from kubernetes.client.rest import ApiException


def create_deployment(name, image, replicas=3, namespace="default"):
    # Build the Deployment object; the selector must match the template labels
    deployment = client.V1Deployment(
        metadata=client.V1ObjectMeta(name=name),
        spec=client.V1DeploymentSpec(
            replicas=replicas,
            selector=client.V1LabelSelector(match_labels={"app": name}),
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(labels={"app": name}),
                spec=client.V1PodSpec(
                    containers=[
                        client.V1Container(
                            name=name,
                            image=image,
                            ports=[client.V1ContainerPort(container_port=80)],
                        )
                    ]
                ),
            ),
        ),
    )
    apps_v1 = client.AppsV1Api()
    try:
        api_response = apps_v1.create_namespaced_deployment(
            namespace=namespace, body=deployment
        )
        print(f"Deployment {name} created")
        return api_response
    except ApiException as e:
        print(f"Failed to create Deployment: {e}")
        return None


# Usage
create_deployment("nginx-deployment", "nginx:latest")
```
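The API server rejects a Deployment whose `spec.selector` does not match the labels in `spec.template.metadata.labels`, which is why create_deployment uses `{"app": name}` in both places. The same equality-based matching decides which Pods a Service (Example 7) routes traffic to. A sketch of the `matchLabels` check only (set-based `matchExpressions` are not handled); `selector_matches` is hypothetical:

```python
from typing import Dict


def selector_matches(selector: Dict[str, str], labels: Dict[str, str]) -> bool:
    """True if every matchLabels key/value pair is present in labels."""
    return all(labels.get(key) == value for key, value in selector.items())


# create_deployment uses {"app": name} for both the selector and the template
template_labels = {"app": "nginx-deployment"}
print(selector_matches({"app": "nginx-deployment"}, template_labels))  # True
print(selector_matches({"app": "nginx"}, template_labels))             # False
```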

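Example 6: Update a Deployment

```python
from kubernetes import client
from kubernetes.client.rest import ApiException


def update_deployment(name, new_image, namespace="default"):
    apps_v1 = client.AppsV1Api()
    try:
        # Read the current Deployment
        deployment = apps_v1.read_namespaced_deployment(name=name, namespace=namespace)
        # Change the image (assumes the first container is the one to update)
        deployment.spec.template.spec.containers[0].image = new_image
        # Send the modified object back; changing the template starts a rolling update
        api_response = apps_v1.patch_namespaced_deployment(
            name=name, namespace=namespace, body=deployment
        )
        print(f"Deployment {name} updated")
        return api_response
    except ApiException as e:
        print(f"Failed to update Deployment: {e}")
        return None


# Usage
update_deployment("nginx-deployment", "nginx:1.19")
```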
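Reading the whole object and sending it back works, but a patch only needs the fields that change. In a strategic merge patch the `containers` list is merged by its `name` key, so other containers and fields are left alone. With recent client versions a dict body passed to `patch_namespaced_deployment` is sent as a strategic merge patch. A sketch; `image_patch` is a hypothetical helper:

```python
import json
from typing import Any, Dict


def image_patch(container_name: str, image: str) -> Dict[str, Any]:
    """Strategic-merge patch that changes only one container's image.

    The containers list is merged by "name", so only the matching
    container's image is replaced.
    """
    return {
        "spec": {
            "template": {
                "spec": {
                    "containers": [{"name": container_name, "image": image}]
                }
            }
        }
    }


# In create_deployment the container is named after the Deployment
print(json.dumps(image_patch("nginx-deployment", "nginx:1.19")))
# With a live cluster:
# client.AppsV1Api().patch_namespaced_deployment(
#     name="nginx-deployment", namespace="default",
#     body=image_patch("nginx-deployment", "nginx:1.19"))
```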

Working with Services

Example 7: Create a Service

```python
from kubernetes import client
from kubernetes.client.rest import ApiException


def create_service(name, selector, port, target_port, namespace="default"):
    # port: the port the Service exposes; target_port: the container port traffic goes to
    service = client.V1Service(
        metadata=client.V1ObjectMeta(name=name),
        spec=client.V1ServiceSpec(
            selector=selector,
            ports=[client.V1ServicePort(port=port, target_port=target_port)],
        ),
    )
    v1 = client.CoreV1Api()
    try:
        api_response = v1.create_namespaced_service(namespace=namespace, body=service)
        print(f"Service {name} created")
        return api_response
    except ApiException as e:
        print(f"Failed to create Service: {e}")
        return None


# Usage: select the Pods created by the nginx-deployment Deployment
create_service("nginx-service", {"app": "nginx-deployment"}, 80, 80)
```
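Inside the cluster, other Pods reach a Service through its DNS name, `<service>.<namespace>.svc.<cluster-domain>`. A sketch assuming the common default cluster domain `cluster.local` (clusters can be configured differently); `service_dns_name` is hypothetical:

```python
def service_dns_name(name: str, namespace: str = "default",
                     cluster_domain: str = "cluster.local") -> str:
    """In-cluster DNS name of a Service (assumes the given cluster domain)."""
    return f"{name}.{namespace}.svc.{cluster_domain}"


# A Pod in the cluster could reach the Service created above at:
print(f"http://{service_dns_name('nginx-service')}:80")
# http://nginx-service.default.svc.cluster.local:80
```

Pods in the same namespace can usually use the short name `nginx-service` as well.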

Managing ConfigMaps and Secrets

Example 8: Create a ConfigMap

```python
from kubernetes import client
from kubernetes.client.rest import ApiException


def create_configmap(name, data, namespace="default"):
    # Build the ConfigMap object; all values in data must be strings
    configmap = client.V1ConfigMap(
        metadata=client.V1ObjectMeta(name=name),
        data=data,
    )
    v1 = client.CoreV1Api()
    try:
        api_response = v1.create_namespaced_config_map(namespace=namespace, body=configmap)
        print(f"ConfigMap {name} created")
        return api_response
    except ApiException as e:
        print(f"Failed to create ConfigMap: {e}")
        return None


# Usage
config_data = {
    "app.properties": "app.name=myapp\napp.env=production\n",
}
create_configmap("app-config", config_data)
```
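ConfigMap `data` maps keys to string values, and when the ConfigMap is mounted as a volume each key becomes a file. Settings kept in a dict therefore have to be rendered into text first. A sketch that writes `key=value` lines without any escaping of special characters; `to_properties` is hypothetical:

```python
from typing import Mapping


def to_properties(settings: Mapping[str, object]) -> str:
    """Render settings as key=value lines (no escaping; hypothetical helper)."""
    return "".join(f"{key}={value}\n" for key, value in settings.items())


config_data = {
    "app.properties": to_properties({"app.name": "myapp", "app.env": "production"})
}
print(config_data["app.properties"], end="")
# app.name=myapp
# app.env=production
```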

Example 9: Create a Secret

```python
import base64

from kubernetes import client
from kubernetes.client.rest import ApiException


def create_secret(name, data, namespace="default"):
    # The Secret `data` field expects base64-encoded values
    encoded_data = {
        k: base64.b64encode(v.encode()).decode() for k, v in data.items()
    }
    secret = client.V1Secret(
        metadata=client.V1ObjectMeta(name=name),
        type="Opaque",
        data=encoded_data,
    )
    v1 = client.CoreV1Api()
    try:
        api_response = v1.create_namespaced_secret(namespace=namespace, body=secret)
        print(f"Secret {name} created")
        return api_response
    except ApiException as e:
        print(f"Failed to create Secret: {e}")
        return None


# Usage
secret_data = {
    "username": "admin",
    "password": "secret123",
}
create_secret("app-secrets", secret_data)
```
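Base64 is an encoding, not encryption: anyone allowed to read the Secret can decode it, which is why RBAC access to Secrets matters. When a Secret is read back with `read_namespaced_secret`, `secret.data` holds the encoded values; alternatively, `V1Secret(string_data=...)` lets the API server do the encoding. A round-trip sketch with two hypothetical helpers:

```python
import base64
from typing import Dict


def encode_secret_data(data: Dict[str, str]) -> Dict[str, str]:
    """Base64-encode values as required by the Secret `data` field."""
    return {k: base64.b64encode(v.encode()).decode() for k, v in data.items()}


def decode_secret_data(data: Dict[str, str]) -> Dict[str, str]:
    """Reverse of encode_secret_data, e.g. for secret.data read from the API."""
    return {k: base64.b64decode(v).decode() for k, v in data.items()}


encoded = encode_secret_data({"username": "admin"})
print(encoded)                      # {'username': 'YWRtaW4='}
print(decode_secret_data(encoded))  # {'username': 'admin'}
```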

Working with Custom Resources (CRDs)

Example 10: Create a custom resource

```python
from kubernetes import client
from kubernetes.client.rest import ApiException


def create_custom_resource(group, version, plural, namespace, body):
    # Custom objects are handled through the generic CustomObjectsApi
    custom_api = client.CustomObjectsApi()
    try:
        api_response = custom_api.create_namespaced_custom_object(
            group=group,
            version=version,
            namespace=namespace,
            plural=plural,
            body=body,
        )
        print("Custom resource created")
        return api_response
    except ApiException as e:
        print(f"Failed to create custom resource: {e}")
        return None


# Usage (the CronTab CRD must already be installed in the cluster)
custom_resource = {
    "apiVersion": "stable.example.com/v1",
    "kind": "CronTab",
    "metadata": {"name": "my-crontab"},
    "spec": {
        "cronSpec": "* * * * */5",
        "image": "my-cron-image",
    },
}
create_custom_resource(
    group="stable.example.com",
    version="v1",
    plural="crontabs",
    namespace="default",
    body=custom_resource,
)
```
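Custom objects are plain dicts, so nothing in the client checks that the body's `apiVersion` agrees with the `group` and `version` arguments; a mismatch typically surfaces as an API error. Deriving `apiVersion` from the same values avoids that class of mistake. A sketch; `custom_object` is a hypothetical helper:

```python
from typing import Any, Dict


def custom_object(group: str, version: str, kind: str, name: str,
                  spec: Dict[str, Any]) -> Dict[str, Any]:
    """Build a custom-object body whose apiVersion is derived from the
    group/version passed to create_namespaced_custom_object."""
    return {
        "apiVersion": f"{group}/{version}",
        "kind": kind,
        "metadata": {"name": name},
        "spec": spec,
    }


body = custom_object("stable.example.com", "v1", "CronTab", "my-crontab",
                     {"cronSpec": "* * * * */5", "image": "my-cron-image"})
print(body["apiVersion"])  # stable.example.com/v1
```

The `plural` argument must likewise match `spec.names.plural` in the CRD definition.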

Watching Events

Example 11: Watch Pod events

```python
from kubernetes import client, watch
from kubernetes.client.rest import ApiException


def watch_pods(namespace="default"):
    v1 = client.CoreV1Api()
    w = watch.Watch()
    try:
        # Each event is a dict with 'type' (ADDED/MODIFIED/DELETED) and 'object'
        for event in w.stream(v1.list_namespaced_pod, namespace=namespace):
            pod = event['object']
            print(f"Event type: {event['type']}")
            print(f"Pod name: {pod.metadata.name}")
            print(f"Pod phase: {pod.status.phase}")
            print("-------------------")
    except ApiException as e:
        print(f"Watch failed: {e}")
    except KeyboardInterrupt:
        w.stop()
        print("Watch stopped")


# Usage
# watch_pods()  # runs until interrupted
```
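A watch connection eventually ends (server-side timeouts, network errors). To resume without replaying every object, a client can pass the last seen `resourceVersion` back as `resource_version`; if the server answers 410 Gone, that version has expired and a fresh list is needed. A sketch of the bookkeeping, assuming events shaped like those from `watch.Watch().stream()`, which include an untyped `raw_object` dict; `last_resource_version` is hypothetical and runs on fake events:

```python
from typing import Any, Dict, Iterable, Optional


def last_resource_version(events: Iterable[Dict[str, Any]]) -> Optional[str]:
    """Return the resourceVersion of the last resource event seen."""
    version = None
    for event in events:
        if event.get("type") == "ERROR":
            continue  # ERROR events carry a Status object, not a resource
        version = event["raw_object"]["metadata"].get("resourceVersion", version)
    return version


fake_events = [
    {"type": "ADDED", "raw_object": {"metadata": {"name": "nginx-pod", "resourceVersion": "1001"}}},
    {"type": "MODIFIED", "raw_object": {"metadata": {"name": "nginx-pod", "resourceVersion": "1005"}}},
]
print(last_resource_version(fake_events))  # 1005
# Resuming against a live cluster:
# w.stream(v1.list_namespaced_pod, namespace="default", resource_version="1005")
```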

Advanced Scenarios

Example 12: Batch creation with error handling

```python
from kubernetes import client
from kubernetes.client.rest import ApiException


def batch_create_resources(resources):
    results = {'success': [], 'failed': []}
    apps_v1 = client.AppsV1Api()
    v1 = client.CoreV1Api()
    for resource in resources:
        name = resource['spec'].metadata.name
        try:
            if resource['kind'] == 'Deployment':
                apps_v1.create_namespaced_deployment(
                    namespace=resource['namespace'], body=resource['spec'])
            elif resource['kind'] == 'Service':
                v1.create_namespaced_service(
                    namespace=resource['namespace'], body=resource['spec'])
            else:
                results['failed'].append(
                    {'kind': resource['kind'], 'name': name, 'error': 'unsupported kind'})
                continue
            results['success'].append({'kind': resource['kind'], 'name': name})
        except ApiException as e:
            # One failure does not stop the remaining resources
            results['failed'].append({'kind': resource['kind'], 'name': name, 'error': str(e)})
    return results


# Usage
resources = [
    {
        'kind': 'Deployment',
        'namespace': 'default',
        'spec': client.V1Deployment(
            metadata=client.V1ObjectMeta(name="nginx-deployment"),
            spec=client.V1DeploymentSpec(
                replicas=3,
                selector=client.V1LabelSelector(match_labels={"app": "nginx"}),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(labels={"app": "nginx"}),
                    spec=client.V1PodSpec(
                        containers=[client.V1Container(name="nginx", image="nginx:latest")]
                    ),
                ),
            ),
        ),
    }
]
print(batch_create_resources(resources))
```

Example 13: Resource cleanup and garbage collection

```python
from kubernetes import client
from kubernetes.client.rest import ApiException


def cleanup_resources(namespace="default", label_selector=None):
    """Delete Deployments, Services and Pods in a namespace that match label_selector."""
    v1 = client.CoreV1Api()
    apps_v1 = client.AppsV1Api()
    cleanup_results = {'deployments': [], 'services': [], 'pods': [], 'errors': []}
    # Deployments go first: deleting their Pods first would only make the
    # ReplicaSets recreate them. Deleting a Deployment lets the garbage
    # collector remove its ReplicaSets and Pods; Pods it is already removing
    # may show up below as 404 errors.
    steps = [
        ('deployments', 'Deployment',
         apps_v1.list_namespaced_deployment, apps_v1.delete_namespaced_deployment),
        ('services', 'Service', v1.list_namespaced_service, v1.delete_namespaced_service),
        ('pods', 'Pod', v1.list_namespaced_pod, v1.delete_namespaced_pod),
    ]
    try:
        for key, kind, list_fn, delete_fn in steps:
            items = list_fn(namespace=namespace, label_selector=label_selector).items
            for item in items:
                try:
                    delete_fn(name=item.metadata.name, namespace=namespace)
                    cleanup_results[key].append(item.metadata.name)
                except ApiException as e:
                    cleanup_results['errors'].append(f"{kind} {item.metadata.name}: {e}")
        return cleanup_results
    except ApiException as e:
        print(f"Error while cleaning up resources: {e}")
        return None


# Usage
cleanup_result = cleanup_resources(namespace="default", label_selector="app=nginx")
print("Cleanup result:", cleanup_result)
```
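`label_selector` is a plain string in the same syntax as `kubectl -l`, where comma-separated requirements must all match. Passing `None` (the default in cleanup_resources) or an empty string selects every object in the namespace, so a cleanup script deserves an explicit selector. A sketch of building the string from a dict; `build_label_selector` is hypothetical:

```python
from typing import Mapping


def build_label_selector(labels: Mapping[str, str]) -> str:
    """Turn {"app": "nginx", "tier": "web"} into "app=nginx,tier=web".

    Keys are sorted only to make the output deterministic. An empty dict
    yields "", which selects everything.
    """
    return ",".join(f"{key}={labels[key]}" for key in sorted(labels))


print(build_label_selector({"tier": "web", "app": "nginx"}))  # app=nginx,tier=web
```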

Example 14: Health checks and automatic repair

```python
from datetime import datetime, timezone
from typing import Dict, List, Optional

from kubernetes import client
from kubernetes.client.rest import ApiException


class ResourceHealthChecker:
    def __init__(self, namespace: str = "default"):
        self.namespace = namespace
        self.v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()

    def check_pod_health(self) -> Optional[Dict[str, List[str]]]:
        """Return the names of Pods in the Failed and Pending phases."""
        unhealthy_pods, pending_pods = [], []
        try:
            pods = self.v1.list_namespaced_pod(namespace=self.namespace)
            for pod in pods.items:
                if pod.status.phase == 'Failed':
                    unhealthy_pods.append(pod.metadata.name)
                elif pod.status.phase == 'Pending':
                    pending_pods.append(pod.metadata.name)
            return {'unhealthy': unhealthy_pods, 'pending': pending_pods}
        except ApiException as e:
            print(f"Error while checking Pod health: {e}")
            return None

    def check_deployment_health(self) -> Optional[Dict[str, List[str]]]:
        """Return Deployments with fewer ready replicas than desired."""
        unhealthy_deployments = []
        try:
            deployments = self.apps_v1.list_namespaced_deployment(namespace=self.namespace)
            for deployment in deployments.items:
                # ready_replicas is None (not 0) when no replica is ready
                desired = deployment.spec.replicas or 0
                ready = deployment.status.ready_replicas or 0
                if ready < desired:
                    unhealthy_deployments.append(deployment.metadata.name)
            return {'unhealthy': unhealthy_deployments}
        except ApiException as e:
            print(f"Error while checking Deployment health: {e}")
            return None

    def auto_repair(self) -> List[str]:
        """Delete Failed Pods and trigger a rolling restart of unhealthy Deployments."""
        repair_actions = []
        pod_health = self.check_pod_health()
        if pod_health:
            for unhealthy_pod in pod_health['unhealthy']:
                try:
                    # Only Pods owned by a controller are recreated; a bare Pod is just removed
                    self.v1.delete_namespaced_pod(name=unhealthy_pod, namespace=self.namespace)
                    repair_actions.append(f"Deleted failed Pod: {unhealthy_pod}")
                except ApiException as e:
                    repair_actions.append(f"Failed to repair Pod {unhealthy_pod}: {e}")
        deployment_health = self.check_deployment_health()
        if deployment_health:
            for unhealthy_deployment in deployment_health['unhealthy']:
                try:
                    # Same annotation `kubectl rollout restart` sets; changing the
                    # Pod template triggers a rolling restart
                    patch = {"spec": {"template": {"metadata": {"annotations": {
                        "kubectl.kubernetes.io/restartedAt": datetime.now(timezone.utc).isoformat()
                    }}}}}
                    self.apps_v1.patch_namespaced_deployment(
                        name=unhealthy_deployment, namespace=self.namespace, body=patch)
                    repair_actions.append(f"Restarted Deployment: {unhealthy_deployment}")
                except ApiException as e:
                    repair_actions.append(f"Failed to repair Deployment {unhealthy_deployment}: {e}")
        return repair_actions


# Usage
health_checker = ResourceHealthChecker("default")
repair_results = health_checker.auto_repair()
print("Repair actions:", repair_results)
```
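A restart cannot fix every problem (a bad image or a missing ConfigMap stays broken), so running auto_repair on a schedule may restart the same Deployment again and again. One mitigation, sketched here under the assumption that the checker runs in a single long-lived process, is a per-resource cooldown. `RestartGuard` is hypothetical; in auto_repair its `allow(name)` would be checked before patching:

```python
import time
from typing import Callable, Dict


class RestartGuard:
    """Allow at most one restart per resource name within `cooldown` seconds."""

    def __init__(self, cooldown: float = 600.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.cooldown = cooldown
        self.clock = clock
        self._last: Dict[str, float] = {}

    def allow(self, name: str) -> bool:
        now = self.clock()
        last = self._last.get(name)
        if last is not None and now - last < self.cooldown:
            return False  # restarted too recently
        self._last[name] = now
        return True


# Offline demo with a fake clock
fake_now = [0.0]
guard = RestartGuard(cooldown=600, clock=lambda: fake_now[0])
print(guard.allow("nginx-deployment"))  # True: first restart
print(guard.allow("nginx-deployment"))  # False: still cooling down
fake_now[0] = 601.0
print(guard.allow("nginx-deployment"))  # True again
```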

Example 15: A simple custom controller

```python
import queue
import threading
import time

from kubernetes import client, watch
from kubernetes.client.rest import ApiException


class CustomController:
    def __init__(self, namespace="default"):
        self.namespace = namespace
        self.v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()
        self.event_queue = queue.Queue()
        self.running = False

    def start(self):
        """Start the event processor and the watchers."""
        self.running = True
        for target in (self._process_events, self._watch_pods, self._watch_deployments):
            # Daemon threads do not keep the interpreter alive on their own
            threading.Thread(target=target, daemon=True).start()

    def stop(self):
        """Ask all threads to stop; watchers notice within timeout_seconds."""
        self.running = False

    def _watch(self, resource_type, list_func):
        """Watch one resource type and push its events onto the queue."""
        w = watch.Watch()
        while self.running:
            try:
                # timeout_seconds ends the stream periodically, so self.running
                # is re-checked even when no events arrive
                for event in w.stream(list_func, namespace=self.namespace, timeout_seconds=30):
                    if not self.running:
                        w.stop()
                        break
                    self.event_queue.put((resource_type, event))
            except Exception as e:
                print(f"{resource_type} watch error: {e}")
                if self.running:
                    time.sleep(5)  # wait before retrying after an error

    def _watch_pods(self):
        self._watch('Pod', self.v1.list_namespaced_pod)

    def _watch_deployments(self):
        self._watch('Deployment', self.apps_v1.list_namespaced_deployment)

    def _process_events(self):
        """Consume the event queue."""
        while self.running:
            try:
                resource_type, event = self.event_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                self._handle_event(resource_type, event)
            except Exception as e:
                print(f"Event handling error: {e}")

    def _handle_event(self, resource_type, event):
        """Dispatch one event to the handler for its resource type."""
        event_type = event['type']
        obj = event['object']
        print(f"Received {resource_type} event:")
        print(f"  type: {event_type}")
        print(f"  name: {obj.metadata.name}")
        if resource_type == 'Pod':
            self._handle_pod_event(event_type, obj)
        elif resource_type == 'Deployment':
            self._handle_deployment_event(event_type, obj)

    def _handle_pod_event(self, event_type, pod):
        """Delete failed Pods so that their owning controller can recreate them."""
        if event_type == 'MODIFIED' and pod.status.phase == 'Failed':
            print(f"Pod {pod.metadata.name} failed, deleting it")
            try:
                self.v1.delete_namespaced_pod(name=pod.metadata.name, namespace=self.namespace)
            except ApiException as e:
                print(f"Failed to delete Pod: {e}")

    def _handle_deployment_event(self, event_type, deployment):
        """Report Deployments whose ready replicas lag behind the desired count."""
        if event_type == 'MODIFIED':
            ready = deployment.status.ready_replicas or 0
            desired = deployment.spec.replicas or 0
            if ready < desired:
                print(f"Deployment {deployment.metadata.name}: {ready}/{desired} replicas ready")
                # Custom handling logic can go here


# Usage
controller = CustomController("default")
controller.start()
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    controller.stop()
```
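With a plain `queue.Queue`, a burst of MODIFIED events for one object is handled once per event. Production controllers (for example those built on client-go's workqueue in Go) instead queue object keys and de-duplicate them. A non-blocking sketch of that idea that keeps only the latest event per key, in first-seen order; `CoalescingQueue` and the key format `Kind/namespace/name` are hypothetical:

```python
import threading
from collections import OrderedDict
from typing import Any, Optional, Tuple


class CoalescingQueue:
    """Keep only the newest item per key, in first-seen order."""

    def __init__(self) -> None:
        self._items: "OrderedDict[str, Any]" = OrderedDict()
        self._lock = threading.Lock()

    def put(self, key: str, item: Any) -> None:
        with self._lock:
            # Assigning to an existing key keeps its position in the OrderedDict
            self._items[key] = item

    def get(self) -> Optional[Tuple[str, Any]]:
        """Pop the oldest key and its newest item, or None when empty."""
        with self._lock:
            if not self._items:
                return None
            return self._items.popitem(last=False)

    def __len__(self) -> int:
        with self._lock:
            return len(self._items)


q = CoalescingQueue()
q.put("Pod/default/nginx-pod", "ADDED")
q.put("Pod/default/other-pod", "ADDED")
q.put("Pod/default/nginx-pod", "MODIFIED")
print(len(q))   # 2
print(q.get())  # ('Pod/default/nginx-pod', 'MODIFIED')
```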

Example 16: Resource metrics monitoring

```python
import time

from kubernetes.client import CustomObjectsApi
from kubernetes.client.rest import ApiException


class MetricsCollector:
    """Reads usage from the metrics.k8s.io API (requires metrics-server in the cluster)."""

    def __init__(self):
        self.custom_api = CustomObjectsApi()

    def get_node_metrics(self):
        """Return CPU and memory usage per node."""
        try:
            metrics = self.custom_api.list_cluster_custom_object(
                group="metrics.k8s.io", version="v1beta1", plural="nodes")
            node_metrics = {}
            for item in metrics['items']:
                node_metrics[item['metadata']['name']] = {
                    'cpu': item['usage']['cpu'],
                    'memory': item['usage']['memory'],
                }
            return node_metrics
        except ApiException as e:
            print(f"Failed to get node metrics: {e}")
            return None

    def get_pod_metrics(self, namespace="default"):
        """Return CPU and memory usage per container, grouped by Pod."""
        try:
            metrics = self.custom_api.list_namespaced_custom_object(
                group="metrics.k8s.io", version="v1beta1",
                namespace=namespace, plural="pods")
            pod_metrics = {}
            for item in metrics['items']:
                pod_metrics[item['metadata']['name']] = {
                    container['name']: {
                        'cpu': container['usage']['cpu'],
                        'memory': container['usage']['memory'],
                    }
                    for container in item['containers']
                }
            return pod_metrics
        except ApiException as e:
            print(f"Failed to get Pod metrics: {e}")
            return None

    def monitor_resources(self, interval=30):
        """Print node and Pod usage every `interval` seconds until interrupted."""
        while True:
            print("\n=== Resource usage ===")
            node_metrics = self.get_node_metrics()
            if node_metrics:
                print("\nNode usage:")
                for node_name, metrics in node_metrics.items():
                    print(f"\nNode: {node_name}")
                    print(f"CPU: {metrics['cpu']}")
                    print(f"Memory: {metrics['memory']}")
            pod_metrics = self.get_pod_metrics()
            if pod_metrics:
                print("\nPod usage:")
                for pod_name, containers in pod_metrics.items():
                    print(f"\nPod: {pod_name}")
                    for container_name, metrics in containers.items():
                        print(f"Container: {container_name}")
                        print(f"CPU: {metrics['cpu']}")
                        print(f"Memory: {metrics['memory']}")
            time.sleep(interval)


# Usage
collector = MetricsCollector()
# collector.monitor_resources()  # runs until interrupted
```
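metrics-server reports usage as Kubernetes quantity strings, typically CPU in nanocores (for example `"123456789n"`) and memory in kibibytes (for example `"1024Ki"`), so the values must be converted before they can be summed or compared. A sketch that covers only the suffixes listed in the two tables (no `E`/`P` or exponent notation with suffixes); `parse_cpu` and `parse_memory` are hypothetical helpers:

```python
from decimal import Decimal

_CPU_SUFFIXES = {"n": Decimal("1e-9"), "u": Decimal("1e-6"), "m": Decimal("1e-3")}
_MEM_SUFFIXES = {
    "Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40,
    "k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12,
}


def parse_cpu(quantity: str) -> Decimal:
    """Convert a CPU quantity such as "250m" or "123456789n" to cores."""
    suffix = quantity[-1]
    if suffix in _CPU_SUFFIXES:
        return Decimal(quantity[:-1]) * _CPU_SUFFIXES[suffix]
    return Decimal(quantity)


def parse_memory(quantity: str) -> int:
    """Convert a memory quantity such as "1024Ki" or "500M" to bytes."""
    # Check two-letter binary suffixes before one-letter decimal ones
    for suffix in sorted(_MEM_SUFFIXES, key=len, reverse=True):
        if quantity.endswith(suffix):
            return int(Decimal(quantity[: -len(suffix)]) * _MEM_SUFFIXES[suffix])
    return int(Decimal(quantity))


print(parse_cpu("250m"), "cores")
print(parse_memory("1024Ki"), "bytes")  # 1048576 bytes
```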

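Best Practices and Caveats

  1. Error handling
  • Wrap API calls in try/except and inspect ApiException.status
  • Retry transient failures (such as 429 throttling or 5xx errors) with backoff
  • Log enough detail (kind, name, namespace, response body) to debug failures
  2. Performance
  • Prefer collection operations (one list call with a label selector, delete_collection_namespaced_pod) over one call per object
  • Cache data that rarely changes, and use watches instead of frequent polling
  • Avoid unnecessary API calls and tight polling loops
  3. Security
  • Follow the principle of least privilege (a dedicated ServiceAccount with narrow RBAC rules)
  • Protect sensitive data such as kubeconfig files, tokens, and certificates
  • Use proper authentication and authorization
  4. Maintainability
  • Keep the code modular
  • Log thoroughly
  • Write clear code comments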
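The first two error-handling points can be combined into a small wrapper. A minimal sketch, assuming that 429 and 5xx responses are the transient ones worth retrying (a judgment call, not a rule of the client library). It relies only on the exception having a `status` attribute, as `ApiException` does; `with_retries`, `FakeApiError` and `make_flaky` are hypothetical, and the fakes exist only so the demo runs without a cluster:

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# HTTP statuses treated as transient: throttling and temporary server errors
RETRIABLE_STATUSES = {429, 500, 502, 503, 504}


def with_retries(fn: Callable[[], T], retries: int = 3, base_delay: float = 0.5,
                 sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn(), retrying with exponential backoff plus jitter on transient errors.

    Exceptions whose `status` is not in RETRIABLE_STATUSES are re-raised at once.
    """
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception as e:
            if getattr(e, "status", None) not in RETRIABLE_STATUSES or attempt == retries:
                raise
            delay = base_delay * (2 ** attempt)
            sleep(delay + random.uniform(0, delay / 2))
    raise ValueError("retries must be >= 0")


class FakeApiError(Exception):
    """Stand-in for ApiException in this offline demo."""

    def __init__(self, status: int) -> None:
        super().__init__(f"HTTP {status}")
        self.status = status


def make_flaky(failures: int, status: int = 503) -> Callable[[], str]:
    """Return a function that fails `failures` times with `status`, then succeeds."""
    state = {"calls": 0}

    def call() -> str:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise FakeApiError(status)
        return "ok"

    return call


print(with_retries(make_flaky(2), sleep=lambda s: None))  # ok
# Live usage: with_retries(lambda: v1.read_namespaced_pod("nginx-pod", "default"))
```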

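Summary

This article showed how to manage Kubernetes clusters from Python, covering:

  1. Environment setup and configuration
  2. Common resource operations
  3. Advanced scenarios
  4. Automated operations (cleanup, health checks, a simple controller)
  5. Resource usage monitoring

These examples and practices are a starting point for building Kubernetes automation tools and operations systems.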


© Copyright belongs to the author. Contact the author for reprints or content collaboration.