# log_toboth.py
import logging
# 创建记录器,将其命名为Test
logger = logging.getLogger("Test")
# 默认日志级别为WARNING,这里改为DEBUG
logger.setLevel(logging.DEBUG)
# 创建FileHandler处理器,将日志消息输出到文件中,并设置特定的消息格式和日期格式
logfile = logging.FileHandler(filename='mylog.log',mode='w')
formatter1 = logging.Formatter(fmt='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',datefmt='%a, %d %b %Y %H:%M:%S')
logfile.setFormatter(formatter1)
# 创建StreamHandler处理器,将WARNING或更高级别的日志消息输出到控制台,设置特定消息格式
console = logging.StreamHandler()
console.setLevel(logging.WARNING)
formatter2 = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console.setFormatter(formatter2)
# 将处理器添加到记录器
logger.addHandler(logfile)
logger.addHandler(console)
# 输出不同级别的日志消息
logger.debug('debug,调式信息')
logger.info('info,一般信息')
logger.warning('waring,警告信息')
logger.error('error,错误信息')
logger.critical('critical,致命的错误信息')
# log_traceback.py
import logging
# 创建记录器
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.DEBUG)
# 创建格式化器
formatter = logging.Formatter('%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s - %(message)s')
# 创建FileHandler处理器,将日志消息输出到文件
file_handler = logging.FileHandler('traceback.log')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# 测试异常处理的函数
def func(num1, num2):
try:
x = num1 * num2
y = num1 / num2
return x, y
except Exception:
# 输出日志消息
logger.error('出现异常!', exc_info=True)
logger.info('已记录日志!')
if __name__ == '__main__':
func(2,0) # 除以0触发异常
# sysinfo_tolog.py
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler
# 从sysinfo_bypsutil.py文件导入report()函数
from sysinfo_bypsutil import report
from sysinfo_bypsutil import gather_monitor_data
import logging
# 日志文件基本设置
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.WARNING, filename='sys_overload.log')
'''定义要执行的任务'''
def monjob():
print('监测时间: %s' % datetime.now())
report()
logging.warning('执行一次监测! ')
data = gather_monitor_data()
threshold = 2 # 过载阈值(百分比)
if data['cpu_percent'] > threshold:
logging.warning(f"CPU过载!使用率达 {data['cpu_percent']}%")
if data['mem_percent'] > threshold:
logging.warning(f"内存过载!使用率达 {data['mem_percent']}%")
if data['disk_percent'] > threshold:
logging.warning(f"磁盘空间紧张!使用率达 {data['disk_percent']}%")
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_job(monjob, 'interval', minutes=5)
# 给出强制退出的组合键,兼顾Linux和Windows平台
print('按 Ctrl+{0} 键退出'.format('Break' if os.name == 'nt' else 'C '))
# 先运行一次定义的任务,再启动调度器
monjob()
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
print('已退出!')
exit()
# sysinfo_bypsutil.py
import psutil
import socket
import logging
'''通用的字节转换函数'''
def bytes2human(n):
symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
prefix = {}
for i, s in enumerate(symbols):
prefix[s] = 1 << (i + 1) * 10
for s in reversed(symbols):
if n >= prefix[s]:
value = float(n) / prefix[s]
return '%.1f%s' % (value, s)
return "%sB" % n
'''获取CPU信息'''
def get_cpu_info():
cpu_count = psutil.cpu_count()
cpu_percent = psutil.cpu_percent(interval=2)
return dict(cpu_count=cpu_count, cpu_percent=cpu_percent)
'''获取内存信息'''
def get_memory_info():
virtual_mem = psutil.virtual_memory()
mem_total = bytes2human(virtual_mem.total)
mem_used = bytes2human(virtual_mem.total * virtual_mem.percent)
mem_free = bytes2human(virtual_mem.free + virtual_mem.buffers + virtual_mem.cached)
mem_percent = virtual_mem.percent
return dict(mem_total=mem_total, mem_used=mem_used,mem_free=mem_free,mem_percent=mem_percent)
'''获取磁盘信息'''
def get_disk_info():
disk_usage = psutil.disk_usage('/')
disk_total = bytes2human(disk_usage.total)
disk_used = bytes2human(disk_usage.used)
disk_free = bytes2human(disk_usage.free)
disk_percent = disk_usage.percent
disk_io = psutil.disk_io_counters()
disk_read = bytes2human(disk_io.read_bytes)
disk_write = bytes2human(disk_io.write_bytes)
return dict(disk_total=disk_total,disk_used=disk_used,disk_free=disk_free, disk_percent=disk_percent,disk_read=disk_read,disk_write=disk_write)
'''获取网络信息'''
def get_net_info():
net_io = psutil.net_io_counters()
net_sent = bytes2human(net_io.bytes_sent)
net_recv = bytes2human(net_io.bytes_recv)
return dict(net_sent=net_sent,net_recv=net_recv)
'''汇集系统信息'''
def gather_monitor_data():
data = {}
data.update(get_cpu_info())
data.update(get_memory_info())
data.update(get_disk_info())
data.update(get_net_info())
return data
'''报告结果'''
def report():
# 获取主机名
hostname = socket.gethostname()
data = gather_monitor_data()
data.update(dict(hostname=hostname))
# 输出系统信息
print(f"{hostname}主机系统信息")
print("—————————————————————————")
print(f"CPU数量:{data['cpu_count']}")
print(f"CPU使用率:{data['cpu_percent']}%")
print("—————————————————————————")
print(f"内存总量:{data['mem_total']}")
print(f"已用内存:{data['mem_used']}")
print(f"空闲内存:{data['mem_free']}")
print(f"内存使用率:{data['mem_percent']}%")
print("—————————————————————————")
print(f"磁盘空间总量:{data['disk_total']}")
print(f"磁盘已用空间:{data['disk_used']}")
print(f"磁盘剩余空间:{data['disk_free']}")
print(f"磁盘空间使用率:{data['disk_percent']}%")
print(f"磁盘读取数据:{data['disk_read']}")
print(f"磁盘写入数据:{data['disk_write']}")
print("—————————————————————————")
print(f"网卡发送数据:{data['net_sent']}")
print(f"网卡接收数据:{data['net_recv']}")
if __name__ == '__main__':
report()