title: Python实现分布式日志
date: 2019-06-25 08:55:44
tags:
- python
- logging
categories: - 开发
- python
- logging
Python实现分布式日志
分布式日志初探
在写分布式爬虫过程中,需要打印一些关键性日志,但是程序是分布在各个机器上,这样不便于我们程序的日志的统计,以及错误代码的查看;
所以我查看了关于logging的教程,显示logging的handler存在以下几种:
StreamHandler:logging.StreamHandler;日志输出到流,可以是sys.stderr,sys.stdout或者文件
FileHandler:logging.FileHandler;日志输出到文件
BaseRotatingHandler:logging.handlers.BaseRotatingHandler;基本的日志回滚方式
RotatingHandler:logging.handlers.RotatingHandler;日志回滚方式,支持日志文件最大数量和日志文件回滚
TimeRotatingHandler:logging.handlers.TimeRotatingHandler;日志回滚方式,在一定时间区域内回滚日志文件
SocketHandler:logging.handlers.SocketHandler;远程输出日志到TCP/IP sockets
DatagramHandler:logging.handlers.DatagramHandler;远程输出日志到UDP sockets
SMTPHandler:logging.handlers.SMTPHandler;远程输出日志到邮件地址
SysLogHandler:logging.handlers.SysLogHandler;日志输出到syslog
NTEventLogHandler:logging.handlers.NTEventLogHandler;远程输出日志到Windows NT/2000/XP的事件日志
MemoryHandler:logging.handlers.MemoryHandler;日志输出到内存中的指定buffer
HTTPHandler:logging.handlers.HTTPHandler;通过"GET"或者"POST"远程输出到HTTP服务器
其中我特别注意到的handler是SocketHandler, handler可以实现socket发送日志;
而联想到使用es来存取日志,本来是想用es api来接收日志(这样就可以使用HTTPHandler),但是http毕竟是上层协议,发送可能会慢,所以继续找到了logstash, 作为ELK的一员,我们可以先发送到logstash再转发到es,接下来就查看logstash是否有python开源包,接下来就发现了python-logstash。
安装并运行logstash
- 安装java
- 进入官网下载安装包
- 上传服务器并解压(这里使用的是6.3.2)
tar -zxvf logstash-6.3.2.tar.gz
- 编写logstash的配置文件
input {
udp {
port => 5959
codec => json
}
}
output {
elasticsearch{
hosts => ["192.168.1.17:9200", "192.168.1.18:9200", "192.168.1.19:9200"]
index => "crawler-%{+YYYY.MM.dd}"
}
stdout{
codec => rubydebug
}
}
- 运行logstash
logstash -f logstash.conf
- 截图
使用python-logstash
代码如下:
import logging
import logstash
import sys
host = 'localhost'
test_logger = logging.getLogger('python-logstash-logger')
test_logger.setLevel(logging.INFO)
test_logger.addHandler(logstash.LogstashHandler(host, 5959, version=1))
extra = {
'test_string': 'python version: ' + repr(sys.version_info),
'test_boolean': True,
'test_dict': {'a': 1, 'b': 'c'},
'test_float': 1.23,
'test_integer': 123,
'test_list': [1, 2, '3'],
}
test_logger.info('python-logstash: test extra fields', extra=extra)
运行后logstash显示:
查看es中是否接收到数据:
以上,我们基本的分布式日志已经发送测试成功;
深入研究logging的日志
第三方库中的日志格式并不满足我们的需求,所以我们可以对其进行修改(借鉴其源码):
源码:详细代码
class LogstashFormatterBase(logging.Formatter):
def __init__(self, message_type='Logstash', tags=None, fqdn=False):
pass
def get_extra_fields(self, record):
pass
def get_debug_fields(self, record):
pass
@classmethod
def format_source(cls, message_type, host, path):
pass
@classmethod
def format_timestamp(cls, time):
pass
@classmethod
def format_exception(cls, exc_info):
pass
@classmethod
def serialize(cls, message):
pass
这里我们发现其继承了Formatter;查看Logging官方文档可以得知,实现自定义format只要实现其format方法就可以了,第一次我直接继承了python-logstash的LogstashFormatterVersion1:
class LogstashFormat(LogstashFormatterVersion1):
def format(self, record):
message = {
'@timestamp': self.format_timestamp(record.created),
'@message': record.getMessage(),
'log_level': record.levelname,
'log_file': record.filename,
'line_no': record.lineno,
'host': self.host,
}
message.update(self.get_extra_fields(record))
return self.serialize(message)
@classmethod
def format_timestamp(cls, time):
current_time = datetime.datetime.fromtimestamp(time)
return ''.join([current_time.strftime("%Y-%m-%dT%H:%M:%S"),
".%03d"%(current_time.microsecond / 1000),
"Z"])
def init_logstash_logger(level, host, port):
logger = logging.getLogger('python-logstash-logger')
logger.setLevel(LEVELS.get(level))
logstash_format = LogstashFormat()
logstash_handler = LogstashHandler(host, port)
logstash_handler.setFormatter(logstash_format)
logger.addHandler(logstash_handler)
return logger
修改了其对时间的处理,以及format返回的处理,这里我们实现一个初始化方法进行测试;
执行:
结果:
以上我们就可以自定义,自己的代码发送到logstash上了。
es会对上传的日志进行日期建立索引
提供发送日志到kafka源码:
要求: python3.7
库: confluent-kafka-python
import datetime
import json
import logging
from logging import Formatter
from logging.handlers import QueueHandler
from confluent_kafka import Producer
produce = Producer({'bootstrap.servers': '192.168.11.5:9092'})
class KafkaQueueHandler(QueueHandler):
def emit(self, record):
┆ self.queue.poll(0)
┆ self.queue.produce('test', self.format(record), callback=self.delivery_report)
┆ self.queue.flush()
def delivery_report(self, err, msg):
┆ if err is not None:
┆ ┆ print('message:%s', err)
┆ else:
┆ ┆ print('message deliverd %s', msg.topic())
class QueueFormat(Formatter):
def __init__(self, topic):
self.topic = topic
┆ super(QueueFormat, self).__init__()
def get_extra_fields(self, record):
┆ skip_list = (
┆ ┆ 'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename',
┆ ┆ 'funcName', 'id', 'levelname', 'levelno', 'lineno', 'module',
┆ ┆ 'msecs', 'msecs', 'message', 'msg', 'name', 'pathname', 'process',
┆ ┆ 'processName', 'relativeCreated', 'thread', 'threadName', 'extra',
┆ ┆ 'auth_token', 'password'
┆ )
┆ easy_types = (str, bool, dict, float, int, list, type(None))
┆ fields = {}
┆ for key, value in record.__dict__.items():
┆ ┆ if key not in skip_list:
┆ ┆ ┆ if isinstance(value, easy_types):
┆ ┆ ┆ ┆ fields[key] = value
┆ ┆ ┆ else:
┆ ┆ ┆ ┆ fields[key] = repr(value)
┆ return fields
@classmethod
def format_timestamp(cls, time):
┆ current_time = datetime.datetime.fromtimestamp(time)
┆ return ''.join([current_time.strftime("%Y-%m-%dT%H:%M:%S"),
┆ ┆ ┆ ┆ ┆ ".%03d"%(current_time.microsecond / 1000),
┆ ┆ ┆ ┆ ┆ "Z"])
@classmethod
def format_exception(cls, exc_info):
┆ return ''.join(traceback.format_exception(*exc_info)) if exc_info else ''
@classmethod
def serialize(cls, message):
┆ return bytes(json.dumps(message), 'utf-8')
def format(self, record):
┆ message = {
┆ ┆ '@timestamp': self.format_timestamp(record.created),
┆ ┆ '@message': record.getMessage(),
┆ ┆ 'log_level': record.levelname,
┆ ┆ 'log_file': record.filename,
┆ ┆ 'line_no': record.lineno,
┆ ┆ 'topic': self.topic
┆ }
┆ message.update(self.get_extra_fields(record))
┆ return self.serialize(message)
logger = logging.getLogger('kafka')
kafka_handle = KafkaQueueHandler(produce)
log_format = QueueFormat('test')
kafka_handle.setFormatter(log_format)
logger.setLevel(logging.DEBUG)
logger.addHandler(kafka_handle)
logger.info('aaaa')
向kafka发送日志后,可以使用logstash的插件从kafka中读取日志,发送到es。