马哥教育的Wayne老师讲解的python课程非常注重实际。比如他讲解的一个日志分析的案例,就是从运维实际出发,把日志分析的各个环节的原理讲的非常透彻。虽然现在日志分析有很多功能强大的专门软件,但是通过这个案例了解了日志处理的原理,对于工作也有很多的益处。(Wayne老师的课程链接 https://ke.qq.com/course/134017?taid=537476502522753)
Wayne老师的日志分析,每次讲课根据授课场景会有些变化。不过基本的组成部分大致是下面几步:
数据源的文本正则分析、数据类型转换、生产器方式产生数据、窗口函数、队列、线程和分发器、用Logging输出信息、通用的文件装载
尽管最终的程序不到100行,但是上面7个步骤包括了几个重要的模块的使用方法和一些编程技巧,熟练使用之后对于编程能力的提高很有好处。
(按照wayne老师的课堂提示:下面程序只是用于学习用途,还有很多不完善的地方)
下面一步步来,每一步的程序都是可以独立运行的,这样调试起来比较方便
1.数据源及正则分析
数据是网上查到的:故事大全的几天的日志数据。
数据格式:
2017-02-26 00:00:15 222.187.225.152 GET m.gushidaquan.cc/news/yule/1266748_10.html - 80 - 123.125.71.100 Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html) - 200
2017-02-26 00:00:19 222.187.225.152 GET www.gushidaquan.cc/811/2016820207621.html - 80 - 67.229.136.58 Mozilla/4.0 (compatible; MSIE 9.0; Windows NT 6.1) http://www.baidu.com 404
要对数据进行正则分析,效率最高的办法是使用辅助工具来写正则表达式字符串。Wayne老师推荐的是regester(https://deerchao.cn/tools/regester/index.htm),是国人开发的全免费软件。可以离线使用,很方便。
把源文本黏贴到左下方的“源文本”区,上方的“模式”区里面输入正则表达式字符串,然后点击“运行”按钮,在右侧的“结果”区域里面会出现分析得到的信息,非常好用。要注意的一点,就是regester用的是标准正则表达式,要在python程序中使用,要注意正则分组名称前面要加一个大写的P。比如(?<srcip>[\d.]{7,})要改成(?P<srcip>[\d.]{7,})。
对于同一条字符串,同一个分析要求,可以有多种多样的正则表达式字符串,没有什么标准答案,只要能用就行。处理过程中,碰到有问题的字符串还可以记录下来,以便后续优化。
(先下载数据源,从里面任意log文件中截取100条左右的数据,作为测试数据,保存到程序文件目录下,命名为“sample.log”,如果碰到字符集不是utf-8的情况,就用notepad等工具看一下文件的编码格式,在程序中用相应的编码格式打开文件):
代码
import re
pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \
"(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"
regex = re.compile(pattern)
with open("sample.log") as f:
for line in f:
data = regex.match(line).groupdict()
print(data)
2.数据类型转换
数据类型转换的思路非常有用,适用面非常广。这里的关键式掌握lambda表达式、datetime日期字符串的格式转换。
这里的主要目的,是把日期字符串转换为datatime模块的格式,以便后续时间处理时适用datetime模块的函数。第2个lambda函数的含义是从ops字典中,如果找不到相关的字段对应的转换操作函数,就不进行任何转换,保持原值。
代码:
import re
import datetime
pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \
"(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"
regex = re.compile(pattern)
ops = {"datetime": lambda datestr: datetime.datetime.strptime(datestr, "%Y-%m-%d %H:%M:%S"), "status":int }
with open("sample.log") as f:
for line in f:
data = {k: ops.get(k, lambda x: x)(v) for k,v in regex.match(line).groupdict().items()}
print(data)
3.生产器方式产生数据
日志数据通常量比较大,如果不采用惰性求值(生成器),对服务器的内存和CPU都会造成较大的压力,还有可能把服务器给搞瘫掉,比较危险。生成器的应用范围非常广,而且也很简单,是一个必须掌握的基本功。
import re
import datetime
pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \
"(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"
regex = re.compile(pattern)
ops = {"datetime": lambda datestr: datetime.datetime.strptime(datestr, "%Y-%m-%d %H:%M:%S"), "status": int}
def extract(file):
with open(file) as f:
for line in f:
data = regex.match(line)
if data: # some line in log file return None
yield {k: ops.get(k, lambda x: x)(v) for k, v in data.groupdict().items()}
if __name__ == "__main__":
src = extract("sample.log")
for data in src:
print(data)
4.窗口函数
在分析日志的时候,一般是隔一段时间输出前一个统计窗口的统计信息。比如,每隔5秒钟输出前面1分钟内网站的访问量,网站访问的状态统计等。一般间隔(interval,本例是5秒)和窗口的宽带(width,本例是1分钟,即60秒)。这种情况下,每次只需要对一个窗口宽度(width)的数据进行处理,处理之后,要把最老的interval的数据扔掉,补充新的interval的数据,进行下一次统计。
这个过程在“BeautifulSoulpy的时间窗口函数实现(https://www.jianshu.com/p/c9e8d3d9a33f)”讲的比较清楚。
代码:
import re
import datetime
pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \
"(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"
regex = re.compile(pattern)
ops = {"datetime": lambda datestr: datetime.datetime.strptime(datestr, "%Y-%m-%d %H:%M:%S"), "status": int}
def extract(file):
with open(file) as f:
for line in f:
data = regex.match(line)
if data: # some line in log file return None
yield {k: ops.get(k, lambda x: x)(v) for k, v in data.groupdict().items()}
def windows(src, handler, width, interval):
start = datetime.datetime.strptime("2000-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")
current = start
delta = datetime.timedelta(seconds= width - interval)
buf = []
while True:
try:
data = next(src)
except StopIteration:
break
if data:
current = data['datetime']
buf.append(data)
if (current-start).total_seconds() >= interval:
handler(buf)
start = current
buf = [x for x in buf if x['datetime']>=(current-delta)]
def dataprint(buffer): # handler that just print data
print("*" * 80)
for data in buffer:
print(data)
def datetimeprint(buffer):
print("*" * 80)
for data in buffer:
print(data['datetime'])
def statusinfo(buffer):
print("*" * 80)
status =[x['status'] for x in buffer]
print( {x: status.count(x)/len(status) for x in set(status)})
if __name__ == "__main__":
src = extract("sample.log")
# windows(src, dataprint, 10, 5)
# windows(src, datetimeprint, 10, 5)
windows(src, statusinfo, 10, 5)
5.队列、线程和分发器
日志分析中,会有多种需求,各种需求的width、interval和具体输出的内容不同。需要多个具体的处理函数,主程序要负责调度。这个就是分发器的由来。这里,分发器要做的是对每个统计需求都新产生一个线程,并把提取处理的日志信息发送到该线程对应的数据队列。分发器本身就是一个独立的讲课内容,wayne老师的课程中有专门的一个案例。
代码:
import re
import datetime
from queue import Queue
from threading import Thread
pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \
"(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"
regex = re.compile(pattern)
ops = {"datetime": lambda datestr: datetime.datetime.strptime(datestr, "%Y-%m-%d %H:%M:%S"), "status": int}
def extract(file):
with open(file) as f:
for line in f:
data = regex.match(line)
if data: # some line in log file return None
yield {k: ops.get(k, lambda x: x)(v) for k, v in data.groupdict().items()}
def windows(src, handler, width, interval):
start = datetime.datetime.strptime("2000-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")
current = start
delta = datetime.timedelta(seconds=width - interval)
buf = []
while True:
# try:
# data = next(src)
# except StopIteration:
# break
data = src.get()
if data:
current = data['datetime']
buf.append(data)
if (current - start).total_seconds() >= interval:
handler(buf)
start = current
buf = [x for x in buf if x['datetime'] >= (current - delta)]
def dataprint(buffer): # handler that just print data
print("*" * 80)
for data in buffer:
print(data)
def datetimeprint(buffer):
print("*" * 80)
for data in buffer:
print(data['datetime'])
def statusinfo(buffer):
print("*" * 80)
status = [x['status'] for x in buffer]
print({x: status.count(x) / len(status) for x in set(status)})
def dispatcher(src):
qs = []
handlers = []
def reg(handler, width, interval):
q = Queue()
qs.append(q)
t = Thread(target=windows, args=(q, handler, width, interval), name=str(handler))
handlers.append(t)
def run():
for t in handlers:
t.start()
for data in src:
for q in qs:
q.put(data)
return reg, run
if __name__ == "__main__":
src = extract("20170224.log")
# windows(src, dataprint, 10, 5)
# windows(src, datetimeprint, 10, 5)
# windows(src, statusinfo, 10, 5)
reg, run = dispatcher(src)
reg(statusinfo, 10, 5)
reg(datetimeprint, 10, 5)
run()
6.用Logging输出信息
日志分析有多种展示形式,可以图形化、可以把存数据库,当然最简单的是直接打印。由于print语句不是线程安全的,因此如果适用print预计,各个线程输出的信息可能会混在一行中,造成数据混乱。模块logging提供了线程安全的输出方式。只要是牵涉到多线程、多进程,都会用到logging模块,也是必须掌握的基本功。
代码:
import re
import datetime
from queue import Queue
from threading import Thread
import logging
logformat="%(threadName)s: %(message)s"
logging.basicConfig(format=logformat, level=logging.INFO)
pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \
"(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"
regex = re.compile(pattern)
ops = {"datetime": lambda datestr: datetime.datetime.strptime(datestr, "%Y-%m-%d %H:%M:%S"), "status": int}
def extract(file):
with open(file) as f:
for line in f:
data = regex.match(line)
if data: # some line in log file return None
yield {k: ops.get(k, lambda x: x)(v) for k, v in data.groupdict().items()}
def windows(src, handler, width, interval):
start = datetime.datetime.strptime("2000-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")
current = start
delta = datetime.timedelta(seconds=width - interval)
buf = []
while True:
# try:
# data = next(src)
# except StopIteration:
# break
data = src.get()
if data:
current = data['datetime']
buf.append(data)
if (current - start).total_seconds() >= interval:
handler(buf)
start = current
buf = [x for x in buf if x['datetime'] >= (current - delta)]
def dataprint(buffer): # handler that just print data
# print("*" * 80)
for data in buffer:
logging.info("{}".format(str(data)))
def datetimeprint(buffer):
for data in buffer:
logging.info("{}".format(data['datetime']))
def statusinfo(buffer):
status = [x['status'] for x in buffer]
# print({x: status.count(x) / len(status) for x in set(status)})
logging.info("{}".format({x: status.count(x) / len(status) for x in set(status)}))
def dispatcher(src):
qs = []
handlers = []
def reg(handler, width, interval, name):
q = Queue()
qs.append(q)
t = Thread(target=windows, args=(q, handler, width, interval),name=name)
handlers.append(t)
def run():
for t in handlers:
t.start()
for data in src:
for q in qs:
q.put(data)
return reg, run
if __name__ == "__main__":
src = extract("20170224.log")
# windows(src, dataprint, 10, 5)
# windows(src, datetimeprint, 10, 5)
# windows(src, statusinfo, 10, 5)
reg, run = dispatcher(src)
reg(statusinfo, 10, 5, 'statusThread')
reg(datetimeprint, 10, 5, 'dateprintThread')
run()
7.通用的文件装载
日志文件可能是一个日志目录下面还有子目录,子目录中有日志文件,也有其他文件,如果只选择特定目录下面特定文件名后缀的日志文件来处理,也有比较通用的办法,这个在“采蘑菇的下午茶”有详细的介绍。
代码:
import re
import datetime
from queue import Queue
from threading import Thread
import logging
from pathlib import Path
logformat="%(threadName)s: %(message)s"
logging.basicConfig(format=logformat, level=logging.INFO)
pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \
"(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"
regex = re.compile(pattern)
ops = {"datetime": lambda datestr: datetime.datetime.strptime(datestr, "%Y-%m-%d %H:%M:%S"), "status": int}
def loadfile(filename, encoding):
# print(filename)
with open(filename, encoding=encoding) as f:
for line in f:
data = regex.match(line)
if data: # some line in log file return None
yield {k: ops.get(k, lambda x: x)(v) for k, v in data.groupdict().items()}
def windows(src, handler, width, interval):
start = datetime.datetime.strptime("2000-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")
current = start
delta = datetime.timedelta(seconds=width - interval)
buf = []
while True:
# try:
# data = next(src)
# except StopIteration:
# break
data = src.get()
if data:
current = data['datetime']
buf.append(data)
if (current - start).total_seconds() >= interval:
handler(buf)
start = current
buf = [x for x in buf if x['datetime'] >= (current - delta)]
def dataprint(buffer): # handler that just print data
# print("*" * 80)
for data in buffer:
logging.info("{}".format(str(data)))
def datetimeprint(buffer):
for data in buffer:
logging.info("{}".format(data['datetime']))
def statusinfo(buffer):
status = [x['status'] for x in buffer]
# print({x: status.count(x) / len(status) for x in set(status)})
logging.info("{}".format({x: status.count(x) / len(status) for x in set(status)}))
def dispatcher(src):
qs = []
handlers = []
def reg(handler, width, interval, name):
q = Queue()
qs.append(q)
t = Thread(target=windows, args=(q, handler, width, interval),name=name)
handlers.append(t)
def run():
for t in handlers:
t.start()
for data in src:
for q in qs:
q.put(data)
return reg, run
def load(*args, ext="*.log",recursive=False, encoding='utf-8'):
for pf in args:
print(pf)
p = Path(pf)
if p.exists():
if p.is_dir():
if type:
if isinstance(ext,str):
ext = [ext]
else:
ext = list(ext)
for s in ext:
fl = p.rglob(s) if recursive else p.glob(s)
for f in fl:
yield from loadfile(str(f.absolute()), encoding=encoding)
elif p.is_file():
yield from loadfile(str(p.absolute()), encoding=encoding)
if __name__ == "__main__":
src = load("sample.log")
# src = load('.')
# windows(src, dataprint, 10, 5)
# windows(src, datetimeprint, 10, 5)
# windows(src, statusinfo, 10, 5)
reg, run = dispatcher(src)
reg(statusinfo, 10, 5, 'statusThread')
reg(datetimeprint, 10, 5, 'dateprintThread')
run()
说明
Wayne老师在教学视频里面多次强调敲代码的重要性。一套代码,看看好像懂了,把讲义合上,有很多的语句可能有敲不出来了。要真正理解一个概念,最简单的标准就是:不看讲义,自己能够从头到尾把代码敲出来并且能够编译通过,这样对于相关的知识才算是有了真正的理解。
现在网络上的python教学课程非常多。学习的关键是找一套合适自己的资料,腾讯课堂里面的马哥教育针对不同的学习方向有不同的python教程,好像都是Wayne老师在讲,根据不同的要求对教程进行了取舍,很棒。
资料来源:
1. 马哥教育wayne老师教学视频:Wayne老师的课程链接https://ke.qq.com/course/134017?taid=537476502522753。
2. “采蘑菇的下午茶”的《Python学习之 ---日志分析+数据分发与分析+多线程+queue模块+日志分析综合》:https://blog.csdn.net/qq_40498551/article/details/90181774
3. BeautifulSoulpy的时间窗口函数实现:https://www.jianshu.com/p/c9e8d3d9a33f