马哥教育网站日志分析分步详解

马哥教育的Wayne老师讲解的python课程非常注重实际。比如他讲解的一个日志分析的案例,就是从运维实际出发,把日志分析的各个环节的原理讲的非常透彻。虽然现在日志分析有很多功能强大的专门软件,但是通过这个案例了解了日志处理的原理,对于工作也有很多的益处。(Wayne老师的课程链接 https://ke.qq.com/course/134017?taid=537476502522753

Wayne老师的日志分析,每次讲课根据授课场景会有些变化。不过基本的组成部分大致是下面几步:

数据源的文本正则分析、数据类型转换、生产器方式产生数据、窗口函数、队列、线程和分发器、用Logging输出信息、通用的文件装载

尽管最终的程序不到100行,但是上面7个步骤包括了几个重要的模块的使用方法和一些编程技巧,熟练使用之后对于编程能力的提高很有好处。

(按照wayne老师的课堂提示:下面程序只是用于学习用途,还有很多不完善的地方)

下面一步步来,每一步的程序都是可以独立运行的,这样调试起来比较方便

1.数据源及正则分析

数据是网上查到的:故事大全的几天的日志数据

数据格式:

2017-02-26 00:00:15 222.187.225.152 GET m.gushidaquan.cc/news/yule/1266748_10.html - 80 - 123.125.71.100 Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html) - 200

2017-02-26 00:00:19 222.187.225.152 GET www.gushidaquan.cc/811/2016820207621.html - 80 - 67.229.136.58 Mozilla/4.0 (compatible; MSIE 9.0; Windows NT 6.1) http://www.baidu.com 404

