Python-logging实现分布式日志


title: Python实现分布式日志
date: 2019-06-25 08:55:44
tags:

  • python
  • logging
    categories:
  • 开发
  • python
  • logging

Python实现分布式日志

分布式日志初探

​ 在写分布式爬虫过程中,需要打印一些关键性日志,但是程序是分布在各个机器上,这样不便于我们程序的日志的统计,以及错误代码的查看;

​ 所以我查看了关于logging的教程,显示logging的handler存在以下几种:

StreamHandler:logging.StreamHandler;日志输出到流,可以是sys.stderr,sys.stdout或者文件
FileHandler:logging.FileHandler;日志输出到文件
BaseRotatingHandler:logging.handlers.BaseRotatingHandler;基本的日志回滚方式
RotatingHandler:logging.handlers.RotatingHandler;日志回滚方式,支持日志文件最大数量和日志文件回滚
TimeRotatingHandler:logging.handlers.TimeRotatingHandler;日志回滚方式,在一定时间区域内回滚日志文件
SocketHandler:logging.handlers.SocketHandler;远程输出日志到TCP/IP sockets
DatagramHandler:logging.handlers.DatagramHandler;远程输出日志到UDP sockets
SMTPHandler:logging.handlers.SMTPHandler;远程输出日志到邮件地址
SysLogHandler:logging.handlers.SysLogHandler;日志输出到syslog
NTEventLogHandler:logging.handlers.NTEventLogHandler;远程输出日志到Windows NT/2000/XP的事件日志
MemoryHandler:logging.handlers.MemoryHandler;日志输出到内存中的指定buffer
HTTPHandler:logging.handlers.HTTPHandler;通过"GET"或者"POST"远程输出到HTTP服务器

其中我特别注意到的handler是SocketHandler, handler可以实现socket发送日志;

​ 而联想到使用es来存取日志,本来是想用es api来接收日志(这样就可以使用HTTPHandler),但是http毕竟是上层协议,发送可能会慢,所以继续找到了logstash, 作为ELK的一员,我们可以先发送到logstash再转发到es,接下来就查看logstash是否有python开源包,接下来就发现了python-logstash。

安装并运行logstash

  • 安装java
  • 进入官网下载安装包
  • 上传服务器并解压(这里使用的是6.3.2)
tar -zxvf logstash-6.3.2.tar.gz
  • 编写logstash的配置文件
input {                                                                           
    udp {                                                                         
       port => 5959                                                              
       codec => json                                                             
    }                                                                             
}                                                                                 
                                                                                  
output {                                                                          
    elasticsearch{                                                                
       hosts => ["192.168.1.17:9200", "192.168.1.18:9200", "192.168.1.19:9200"]  
       index => "crawler-%{+YYYY.MM.dd}"                                         
    }                                                                             
    stdout{                                                                       
       codec => rubydebug                                                        
    }                                                                             
}                                                     
  • 运行logstash
logstash -f logstash.conf
  • 截图
logstash运行成功.jpg

使用python-logstash

代码如下:

import logging
import logstash
import sys

host = 'localhost'

test_logger = logging.getLogger('python-logstash-logger')
test_logger.setLevel(logging.INFO)
test_logger.addHandler(logstash.LogstashHandler(host, 5959, version=1))

extra = {
    'test_string': 'python version: ' + repr(sys.version_info),
    'test_boolean': True,
    'test_dict': {'a': 1, 'b': 'c'},
    'test_float': 1.23,
    'test_integer': 123,
    'test_list': [1, 2, '3'],
}
test_logger.info('python-logstash: test extra fields', extra=extra)

运行后logstash显示:


python-logstash.png

查看es中是否接收到数据:


log-es.png

以上,我们基本的分布式日志已经发送测试成功;

深入研究logging的日志

​ 第三方库中的日志格式并不满足我们的需求,所以我们可以对其进行修改(借鉴其源码):

源码:详细代码

class LogstashFormatterBase(logging.Formatter):

    def __init__(self, message_type='Logstash', tags=None, fqdn=False):
        pass
    def get_extra_fields(self, record):
        pass

    def get_debug_fields(self, record):
        pass

    @classmethod
    def format_source(cls, message_type, host, path):
        pass

    @classmethod
    def format_timestamp(cls, time):
        pass

    @classmethod
    def format_exception(cls, exc_info):
        pass

    @classmethod
    def serialize(cls, message):
        pass

这里我们发现其继承了Formatter;查看Logging官方文档可以得知,实现自定义format只要实现其format方法就可以了,第一次我直接继承了python-logstash的LogstashFormatterVersion1:

class LogstashFormat(LogstashFormatterVersion1):

    def format(self, record):
        message = {
            '@timestamp': self.format_timestamp(record.created),
            '@message': record.getMessage(),
            'log_level': record.levelname,
            'log_file': record.filename,
            'line_no': record.lineno,
            'host': self.host,
        }
        message.update(self.get_extra_fields(record))

        return self.serialize(message)

    @classmethod
    def format_timestamp(cls, time):
        current_time = datetime.datetime.fromtimestamp(time)
        return ''.join([current_time.strftime("%Y-%m-%dT%H:%M:%S"),
                        ".%03d"%(current_time.microsecond / 1000),
                        "Z"])
 
