python实现交换机自动化巡检

手头有一些华为的接入和汇聚交换机需要做定期的巡检,正好最近在学习python,于是乎按照网上前辈们的思路用python操作一番,主要用了pexpect、xlwt、xlrd这几个模块,由于pexpect在windows环境下执行有问题,所以下面的代码全程是在linux环境下执行。
直接上代码,如下:

import re
import os
import xlwt
import xlrd
import pexpect
import datetime
from xlwt import *

counter0 = 1    # 计数器,全局变量
counter1 = 1
counter2 = 1


class Tools:
    @staticmethod
    def directory_check(arg_path):  # 待生成巡检文档路径检查,没有则创建
        today = datetime.date.today().strftime('%Y%m%d')
        path = arg_path + today + '/'
        if not os.path.exists(path):
            os.mkdir(path, 0o755)
        return path

    @staticmethod
    def generator(arg_sheet, arg_n):    # 生成器,读取保存有交换机信息的表格,返回交换机名称/用户名/ip/密码
        i = 1
        while i <= arg_n:
            row_list = arg_sheet.row_values(i)
            sw_sysname = row_list[1]
            sw_username = row_list[2]
            sw_ip = row_list[3]
            sw_pwd = row_list[4]
            i = i + 1
            yield sw_sysname, sw_username, sw_ip, sw_pwd

    @staticmethod
    def get_filename(file_dir):    # 获取已生成的交换机巡检文本文件,返回文件名列表
        path = Tools.directory_check(file_dir)
        txt_list = []
        for files in os.walk(path):
            for file in files:
                for inspection_txt in file:
                    inspection_txt = ''.join(inspection_txt)
                    if os.path.splitext(inspection_txt)[-1] == ".txt":
                        txt_list.append(inspection_txt)
            return txt_list


class Switch:
    def __init__(self, system_name, user_name, ip_address, user_password):
        self.system_name = system_name
        self.user_name = user_name
        self.user_password = user_password
        self.ip_address = ip_address

    def login(self, arg_path):    # telnet登录交换机
        path = Tools.directory_check(arg_path)
        try:
            self.child = pexpect.spawn('telnet %s' % self.ip_address)
            self.child.logfile = open(path + '%s--%s--log.txt' % (self.system_name, self.ip_address), 'wb+')
            self.child.expect('[Uu]sername:')  # 提示输入用户名
            self.child.sendline("%s" % self.user_name)
            self.child.expect('[Pp]assword:')  # 提示输入密码
            self.child.sendline("%s" % self.user_password)
        except Exception:
            print("Some error occurred when telnet to the remote device!")
            exit(1)
        else:
            return self.child

    def inspect(self, command):    # 执行巡检命令
        sign = "---- More ----"
        while True:
            index = self.child.expect([sign, '<%s>' % self.system_name])
            if index == 0:
                self.child.send(" ")
                continue
            elif index == 1:
                self.child.sendline(command)
                break


