用过 loguru 的朋友想必都觉得它很顺手。这里我们简单实现一个日志拦截(过滤)器:它可以监控指定级别及以上的日志,并把这些日志交给你自定义的回调去处理,同步回调和异步回调都适用。
话不多说,直接贴代码;感兴趣的读者可以在这个例子的基础上自行改造,希望对各位有所帮助:
# -*- coding: utf-8 -*-
import sys
import queue
import asyncio
import loguru
from threading import Thread
from loguru._handler import Message
from loguru._logger import Level, Core
class InterceptSink:
    """Loguru sink that batches records at/above a level and hands them to a callback.

    ``write`` filters incoming messages by level and puts them on an internal
    queue; a daemon worker thread collects queued records into batches and
    calls ``callback(batch)`` for each batch.  ``callback`` may be a plain
    function or an ``async def`` coroutine function.
    """

    # Seconds the worker waits for the next record before closing a batch.
    _poll_timeout = 1
    # Level names accepted when ``level`` is given as a string.
    _level_names = ("DEBUG", "INFO", "WARNING", "ERROR", "TRACE", "SUCCESS", "CRITICAL")

    def __init__(self,
                 callback,
                 level,
                 format=True,
                 max_submit_num=50,
                 min_submit_num=1,
                 buffer_size=0,
                 buffer_block=False,
                 **kwargs
                 ):
        """
        callback: called with a list of intercepted records; a coroutine
            function is executed with ``asyncio.run`` in the worker thread
        level: minimum level to intercept, either a ``Level`` instance or a
            level name (case-insensitive)
        format: True to render every record as text, like this:
            `
            2021-03-15T22:25:34.568938+0800
             elapsed: 0:00:00.009447
             exception: None
             extra: {}
             file: sink.py
             function: <module>
             level: ERROR
             line: 116
             message: 这是条错误日志
             module: sink
             name: __main__
             process: 88795
             thread: 4491730240
             time: 2021-03-15T22:25:34.568938+0800
            `
            False to pass the message object through unchanged
        max_submit_num: maximum number of records per callback invocation
        min_submit_num: minimum number of records per callback invocation;
            smaller batches are held back until enough records arrive, the
            batch is full, or the sink is stopped
        buffer_size: capacity of the internal queue (``queue.Queue`` treats
            a value <= 0 as unbounded)
        buffer_block: what ``write`` does when the queue is full: True waits
            for free space, False drops the record and reports on stderr
        kwargs: accepted for call compatibility and ignored

        Raises TypeError when ``level`` is neither a ``Level`` nor a string,
        and ValueError when the level name is unknown.
        """
        self._max_submit_num = max_submit_num
        self._min_submit_num = min_submit_num
        self._format = format
        self._callback = callback
        self._buffer = queue.Queue(buffer_size)
        self._buffer_size = buffer_size
        self._buffer_block = buffer_block
        if isinstance(level, Level):
            self._level = level
        else:
            # Explicit raises instead of ``assert`` so validation survives ``python -O``.
            if not isinstance(level, str):
                raise TypeError(f'level must be a Level or a level name, got {type(level).__name__}')
            level_name = level.upper()
            if level_name not in self._level_names:
                raise ValueError(f'unknown level {level!r}, expected one of {self._level_names}')
            self._level = Core().levels[level_name]
        # The flag must be set before the worker starts, because the worker loop reads it.
        self._running = True
        self._worker = Thread(target=self._queued_executer, daemon=True)
        self._worker.start()

    def stop(self):
        """Ask the worker to flush what is left and wait up to 10 seconds for it."""
        self._running = False
        self._worker.join(10)

    def write(self, message: "Message"):
        """Queue ``message`` for the callback if its level reaches the threshold.

        Called for every logged message; ``message.record`` is the mapping
        describing the log record.
        """
        entry = message.record
        if entry['level'].no < self._level.no:
            return
        if self._format:
            # First line is the record time, then one " key: value" line per field.
            lines = [f'{entry["time"]}']
            lines.extend(f' {key}: {val}' for key, val in entry.items())
            payload = '\n'.join(lines)
        else:
            payload = message
        try:
            # Honour buffer_block: a non-blocking put raises queue.Full on overflow.
            self._buffer.put(payload, block=self._buffer_block)
        except queue.Full:
            sys.stderr.write(
                f'{self.__class__.__name__} Log queued writer overflow: {self._buffer_size}\n'
            )

    def _submit(self, batch):
        """Invoke the callback with one batch, running coroutine functions to completion."""
        try:
            if asyncio.iscoroutinefunction(self._callback):
                asyncio.run(self._callback(batch))
            else:
                self._callback(batch)
        except Exception as exc:
            # A failing callback must not kill the worker, or interception stops silently.
            sys.stderr.write(f'{self.__class__.__name__} callback failed: {exc!r}\n')

    def _queued_executer(self):
        """Worker loop: collect queued records into batches and submit them."""
        # Guard against a non-positive limit so a batch always holds at least one record.
        limit = max(1, self._max_submit_num)
        pending = []
        while self._running:
            while len(pending) < limit:
                try:
                    pending.append(self._buffer.get(block=True, timeout=self._poll_timeout))
                except queue.Empty:
                    break
            if not pending:
                continue
            # An undersized batch is kept for the next round instead of being discarded.
            if len(pending) >= self._min_submit_num or len(pending) >= limit or not self._running:
                self._submit(pending)
                pending = []
        # Stop requested: deliver held-back and still-queued records without waiting.
        drained = False
        while not drained:
            while len(pending) < limit:
                try:
                    pending.append(self._buffer.get_nowait())
                except queue.Empty:
                    drained = True
                    break
            if pending:
                self._submit(pending)
                pending = []
if __name__ == '__main__':
    # Demo: register one sink with an async callback and one with a sync callback,
    # log a single error, then stop both sinks so their batches are delivered.

    async def async_handle(messages):
        """Print every intercepted record from a coroutine callback."""
        for msg in messages:
            print('这是异步打印的\n'+ msg)

    def sync_handle(messages):
        """Print every intercepted record from a plain callback."""
        for msg in messages:
            print('这是同步打印的\n'+ msg)

    sinks = [
        InterceptSink(async_handle, level='ERROR'),
        InterceptSink(sync_handle, level='ERROR'),
    ]
    for sink in sinks:
        loguru.logger.add(sink)
    try:
        loguru.logger.error('这是条错误日志')
    finally:
        # The workers are daemon threads: without an explicit stop() the
        # interpreter may exit before either callback has been invoked.
        for sink in sinks:
            sink.stop()