def init_logstash_logger(level, host, port):
    logger = logging.getLogger('python-logstash-logger')
    logger.setLevel(LEVELS.get(level))
    logstash_format = LogstashFormat()
    logstash_handler = LogstashHandler(host, port)
    logstash_handler.setFormatter(logstash_format)
    logger.addHandler(logstash_handler)
    return logger

​ 修改了其对时间的处理,以及format返回的处理,这里我们实现一个初始化方法进行测试;

执行:

exec_init.png

结果:

exec_logstash.png

以上我们就可以自定义,自己的代码发送到logstash上了。

es会对上传的日志进行日期建立索引

es_index.png

提供发送日志到kafka源码:

要求: python3.7

库: confluent-kafka-python

import datetime                                                                        
import json                                                                            
import logging                                                                         
                                                                                       
from logging import Formatter                                                          
from logging.handlers import QueueHandler                                              
from confluent_kafka import Producer                                                   
                                                                                       
                                                                                       
produce = Producer({'bootstrap.servers': '192.168.11.5:9092'})                         
                                                                                       
class KafkaQueueHandler(QueueHandler):                                                 
                                                                                       
    def emit(self, record):                                                            
    ┆   self.queue.poll(0)                                                             
    ┆   self.queue.produce('test', self.format(record), callback=self.delivery_report) 
    ┆   self.queue.flush()                                                             
                                                                                       
    def delivery_report(self, err, msg):                                               
    ┆   if err is not None:                                                            
    ┆   ┆   print('message:%s', err)                                                   
    ┆   else:                                                                          
    ┆   ┆   print('message deliverd %s', msg.topic())                                  
                                                                                       
                                                                                       
class QueueFormat(Formatter):                                                          
                                                                                       
    def __init__(self, topic):                                                         
        self.topic = topic                                                             
    ┆   super(QueueFormat, self).__init__()                                            
                                                                                       
    def get_extra_fields(self, record):                                                
    ┆   skip_list = (                                                                  
    ┆   ┆   'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename',          
    ┆   ┆   'funcName', 'id', 'levelname', 'levelno', 'lineno', 'module',              
    ┆   ┆   'msecs', 'msecs', 'message', 'msg', 'name', 'pathname', 'process',         
    ┆   ┆   'processName', 'relativeCreated', 'thread', 'threadName', 'extra',         
    ┆   ┆   'auth_token', 'password'                                                   
    ┆   )                                                                              
    ┆   easy_types = (str, bool, dict, float, int, list, type(None))                   
                                                                                       
    ┆   fields = {}                                                                    
    ┆   for key, value in record.__dict__.items():                                     
    ┆   ┆   if key not in skip_list:                                                   
    ┆   ┆   ┆   if isinstance(value, easy_types):                                      
    ┆   ┆   ┆   ┆   fields[key] = value                                                
    ┆   ┆   ┆   else:                                                                  
    ┆   ┆   ┆   ┆   fields[key] = repr(value)                                          
                                                                                       
    ┆   return fields                                                                  
                                                                                       
    @classmethod                                                                       
    def format_timestamp(cls, time):                                                   
    ┆   current_time = datetime.datetime.fromtimestamp(time)                           
    ┆   return ''.join([current_time.strftime("%Y-%m-%dT%H:%M:%S"),                    
    ┆   ┆   ┆   ┆   ┆   ".%03d"%(current_time.microsecond / 1000),                     
    ┆   ┆   ┆   ┆   ┆   "Z"])                                                          
                          @classmethod                                                                    
    def format_exception(cls, exc_info):                                            
    ┆   return ''.join(traceback.format_exception(*exc_info)) if exc_info else ''   
                                                                                    
    @classmethod                                                                    
    def serialize(cls, message):                                                    
    ┆   return bytes(json.dumps(message), 'utf-8')                                  
                                                                                    
    def format(self, record):                                                       
    ┆   message = {                                                                 
    ┆   ┆   '@timestamp': self.format_timestamp(record.created),                    
    ┆   ┆   '@message': record.getMessage(),                                        
    ┆   ┆   'log_level': record.levelname,                                          
    ┆   ┆   'log_file': record.filename,                                            
    ┆   ┆   'line_no': record.lineno,                                               
    ┆   ┆   'topic': self.topic                                                     
    ┆   }                                                                           
    ┆   message.update(self.get_extra_fields(record))                               
    ┆   return self.serialize(message)                                              
                                                                                    
logger = logging.getLogger('kafka')                                                 
kafka_handle = KafkaQueueHandler(produce)                                           
log_format = QueueFormat('test')                                                    
kafka_handle.setFormatter(log_format)                                               
                                                                                    
logger.setLevel(logging.DEBUG)                                                      
logger.addHandler(kafka_handle)                                                     
logger.info('aaaa')                                                         

向kafka发送日志后,可以使用logstash的插件从kafka中读取日志,发送到es。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,907评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,987评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,298评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,586评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,633评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,488评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,275评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,176评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,619评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,819评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,932评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,655评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,265评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,871评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,994评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,095评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,884评论 2 354

推荐阅读更多精彩内容