从0到1:Python打造MySQL专家系统(1)
本博客是赖明星
所撰写的Python Linux系统管理与自动化运维
其中最后一章的详细分析,从0到1的重构MySQL专家系统。
首先我们先了解我们要做什么
本系统是围绕MySQL的专家系统进行介绍。可以说是MySQL数据库的健康检查。所谓“健康检查”,就是通过对数据库的配置参数进行算法分析,为用户提供最优化的解决方案以解决数据库的各种潜在问题。
我们要检查什么
数据库检查:
- 服务器相关:
cpu
io
内存 磁盘 网络 - 数据库相关: 数据库的参数配置,主从复制的性能
- 业务相关: 表结构是否合理、 SQL语句、索引
怎么检查,怎么评分,怎么给建议
- 检查:
- 对于主键索引来说:
- 扫描MySQL库里面所有的表,看看是否存在主键,唯⼀索引和
primary key
- 在xxx 库里面xxx表,缺乏主键,建议添加
- 扫描MySQL库里面所有的表,看看是否存在主键,唯⼀索引和
- cpu利用率:
- ⼀段时间(t)内,
cpu
的利用率超过了阙值的时间,t_over
,t_over / t
超过了我们的默认阙值,我们就标记这⼀段时间, -
cpu
利⽤率超过了80%,建议做数据库迁移
- ⼀段时间(t)内,
- 用户弱密码:
- 做⼀个密码彩虹表,如果添加⽤户的时候,命中了彩虹表里面的值,提示用户,密码太简单,建议修改
- 评分
- 可以增加检查项
- 如果扣分,需要提示风险
main.py
测试用例:
usecase:
python test.py --host 127.0.0.1 --user root --password yourpassword --port 3306
先贴代码:
#!/usr/bin/python
#-*- coding: UTF-8 -*-
from __future__ import print_function
import argparse
import logging
import logging.config
import os
import sys
import traceback
pkg_root = os.path.realpath(os.path.join(os.path.realpath(__file__),
os.path.pardir,
os.path.pardir))
sys.path.append(pkg_root)
from health_checker.client.env import Env
from health_checker.client.database.mysql import DatabaseManager
from health_checker.client.client import Client
from health_checker.server.health_checker_server import HealthCheckerServer
log_cnf = os.path.join(pkg_root, 'conf', 'logging.cnf')
logging.config.fileConfig(log_cnf, disable_existing_loggers=False)
logging.basicConfig()
LOG = logging.getLogger(__name__)
def _argparse():
"""
argument parser
"""
parser = argparse.ArgumentParser(description='health checker for MySQL database')
parser.add_argument('--host', action='store', dest='host', required=True,
help='connect to host')
parser.add_argument('--user', action='store', dest='user', required=True,
help='user for login')
parser.add_argument('--password', action='store', dest='password',
required=True, help='password to use when connecting to server')
parser.add_argument('--port', action='store', dest='port', default=3306,
type=int, help='port number to use for connection or 3306 for default')
parser.add_argument('--conn_size', action='store', dest='conn_size', default=5,
type=int, help='how much connection for database usage')
parser.add_argument('-v', '--version', action='version', version='%(prog)s 0.1')
return parser.parse_args()
def main():
""" entry point """
try:
parser = _argparse()
# d = dict(host="59.111.124.115", user='laimingxing', password='laimingxing', port=3306, size=3)
# Env.database = DatabaseManager(host=parser.host, user=parser.user, password=parser.password, port=parser.port)
Env.database = DatabaseManager(host='127.0.0.1', user='root', password='fsy768394890', port=3306)
server = HealthCheckerServer(Client())
server.do_health_check()
server.get_summary()
except Exception, exc:
print(exc)
LOG.error(traceback.format_exc())
if __name__ == '__main__':
main()
from future import print_function 用法
在开头加上from future import print_function这句之后,即使在python2.X,使用print就得像python3.X那样加括号使用。python2.X中print不需要括号,而在python3.X中则需要。
# python2.7
print "Hello world"
# python3
print("Hello world")
os.path.join()函数:
路径拼接,连接两个或更多的路径名组件
In [25]: path1 = 'hello'
In [26]: path2 = 'world'
In [27]: path3 = '!'
In [28]: path = path1 + path2 + path3
In [29]: add_path = os.path.join(path1, path2, path3)
In [30]: print (path)
helloworld!
In [31]: print(add_path)
hello\world\!
- os.path.realpath(file)
获取当前执行脚本的绝对路径。
argparse模块解析
-
argparse
是一个Python
模块:命令行选项、参数和子命令解析器。 -
argparse
模块可以让人轻松编写用户友好的命令行接口。程序定义它需要的参数,然后argparse
将弄清如何从sys.argv
解析出那些参数。argparse
模块还会自动生成帮助和使用手册,并在用户给程序传入无效参数时报出错误信息。 - 使用流程:
-
创建解析器
parser = argparse.ArgumentParser(description='health checker for MySQL database')
使用
argparse
的第一步是创建一个ArgumentParser
]对象。ArgumentParser
对象包含将命令行解析成 Python 数据类型所需的全部信息。 -
添加参数
parser.add_argument('--host', action='store', dest='host', required=True,help='connect to host')
通过
add_argument()
方法给程序添加参数信息 -
解析参数
parser.parse_args()
ArgumentParser
通过parse_args()
解析参数
-
1. 数据库专家系统的客户端设计(client端)
客户端的主要任务就是接收服务端发送过来的消息,并进行解析。解析完成后进行相应的判断, 进行评分和生成修复意见。
1.1 实现MySQL数据库连接池
建立数据库连接池的好处简单来说就是随用随取,需要连接时从数据库中取出,使用完之后断开连接返回连接池。
# -*- coding:UTF-8 -*-
import logging
import Queue
import MySQLdb
LOG = logging.getLogger(__name__)
class ConnectionPool(object):
def __init__(self, **kwargs):
self.size = kwargs.get('size', 10)
self.kwargs = kwargs
self.conn_queue = Queue.Queue(maxsize=self.size)
for i in range(self.size):
self.conn_queue.put(self._create_new_conn())
def _create_new_conn(self):
return MySQLdb.connect(host=self.kwargs.get('host', '127.0.0.1'),
user=self.kwargs.get('user'),
passwd=self.kwargs.get('password'),
port=self.kwargs.get('port', 3306),
connect_timeout=5)
def _put_conn(self, conn):
self.conn_queue.put(conn)
def _get_conn(self):
conn = self.conn_queue.get()
if conn is None:
self._create_new_conn()
return conn
def exec_sql(self, sql):
conn = self._get_conn()
try:
with conn as cur:
cur.execute(sql)
return cur.fetchall()
except MySQLdb.ProgrammingError as e:
LOG.error("execute sql ({0}) error {1}".format(sql, e))
raise e
except MySQLdb.OperationalError as e:
# create connection if connection has interrupted
conn = self._create_new_conn()
raise e
finally:
self._put_conn(conn)
def __del__(self):
try:
while True:
conn = self.conn_queue.get_nowait()
if conn:
conn.close()
except Queue.Empty:
pass
- Python中的
**kwargs
是什么?- 在Python中的代码中经常会见到这两个词
args
和kwargs
,前面通常还会加上一个或者两个星号。 - Python中的
args
是arguments
的缩写,表示位置参数;kwargs
是keyword arguments
的缩写,表示关键字参数。 -
args
的类型为<type 'tuple'>
;kwargs
的类型为<type 'dict'>
; - 通常
*args
必须放在**kwargs
的前面,因为位置参数在关键字参数的前面。
- 在Python中的代码中经常会见到这两个词
In [15]: def printSores(student, *scores):
...: print('Student Name:{0}'.format(student))
...: for score in scores:
...: print(score)
...:
In [16]: printSores("Tom", 100, 98, 95, 92, 99)
Student Name:Tom
100
98
95
92
99
同时推荐Python传入不固定的参数给函数,或者传入很多的内容给函数,常用在构造函数中可以看看实例加以运行理解,这里不多赘述。
-
多个线程并发访问连接池使用的Queue模块
Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。
常用方法:
- Queue.qsize() 返回队列的大小
- Queue.empty() 如果队列为空,返回True,反之False
- Queue.full() 如果队列满了,返回True,反之False,Queue.full 与 maxsize 大小对应
- Queue.get([block[, timeout]])获取队列,timeout等待时间
- Queue.get_nowait() 相当于Queue.get(False),非阻塞方法
- Queue.put(item) 写入队列,timeout等待时间
- Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号。每个get()调用得到一个任务,接下来task_done()调用告诉队列该任务已经处理完毕。
- Queue.join() 实际上意味着等到队列为空,再执行别的操作
-
Python中的
fetchone()
和fetchall()
方法-
fetchone() :
返回单个的元组,也就是一条记录(row),如果没有结果 , 则返回 None
cur.execute("select user,password from user where user='%s'" %name) arr= cur.fetchone() ----此时 通过 arr[0],arr[1]可以依次访问user,password
-
fetchall() :
返回多个元组,即返回多条记录(rows) 返回的是二维元组,如果没有结果,则返回 ()
cur.execute("select * from user")
-
1.2 处理MySQL数据库连接异常
处理数据库连接时,MySQLdb可能会抛出两个异常,分别是
ProgrammingError
和OperationalError
.
-
ProgrammingError
表示SQL语句存在语法问题 -
OperationalError
表示数据库连接中断。连接终端时,我们要重新创建连接。
1.3 使用装饰器检查参数 util.py
# -*- coding: utf8 -*-
import inspect
import logging
import functools
import re
import psutil
LOG = logging.getLogger(__name__)
def lower_case_with_underscores(name):
"""
convert camel case to under_line case
CamelCase -> camel_case
link: (http://stackoverflow.com/questions/1175208/
elegant-python-function-to-convert-camelcase-to-camel-case)
"""
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
def get_disk_capacity(path):
"""
通过MySQL的变量datadir获取数据盘的路径,再使用psutil获取数据盘的空间
In [1]: import psutil
In [2]: psutil.disk_usage('/ebs/mysql_data')
Out[2]: sdiskusage(total=214643507200, used=16532504576, free=198111002624, percent=7.7)
"""
return psutil.disk_usage(path).total
def check_required_args(parameters):
"""check parameters of action"""
def decorated(f):
"""decorator"""
@functools.wraps(f)
def wrapper(*args, **kwargs):
"""wrapper"""
# inspect.getcallargs(func[, *args][, **kwds]):
# 将args和kwds参数到绑定到为func的参数名,作为func函数形参的值;
func_args = inspect.getcallargs(f, *args, **kwargs)
kwargs = func_args.get('kwargs')
for item in parameters:
if kwargs.get(item) is None:
message = "check required args failed, `{0}` is not found in {1}".format(item, f.__name__)
LOG.error(message)
raise Exception(message)
return f(*args, **kwargs)
return wrapper
return decorated
用装饰器来实现,函数参数的强制类型检查。就不用为每个消息都编写一个检查器了
-
functools.wraps
定义函数装饰器参照于出处。理解Python装饰器(Decorator)
装饰器本质上是一个 Python 函数或类,它可以让其他函数或类在不需要做任何代码修改的前提下增加额外功能。
它经常用于有切面需求的场景,比如:插入日志、性能测试、事务处理、缓存、权限校验等场景
-
第一种:普通不带参数的装饰器
def use_logging(func): def wrapper(): logging.warn("%s is running" % func.__name__) return func() # 把 foo 当做参数传递进来时,执行func()就相当于执行foo() return wrapper def foo(): print('i am foo') foo = use_logging(foo) # 因为装饰器 use_logging(foo) 返回的时函数对象 wrapper,这条语句相当于 foo = wrapper foo() # 执行foo()就相当于执行 wrapper()
use_logging 就是一个装饰器,它一个普通的函数,它把执行真正业务逻辑的函数 func 包裹在其中,看起来像 foo 被 use_logging 装饰了一样,use_logging 返回的也是一个函数,这个函数的名字叫 wrapper。在这个例子中,函数进入和退出时 ,被称为一个横切面,这种编程方式被称为面向切面的编程。
@ 符号就是装饰器的语法糖,它放在函数开始定义的地方,这样就可以省略最后一步再次赋值的操作。
-
def use_logging(func):
def wrapper():
logging.warn("%s is running" % func.__name__)
return func()
return wrapper
@use_logging
def foo():
print("i am foo")
foo()
输出
In [71]: foo()
WARNING:root:foo is running
i am foo
有了 @ ,我们就可以省去foo = use_logging(foo)这一句了
直接调用 foo() 即可得到想要的结果。
-
业务逻辑函数 foo 需要参数的情况:
def foo(name): print("i am %s" % name)
我们可以在定义 wrapper 函数的时候指定参数:
def wrapper(name): logging.warn("%s is running" % func.__name__) return func(name) return wrapper
这样 foo 函数定义的参数就可以定义在 wrapper 函数中。
当装饰器不知道 foo 到底有多少个参数时,我们可以用
*args
来代替:def wrapper(*args): logging.warn("%s is running" % func.__name__) return func(*args) return wrapper
如果 foo 函数还定义了一些关键字参数呢?比如:
def foo(name, age=None, height=None): print("I am %s, age %s, height %s" % (name, age, height))
这时,你就可以把
wrapper
函数指定关键字函数:def wrapper(*args, **kwargs): # args是一个数组,kwargs一个字典 logging.warn("%s is running" % func.__name__) return func(*args, **kwargs) return wrapper
- 第二种:带参数的装饰器
装饰器还有更大的灵活性,例如带参数的装饰器,在上面的装饰器调用中,该装饰器接收唯一的参数就是执行业务的函数 foo 。装饰器的语法允许我们在调用时,提供其它参数,比如@decorator(a)。这样,就为装饰器的编写和使用提供了更大的灵活性。比如,我们可以在装饰器中指定日志的等级,因为不同业务函数可能需要的日志级别是不一样的。
def use_logging(level): def decorator(func): def wrapper(*args, **kwargs): if level == "warn": logging.warn("%s is running" % func.__name__) elif level == "info": logging.info("%s is running" % func.__name__) return func(*args) return wrapper return decorator @use_logging(level="warn") def foo(name='foo'): print("i am %s" % name) foo()
上面的 use_logging 是允许带参数的装饰器。它实际上是对原有装饰器的一个函数封装,并返回一个装饰器。我们可以将它理解为一个含有参数的闭包。当我 们使用@use_logging(level="warn")调用的时候,Python 能够发现这一层的封装,并把参数传递到装饰器的环境中。
@use_logging(level="warn") 等价于 @decorator
1.4 利用Python的动态语言特性执行命令
-
Python内置的dir函数会返回属性的列表
In [72]: class Person(object): ...: def __init__(self, name): ...: self.name = name ...: def get_first_name(self): ...: return self.name.split()[0] ...: def get_last_name(self): ...: return self.name.split()[-1] ...: In [73]: jason = Person('Jason Statham') In [74]: dir(jason) Out[74]: ['__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', 'get_first_name', 'get_last_name', 'name']
-
已知
jason
对象所拥有的属性,我们可以用hasattr
和getattr
函数来测试某个属性是否存在并获取该属性,如下所示:In [76]: hasattr(jason, 'get_first_name') Out[76]: True In [77]: action = getattr(jason, 'get_first_name') In [78]: action() Out[78]: 'Jason' In [79]: action = getattr(jason, 'get_last_name') In [80]: action() Out[80]: 'Statham'
在MySQL健康检查器中,我们利用了Python的自省功能,用
hasattr
函数判断客户端是否拥有相应的属性,然后调用getattr
来获得该消息的处理函数。这样就免去了我们使用if/else
语句处理不同的消息
1.5 利用__call__
方法实现可调用对象
如果在类中实现了 call 方法,那么实例对象也将成为一个可调用对象。也就是说,我们可以像普通函数一样调用一个类对象。
class CheckSafeReplication(object):
def __init__(self, params):
self.params= params
def get_slave_status(self):
res = {}
slave_status_dict = Env.database.get_slave_status_dict()
res['slave_io_running'] = slave_status_dict['Slave_IO_Running']
res['slave_sql_running'] = slave_status_dict['Slave_SQL_Running']
res['last_io_error'] = slave_status_dict['Last_IO_Error']
res['last_sql_error'] = slave_status_dict['Last_SQL_Error']
return res
def __call__(self):
res = dict(is_slave=Env.database.is_slave)
if Env.database.is_slave:
res.update(Env.database.get_multi_variables_value('relay_log_recovery',
'relay_log_info_repository'))
res.update(self.get_slave_status())
return res
因为CheckSafeReplication
实现了__call__
方法,因为我们可以向函数一样调用CheckSafeReplication
的类对象,如下所示:
def check_safe_replication(msg):
obj = CheckSafeReplication(msg)
return obj()
详细解释可看Python __call__
详解
1.6 Python 的 property
在Python中没有像Java那样的getter和setter的用法。
因为我们可以直接修改对象的属性,如下所示:
In [97]: class Person(object):
...: def __init__(self, name, age):
...: self.name = name
...: self.age = age
...:
In [98]: jason = Person('Jason Statham' , 50)
In [99]: jason.age = -1
In [100]: jason.age
Out[100]: -1
上面这段程序中,虽然age
的取值从 50 变成了 -1 , 但是不符合逻辑,年龄和可能为-1,为了避免这种错误发生,我们可以参照java,项属性设置为私有,并提供一个getter
和setter
In [102]: class Person(object):
...: def __init__(self, name, age):
...: self.name = name
...: self._age = age
...: def get_age(self):
...: return self._age
...: def set_age(self, age):
...: if age < 0 or age > 100:
...: raise ValueError('age is illegal')
...: self._age = age
上面这种方法我们实现了像java中的set和get方法,但是不够Pythonic(不够Python范儿)。
我们也可以很Python范儿,就是使用property
装饰器将方法当做属性访问,从而提供更加友好的访问方式。如下:
In [110]: class Person(object):
...: def __init__(self, name, age):
...: self.name = name
...: self.age = age
...: @property
...: def age(self):
...: return self._age
...: @age.setter
...: def age(self, age):
...: if age < 0 or age > 100:
...: raise ValueError('age is illegal')
...: self._age = age
...:
In [111]: jason = Person('Jason Statham' , 50)
In [112]: jason.age = -1
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-112-8f6f46022edd> in <module>()
----> 1 jason.age = -1
<ipython-input-110-7a772ee4ed7c> in age(self, age)
9 def age(self, age):
10 if age < 0 or age > 100:
---> 11 raise ValueError('age is illegal')
12 self._age = age
13
ValueError: age is illegal
In [113]: jason.age = 20
In [114]: jason.age
Out[114]: 20
上面这段代码中,age.setter
装饰器为age
属性创建了一个setter
方法。当我们修改age
时,这个setter
方法将会自动调用。
从0到1:Python打造MySQL专家系统(2)
2. 数据库专家系统的服务端设计(server端)
2.1 将相同的操作提升到父类中
本数据库专家系统分为很多个检查项,每个检查项里面包含多个检查点。对于每一个检查项,在专家系统的服务端中都是一个Worker
。每个Work
本身独立,又有一些相同点,相同的地方比如打日志,处理异常。这个时候,我们可以把共性提取出来,在父类中实现(GenericWorker
)。每一个子类Worker
只需要继承GenericWorker
这个父类,就实现了记录时间,打印日志和处理异常。这样一来,每个Worker
就可以专注于具体的业务逻辑。
2.2 在Python中实现map-reduce模型
为每个Worker
分配一个线程,等线程结束之后再将各个Worker
的结果汇总。这是一个典型的map-reduce
框架。在map
阶段,将各个任务分发出去;在reduce
阶段,将多个任务的执行结果汇总起来。
我们举个栗子,以1-100的和为例说明在Python中实现一个map-reduce
框架。首先我们将1~100分成10个区间,然后将每个区间分配给1个线程执行。我们一共要使用10个线程,等待10个线程都结束以后我们再将所有的结果汇总起来。
In [1]: def Cal(object):
...: def __init__(self, start, end):
...: self.result = 0
...: self.start = start
...: self.end = end
...: def map(self):
...: for i in range(self.start, self.end):
...: self.result += i
...: def reduce(self, other):
...: self.reduce += other.result
在这个Cal
类中,初始化了区间的起点和终点(start
和end
)。
map
函数是我们的业务逻辑,在MySQL数据库专家系统中,我们将在map
函数中进行数据库检查和评分。
可以手动此处。一文搞懂python的map、reduce函数