要对数据进行正则分析,效率最高的办法是使用辅助工具来写正则表达式字符串。Wayne老师推荐的是regester(https://deerchao.cn/tools/regester/index.htm),是国人开发的全免费软件。可以离线使用,很方便。

把源文本黏贴到左下方的“源文本”区,上方的“模式”区里面输入正则表达式字符串,然后点击“运行”按钮,在右侧的“结果”区域里面会出现分析得到的信息,非常好用。要注意的一点,就是regester用的是标准正则表达式,要在python程序中使用,要注意正则分组名称前面要加一个大写的P。比如(?<srcip>[\d.]{7,})要改成(?P<srcip>[\d.]{7,})。

对于同一条字符串,同一个分析要求,可以有多种多样的正则表达式字符串,没有什么标准答案,只要能用就行。处理过程中,碰到有问题的字符串还可以记录下来,以便后续优化。

(先下载数据源,从里面任意log文件中截取100条左右的数据,作为测试数据,保存到程序文件目录下,命名为“sample.log”,如果碰到字符集不是utf-8的情况,就用notepad等工具看一下文件的编码格式,在程序中用相应的编码格式打开文件):

代码

import re

pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \

          "(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"

regex = re.compile(pattern)

with open("sample.log") as f:

    for line in f:

        data = regex.match(line).groupdict()

        print(data)

2.数据类型转换

数据类型转换的思路非常有用,适用面非常广。这里的关键式掌握lambda表达式、datetime日期字符串的格式转换。

这里的主要目的,是把日期字符串转换为datatime模块的格式,以便后续时间处理时适用datetime模块的函数。第2个lambda函数的含义是从ops字典中,如果找不到相关的字段对应的转换操作函数,就不进行任何转换,保持原值。

代码:

import re

import datetime

pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \

          "(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"

regex = re.compile(pattern)

ops = {"datetime": lambda datestr: datetime.datetime.strptime(datestr, "%Y-%m-%d %H:%M:%S"),  "status":int }

with open("sample.log") as f:

    for line in f:

        data = {k: ops.get(k, lambda x: x)(v) for k,v in regex.match(line).groupdict().items()}

        print(data)

3.生产器方式产生数据

日志数据通常量比较大,如果不采用惰性求值(生成器),对服务器的内存和CPU都会造成较大的压力,还有可能把服务器给搞瘫掉,比较危险。生成器的应用范围非常广,而且也很简单,是一个必须掌握的基本功。

import re

import datetime

pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \

          "(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"

regex = re.compile(pattern)

ops = {"datetime": lambda datestr: datetime.datetime.strptime(datestr, "%Y-%m-%d %H:%M:%S"), "status": int}

def extract(file):

    with open(file) as f:

        for line in f:

            data = regex.match(line)

            if data:    # some line in log file return None

                yield {k: ops.get(k, lambda x: x)(v) for k, v in data.groupdict().items()}

if __name__ == "__main__":

    src = extract("sample.log")

    for data in src:

        print(data)

4.窗口函数

在分析日志的时候,一般是隔一段时间输出前一个统计窗口的统计信息。比如,每隔5秒钟输出前面1分钟内网站的访问量,网站访问的状态统计等。一般间隔(interval,本例是5秒)和窗口的宽带(width,本例是1分钟,即60秒)。这种情况下,每次只需要对一个窗口宽度(width)的数据进行处理,处理之后,要把最老的interval的数据扔掉,补充新的interval的数据,进行下一次统计。

这个过程在“BeautifulSoulpy的时间窗口函数实现(https://www.jianshu.com/p/c9e8d3d9a33f)”讲的比较清楚。

代码:

import re

import datetime

pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \

          "(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"

regex = re.compile(pattern)

ops = {"datetime": lambda datestr: datetime.datetime.strptime(datestr, "%Y-%m-%d %H:%M:%S"), "status": int}

def extract(file):

    with open(file) as f:

        for line in f:

            data = regex.match(line)

            if data:    # some line in log file return None

                yield {k: ops.get(k, lambda x: x)(v) for k, v in data.groupdict().items()}

def windows(src, handler, width, interval):

    start = datetime.datetime.strptime("2000-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")

    current = start

    delta = datetime.timedelta(seconds= width - interval)

    buf = []

    while True:

        try:

            data = next(src)

        except StopIteration:

            break

        if data:

            current = data['datetime']

            buf.append(data)

        if (current-start).total_seconds() >= interval:

            handler(buf)

            start = current

            buf = [x for x in buf if x['datetime']>=(current-delta)]

def dataprint(buffer):  # handler that just print data

    print("*" * 80)

    for data in buffer:

        print(data)

def datetimeprint(buffer):

    print("*" * 80)

    for data in buffer:

        print(data['datetime'])

def statusinfo(buffer):

    print("*" * 80)

    status =[x['status'] for x in buffer]

    print( {x: status.count(x)/len(status) for x in set(status)})

if __name__ == "__main__":

    src = extract("sample.log")

    # windows(src, dataprint, 10, 5)

    # windows(src, datetimeprint, 10, 5)

    windows(src, statusinfo, 10, 5)

5.队列、线程和分发器

日志分析中,会有多种需求,各种需求的width、interval和具体输出的内容不同。需要多个具体的处理函数,主程序要负责调度。这个就是分发器的由来。这里,分发器要做的是对每个统计需求都新产生一个线程,并把提取处理的日志信息发送到该线程对应的数据队列。分发器本身就是一个独立的讲课内容,wayne老师的课程中有专门的一个案例。

代码:

import re

import datetime

from queue import Queue

from threading import Thread

pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \

          "(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"

regex = re.compile(pattern)

ops = {"datetime": lambda datestr: datetime.datetime.strptime(datestr, "%Y-%m-%d %H:%M:%S"), "status": int}

def extract(file):

    with open(file) as f:

        for line in f:

            data = regex.match(line)

            if data:  # some line in log file return None

                yield {k: ops.get(k, lambda x: x)(v) for k, v in data.groupdict().items()}

def windows(src, handler, width, interval):

    start = datetime.datetime.strptime("2000-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")

    current = start

    delta = datetime.timedelta(seconds=width - interval)

    buf = []

    while True:

        # try:

        #    data = next(src)

        # except StopIteration:

        #    break

        data = src.get()

        if data:

            current = data['datetime']

            buf.append(data)

        if (current - start).total_seconds() >= interval:

            handler(buf)

            start = current

            buf = [x for x in buf if x['datetime'] >= (current - delta)]

def dataprint(buffer):  # handler that just print data

    print("*" * 80)

    for data in buffer:

        print(data)

def datetimeprint(buffer):

    print("*" * 80)

    for data in buffer:

        print(data['datetime'])

def statusinfo(buffer):

    print("*" * 80)

    status = [x['status'] for x in buffer]

    print({x: status.count(x) / len(status) for x in set(status)})

def dispatcher(src):

    qs = []

    handlers = []

    def reg(handler, width, interval):

        q = Queue()

        qs.append(q)

        t = Thread(target=windows, args=(q, handler, width, interval), name=str(handler))

        handlers.append(t)

    def run():

        for t in handlers:

            t.start()

        for data in src:

            for q in qs:

                q.put(data)

    return reg, run

if __name__ == "__main__":

    src = extract("20170224.log")

    # windows(src, dataprint, 10, 5)

    # windows(src, datetimeprint, 10, 5)

    # windows(src, statusinfo, 10, 5)

    reg, run = dispatcher(src)

    reg(statusinfo, 10, 5)

    reg(datetimeprint, 10, 5)

    run()

6.用Logging输出信息

日志分析有多种展示形式,可以图形化、可以把存数据库,当然最简单的是直接打印。由于print语句不是线程安全的,因此如果适用print预计,各个线程输出的信息可能会混在一行中,造成数据混乱。模块logging提供了线程安全的输出方式。只要是牵涉到多线程、多进程,都会用到logging模块,也是必须掌握的基本功。

代码:

import re

import datetime

from queue import Queue

from threading import Thread

import logging

logformat="%(threadName)s:  %(message)s"

logging.basicConfig(format=logformat, level=logging.INFO)

pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \

          "(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"

regex = re.compile(pattern)

ops = {"datetime": lambda datestr: datetime.datetime.strptime(datestr, "%Y-%m-%d %H:%M:%S"), "status": int}

def extract(file):

    with open(file) as f:

        for line in f:

            data = regex.match(line)

            if data:  # some line in log file return None

                yield {k: ops.get(k, lambda x: x)(v) for k, v in data.groupdict().items()}

def windows(src, handler, width, interval):

    start = datetime.datetime.strptime("2000-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")

    current = start

    delta = datetime.timedelta(seconds=width - interval)

    buf = []

    while True:

        # try:

        #    data = next(src)

        # except StopIteration:

        #    break

        data = src.get()

        if data:

            current = data['datetime']

            buf.append(data)

        if (current - start).total_seconds() >= interval:

            handler(buf)

            start = current

            buf = [x for x in buf if x['datetime'] >= (current - delta)]

def dataprint(buffer):  # handler that just print data

    # print("*" * 80)

    for data in buffer:

        logging.info("{}".format(str(data)))

def datetimeprint(buffer):

    for data in buffer:

        logging.info("{}".format(data['datetime']))

def statusinfo(buffer):

    status = [x['status'] for x in buffer]

    # print({x: status.count(x) / len(status) for x in set(status)})

    logging.info("{}".format({x: status.count(x) / len(status) for x in set(status)}))

def dispatcher(src):

    qs = []

    handlers = []

    def reg(handler, width, interval, name):

        q = Queue()

        qs.append(q)

        t = Thread(target=windows, args=(q, handler, width, interval),name=name)

        handlers.append(t)

    def run():

        for t in handlers:

            t.start()

        for data in src:

            for q in qs:

                q.put(data)

    return reg, run

if __name__ == "__main__":

    src = extract("20170224.log")

    # windows(src, dataprint, 10, 5)

    # windows(src, datetimeprint, 10, 5)

    # windows(src, statusinfo, 10, 5)

    reg, run = dispatcher(src)

    reg(statusinfo, 10, 5, 'statusThread')

    reg(datetimeprint, 10, 5, 'dateprintThread')

    run()

7.通用的文件装载

日志文件可能是一个日志目录下面还有子目录,子目录中有日志文件,也有其他文件,如果只选择特定目录下面特定文件名后缀的日志文件来处理,也有比较通用的办法,这个在“采蘑菇的下午茶”有详细的介绍。

代码:

import re

import datetime

from queue import Queue

from threading import Thread

import logging

from pathlib import Path

logformat="%(threadName)s:  %(message)s"

logging.basicConfig(format=logformat, level=logging.INFO)

pattern = "(?P<datetime>[\d\- :]{19}) (?P<destIP>[\d\.]*) (?P<method>\S*) (?P<url>\S*) - (?P<port>\d*) - " \

          "(?P<SrcIP>[\d\.]{7,}) (?P<browser>.*) (?P<refUrl>[\S]*) (?P<status>[\d]*)"

regex = re.compile(pattern)

ops = {"datetime": lambda datestr: datetime.datetime.strptime(datestr, "%Y-%m-%d %H:%M:%S"), "status": int}

def loadfile(filename, encoding):

    # print(filename)

    with open(filename, encoding=encoding) as f:

        for line in f:

            data = regex.match(line)

            if data:  # some line in log file return None

                yield {k: ops.get(k, lambda x: x)(v) for k, v in data.groupdict().items()}

def windows(src, handler, width, interval):

    start = datetime.datetime.strptime("2000-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")

    current = start

    delta = datetime.timedelta(seconds=width - interval)

    buf = []

    while True:

        # try:

        #    data = next(src)

        # except StopIteration:

        #    break

        data = src.get()

        if data:

            current = data['datetime']

            buf.append(data)

        if (current - start).total_seconds() >= interval:

            handler(buf)

            start = current

            buf = [x for x in buf if x['datetime'] >= (current - delta)]

def dataprint(buffer):  # handler that just print data

    # print("*" * 80)

    for data in buffer:

        logging.info("{}".format(str(data)))

def datetimeprint(buffer):

    for data in buffer:

        logging.info("{}".format(data['datetime']))

def statusinfo(buffer):

    status = [x['status'] for x in buffer]

    # print({x: status.count(x) / len(status) for x in set(status)})

    logging.info("{}".format({x: status.count(x) / len(status) for x in set(status)}))

def dispatcher(src):

    qs = []

    handlers = []

    def reg(handler, width, interval, name):

        q = Queue()

        qs.append(q)

        t = Thread(target=windows, args=(q, handler, width, interval),name=name)

        handlers.append(t)

    def run():

        for t in handlers:

            t.start()

        for data in src:

            for q in qs:

                q.put(data)

    return reg, run

def load(*args, ext="*.log",recursive=False, encoding='utf-8'):

    for pf in args:

        print(pf)

        p = Path(pf)

        if p.exists():

            if p.is_dir():

                if type:

                    if isinstance(ext,str):

                        ext = [ext]

                    else:

                        ext = list(ext)

                    for s in ext:

                        fl = p.rglob(s) if recursive else p.glob(s)

                        for f in fl:

                            yield from loadfile(str(f.absolute()), encoding=encoding)

            elif p.is_file():

                yield from loadfile(str(p.absolute()), encoding=encoding)

if __name__ == "__main__":

    src = load("sample.log")

    # src = load('.')

    # windows(src, dataprint, 10, 5)

    # windows(src, datetimeprint, 10, 5)

    # windows(src, statusinfo, 10, 5)

    reg, run = dispatcher(src)

    reg(statusinfo, 10, 5, 'statusThread')

    reg(datetimeprint, 10, 5, 'dateprintThread')

    run()

说明

Wayne老师在教学视频里面多次强调敲代码的重要性。一套代码,看看好像懂了,把讲义合上,有很多的语句可能有敲不出来了。要真正理解一个概念,最简单的标准就是:不看讲义,自己能够从头到尾把代码敲出来并且能够编译通过,这样对于相关的知识才算是有了真正的理解。

现在网络上的python教学课程非常多。学习的关键是找一套合适自己的资料,腾讯课堂里面的马哥教育针对不同的学习方向有不同的python教程,好像都是Wayne老师在讲,根据不同的要求对教程进行了取舍,很棒。

资料来源:

1. 马哥教育wayne老师教学视频:Wayne老师的课程链接https://ke.qq.com/course/134017?taid=537476502522753

2. “采蘑菇的下午茶”的《Python学习之 ---日志分析+数据分发与分析+多线程+queue模块+日志分析综合》:https://blog.csdn.net/qq_40498551/article/details/90181774

3. BeautifulSoulpy的时间窗口函数实现:https://www.jianshu.com/p/c9e8d3d9a33f

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,377评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,390评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,967评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,344评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,441评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,492评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,497评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,274评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,732评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,008评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,184评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,837评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,520评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,156评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,407评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,056评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,074评论 2 352

推荐阅读更多精彩内容

  • datetime是Python处理日期和时间的标准库。 获取当前日期和时间 我们先看如何获取当前日期和时间: >>...
    jbb_43b0阅读 1,011评论 0 0
  • 不支持上传文件,所以就复制过来了。作者信息什么的都没删。对前端基本属于一窍不通,所以没有任何修改,反正用着没问题就...
    全栈在路上阅读 1,955评论 0 2
  • 学习目标: 1.了解和学习shapely和geopandas的基本功能,掌握用python中的这两个库实现几何对象...
    1598903c9dd7阅读 442评论 0 0
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • 我是黑夜里大雨纷飞的人啊 1 “又到一年六月,有人笑有人哭,有人欢乐有人忧愁,有人惊喜有人失落,有的觉得收获满满有...
    陌忘宇阅读 8,532评论 28 53