class WorkBook:
    def __init__(self):
        self.power1_status = None
        self.power2_status = None
        self.power3_status = None
        self.power4_status = None
        self.up_time = None
        self.temperature = None
        self.fan_status = None
        self.cpu_status = None
        self.memory_status = None
        self.eth_trunk_0 = None
        self.eth_trunk_1 = None
        self.eth_trunk_2 = None
        self.eth_trunk_3 = None
        self.eth_trunk_4 = None
        self.eth_trunk_5 = None
        self.eth_trunk_6 = None
        self.log_counts = None
        self.routingtable_counts = None

    def matching(self, dir_txt):    # 逐行遍历,正则匹配关键字
        file = open(dir_txt, 'r+')
        inspection_list = file.readlines()
        i = 1
        for line in inspection_list:
            if re.search(r'0(\s+)PWR(I|1)(\s+)', line):
                self.power1_status = line.split()[2] + ' ' + line.split()[4] + ' ' + line.split()[5]
            if re.search(r'0(\s+)PWR(II|2)(\s+)', line):
                self.power2_status = line.split()[2] + ' ' + line.split()[4] + ' ' + line.split()[5]
            if re.search(r'1(\s+)PWR(I|1)(\s+)', line):
                self.power3_status = line.split()[2] + ' ' + line.split()[4] + ' ' + line.split()[5]
            if re.search(r'1(\s+)PWR(II|2)(\s+)', line):
                self.power4_status = line.split()[2] + ' ' + line.split()[4] + ' ' + line.split()[5]
            if ': uptime is' in line:
                self.up_time = re.search(r': uptime is(.*)', line).group(1)
            if 'Lower(C)' in line:
                self.temperature = inspection_list[i + 2].split()[4]
            if 'CurrentTemperature' in line:
                self.temperature = inspection_list[i + 1].split()[1]
            if 'FanID' in line:
                self.fan_status = inspection_list[i + 1].split()[3]
            if 'five minutes:' in line:
                self.cpu_status = re.search(r'five minutes:(.*)', line, re.I).group(1)
            if 'Memory Using Percentage Is:' in line:
                self.memory_status = re.search(r'Memory Using Percentage Is:(.*)', line).group(1)
            if re.match(r'Eth-Trunk0(\s+(up|down))', line):
                self.eth_trunk_0 = line.split()[1] + ' ' + line.split()[2]
            if re.match(r'Eth-Trunk1(\s+(up|down))', line):
                self.eth_trunk_1 = line.split()[1] + ' ' + line.split()[2]
            if re.match(r'Eth-Trunk2(\s+(up|down))', line):
                self.eth_trunk_2 = line.split()[1] + ' ' + line.split()[2]
            if re.match(r'Eth-Trunk3(\s+(up|down))', line):
                self.eth_trunk_3 = line.split()[1] + ' ' + line.split()[2]
            if re.match(r'Eth-Trunk4(\s+(up|down))', line):
                self.eth_trunk_4 = line.split()[1] + ' ' + line.split()[2]
            if re.match(r'Eth-Trunk5(\s+(up|down))', line):
                self.eth_trunk_5 = line.split()[1] + ' ' + line.split()[2]
            if re.match(r'Eth-Trunk6(\s+(up|down))', line):
                self.eth_trunk_6 = line.split()[1] + ' ' + line.split()[2]
            if 'Current messages :' in line:
                self.log_counts = line.split()[3]
            if 'Routes :' in line:
                self.routingtable_counts = line.split()[5]
            i += 1

    def setting_style(self):    # 输出巡检表格的样式设置
        self.workbook = xlwt.Workbook()
        self.style = XFStyle()
        pattern = Pattern()
        pattern.pattern = Pattern.SOLID_PATTERN
        pattern.pattern_fore_colour = Style.colour_map['blue']
        self.style.pattern = pattern
        borders = xlwt.Borders()
        borders.left = 1
        borders.right = 1
        borders.top = 1
        borders.bottom = 1
        self.style.borders = borders

        self.style1 = XFStyle()
        borders = xlwt.Borders()
        borders.left = 1
        borders.right = 1
        borders.top = 1
        borders.bottom = 1
        self.style1.borders = borders

        self.style3 = XFStyle()
        borders = xlwt.Borders()
        borders.left = 1
        borders.right = 1
        borders.top = 1
        borders.bottom = 1
        self.style3.borders = borders
        al = xlwt.Alignment()
        al.horz = 0x02
        al.vert = 0x01
        self.style3.alignment = al

        self.SwitchCheck = self.workbook.add_sheet('SwitchCheck', cell_overwrite_ok=True)

        first_col = self.SwitchCheck.col(0)
        sec_col = self.SwitchCheck.col(1)
        thr_col = self.SwitchCheck.col(2)
        for_col = self.SwitchCheck.col(3)
        first_col.width = 150 * 25
        sec_col.width = 100 * 25
        thr_col.width = 120 * 25
        for_col.width = 320 * 25

    def write_workbook_header(self):    # 输出巡检表格的表头
        self.SwitchCheck.write(0, 0, '设备名称', self.style)
        self.SwitchCheck.write(0, 1, '管理IP', self.style)
        self.SwitchCheck.write(0, 2, '巡检项目', self.style)
        self.SwitchCheck.write(0, 3, '巡检结果', self.style)

    def write_workbook_body(self, name, ip):    # 输出巡检表格的详细内容
        global counter0
        global counter1
        global counter2
        checklist_head = [
            '电源0-1',
            '电源0-2',
            '电源1-1',
            '电源1-2',
            '运行时间',
            '运行温度',
            '风扇',
            'CPU利用率',
            '内存利用率',
            '聚合端口0',
            '聚合端口1',
            '聚合端口2',
            '聚合端口3',
            '聚合端口4',
            '聚合端口5',
            '聚合端口6',
            '日志条目',
            '路由表条目',
        ]
        checklist_content = [
            self.power1_status,
            self.power2_status,
            self.power3_status,
            self.power4_status,
            self.up_time,
            self.temperature,
            self.fan_status,
            self.cpu_status,
            self.memory_status,
            self.eth_trunk_0,
            self.eth_trunk_1,
            self.eth_trunk_2,
            self.eth_trunk_3,
            self.eth_trunk_4,
            self.eth_trunk_5,
            self.eth_trunk_6,
            self.log_counts,
            self.routingtable_counts
        ]
        for check_item_h in checklist_head:
            self.SwitchCheck.write(counter1, 2, check_item_h, self.style1)
            counter1 += 1
        for check_item_c in checklist_content:
            self.SwitchCheck.write(counter2, 3, check_item_c, self.style1)
            counter2 += 1
        self.SwitchCheck.write_merge(counter0, counter1-1, 0, 0, name, self.style3)
        self.SwitchCheck.write_merge(counter0, counter1-1, 1, 1, ip, self.style3)
        counter0 = counter0 + len(checklist_head)


if __name__ == "__main__":
    cmd_list = [
        "display cpu-usage",
        "display memory",
        "display temperature all",
        "display environment",
        "display fan",
        "display power",
        "display ip routing-table",
        "display interface brief",
        "dis version",
        "display log",
        "quit"
    ]    # 巡检命令列表

    log_path = "/home/test/inspection/"    # 生成巡检日志文件的路径
    excel_path = '/home/test/sw_info.xlsx'    # 存放交换机登录信息的路径
    excel = xlrd.open_workbook(excel_path)    # 打开excel
    sheet = excel.sheet_by_index(0)    # 获取第一个sheet数据
    rows = sheet.nrows    # 获取sheet行数
    gen = Tools.generator(sheet, rows - 1)    # 实例化上面Tools类中定义的生成器
    for item in gen:
        sw = Switch(item[0], item[1], item[2], item[3])
        sw.login(log_path)
        for cmd in cmd_list:
            sw.inspect(cmd)

    list_target = Tools.get_filename(log_path)
    WB = WorkBook()
    WB.setting_style()
    for item in list_target:
        sw_name = item.split('--')[0]
        sw_ip = item.split('--')[1]
        WB.matching(Tools.directory_check(log_path) + item)
        WB.write_workbook_header()
        WB.write_workbook_body(sw_name, sw_ip)
        WB.workbook.save('%sSwitchInspection.xls' % log_path)

说明:
1、程序会自动读取存放有交换机登录信息的excel表格,表格具体路径为 "/home/test/sw_info.xlsx";
从表格中获取交换机的ip、sysname、username、password信息,然后telnet到目标交换机,执行相关巡检命令;
表格中待巡检交换机的信息按照固定格式,逐行填写,如下:

image.png

如图中所示,表格中的交换机名称对应代码中的system_name, 用户名对应user_name, IP对应ip_address, 密码对应user_password,按照格式填写即可;
2、程序逐个telnet到交换机上执行完巡检命令后,会在 "/home/test/inspection/当前日期/" 的路径下按照不同交换机逐个生成以交换机名称和ip地址命名的txt文件,保存巡检命令执行的结果记录,如Huawei-sw-01--192.168.x.x--log.txt;
3、程序逐个读取上面的txt巡检结果文件,正则匹配关键字来获取不同巡检项目的结果数据,并填入新生成的excel表格中,新生成的excel表格路径为 "/home/test/inspection/SwitchInspection.xls"。
4、最终效果如下:
image.png

多个交换机的巡检结果会自动向下生成。
5、最后
代码总体上参考了下面这位前辈的处理思路,我按照面向对象的思想改了下代码块,整体来说写的比较乱,面向对象学得还是不到家,大家将就看看吧
参考链接:https://blog.csdn.net/weixin_42551826/article/details/81625375

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。