扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
本篇内容介绍了“怎么用Python实现强大的 logging 模块”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
目前成都创新互联已为千余家的企业提供了网站建设、域名、雅安服务器托管、成都网站托管、企业网站设计、云浮网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。
Python 的 logging 模块实现了灵活的日志系统。整个模块仅仅 3 个类,不到 5000 行代码的样子,学习它可以加深对程序日志的了解,本文分下面几个部分:
logging 简介
logging API 设计
记录器对象 Logger
日志记录对象 LogRecord
处理器对象 Hander
格式器对象 Formatter
滚动日志文件处理器
小结
小技巧
本次代码使用的是 python 3.8.5
的版本,官方中文文档 3.8.8
。参考链接中官方中文文档非常详细,建议先看一遍了解日志使用。
类 | 功能 |
---|---|
logging-module | logging的API |
Logger | 日志记录器对象类,可以创建一个对象用来记录日志 |
LogRecord | 日志记录对象,每条日志记录都封装成一个日志记录对象 |
Hander | 处理器对象,负责日志输出到流/文件的控制 |
Formatter | 格式器,负责日志记录的格式化 |
RotatingFileHandler | 按大小滚动的日志文件记录器 |
TimedRotatingFileHandler | 按时间滚动的日志文件处理器 |
我们主要研究日志如何输出到标准窗口这一主线;日志的配置,日志的线程安全及各种特别的Handler等支线可以先忽略。
先看看日志使用:
import logging logging.basicConfig(level=logging.INFO, format='%(levelname)-8s %(name)-10s %(asctime)s %(message)s') lang = {"name": "python", "age":20} logging.info('This is a info message %s', lang) logging.debug('This is a debug message') logging.warning('This is a warning message') logger = logging.getLogger(__name__) logger.warning('This is a warning')
输出内容如下:
INFO root 2021-03-04 00:03:53,473 This is a info message {'name': 'python', 'age': 20} WARNING root 2021-03-04 00:03:53,473 This is a warning message WARNING __main__ 2021-03-04 00:03:53,473 This is a warning
可以看到 logging 的使用非常方便,模块直接提供了一组API。
root = RootLogger(WARNING) # 默认提供的logger Logger.root = root Logger.manager = Manager(Logger.root) def debug(msg, *args, **kwargs): # info,warning等api类似 if len(root.handlers) == 0: basicConfig() # 默认配置 root.debug(msg, *args, **kwargs) def getLogger(name=None): if name: return Logger.manager.getLogger(name) # 创建特定的logger else: return root # 返回默认的logger
这种API的提供方式,我们在 requests 中也有看到。api中很重要的设置config的方式:
def basicConfig(**kwargs): ... if handlers is None: filename = kwargs.pop("filename", None) mode = kwargs.pop("filemode", 'a') if filename: h = FileHandler(filename, mode) else: stream = kwargs.pop("stream", None) h = StreamHandler(stream) # 默认的handler handlers = [h] dfs = kwargs.pop("datefmt", None) style = kwargs.pop("style", '%') fs = kwargs.pop("format", _STYLES[style][1]) fmt = Formatter(fs, dfs, style) # 生成formatter for h in handlers: if h.formatter is None: h.setFormatter(fmt) root.addHandler(h) # 设置root的handler level = kwargs.pop("level", None) if level is not None: root.setLevel(level) # 设置日志级别
可以看到,日志的配置主要包括下面几项:
level 日志级别
format 信息格式化模版
filename 输出到文件
datefmt %Y-%m-%d %H:%M:%S,uuu
时间的格式模版
style [ %
, {
,$] 格式样板
演示代码输出中,可以看到debug日志没有显示,是因为 debug < info
:
CRITICAL = 50 FATAL = CRITICAL ERROR = 40 WARNING = 30 WARN = WARNING INFO = 20 DEBUG = 10 NOTSET = 0
查看Logger之前,先看logger对象的管理类Manager
_loggerClass = Logger class Manager(object): def __init__(self, rootnode): self.root = rootnode self.disable = 0 self.loggerDict = {} # 所有日志记录对象的字典 ... def getLogger(self, name): rv = None if name in self.loggerDict: rv = self.loggerDict[name] # 获取已经创建过的同名logger ... else: rv = (self.loggerClass or _loggerClass)(name) # 创建新的logger rv.manager = self self.loggerDict[name] = rv ... return rv
日志过滤器
class Filterer(object): def __init__(self): self.filters = [] def addFilter(self, filter): self.filters.append(filter) def removeFilter(self, filter): self.filters.remove(filter) def filter(self, record): rv = True for f in self.filters: # 过滤日志 if hasattr(f, 'filter'): result = f.filter(record) else: result = f(record) # assume callable - will raise if not if not result: rv = False break return r
核心的 Logger
实际上只是一个控制中心:
class Logger(Filterer): # logger可以过滤日志 def __init__(self, name, level=NOTSET): Filterer.__init__(self) self.name = name self.level = _checkLevel(level) self.parent = None # 日志可以有层级 self.propagate = True self.handlers = [] # 可以输出到多个handler self.disabled = False # 可以关闭 self._cache = {} def debug(self, msg, *args, **kwargs): # 输出debug日志 if self.isEnabledFor(DEBUG): self._log(DEBUG, msg, args, **kwargs)
logger可以判断日志级别:
def isEnabledFor(self, level): if self.disabled: return False try: return self._cache[level] except KeyError: try: if self.manager.disable >= level: is_enabled = self._cache[level] = False else: is_enabled = self._cache[level] = ( level >= self.getEffectiveLevel() ) return is_enabled def getEffectiveLevel(self): logger = self while logger: if logger.level: return logger.level logger = logger.parent return NOTSET
日志输出:
def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False, stacklevel=1): ... fn, lno, func = "(unknown file)", 0, "(unknown function)" ... # 生成日志记录 record = self.makeRecord(self.name, level, fn, lno, msg, args, exc_info, func, extra, sinfo) # 使用handler处理日志 self.handle(record)
日志记录的生产,就是创建一个LogRecord对象:
_logRecordFactory = LogRecord def makeRecord(self, name, level, fn, lno, msg, args, exc_info, func=None, extra=None, sinfo=None): ... rv = _logRecordFactory(name, level, fn, lno, msg, args, exc_info, func, sinfo) ... return rv
使用logger对象的所有handler处理日志:
def handle(self, record): c = self found = 0 while c: for hdlr in c.handlers: # 使用所有的handler处理日志 found = found + 1 if record.levelno >= hdlr.level: hdlr.handle(record)
root-logger的handler是在config中配置的:
def basicConfig(**kwargs): ... root.addHandler(h) # 设置root的handler
日志记录对象非常简单:
class LogRecord(object): def __init__(self, name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None, **kwargs): ct = time.time() self.name = name # logger名称 self.msg = msg # 日志标识信息 ... self.args = args # 变量 self.levelname = getLevelName(level) ... def getMessage(self): msg = str(self.msg) if self.args: msg = msg % self.args # 格式化消息 return msg
顶级Handler定义了Handler的模版方法
class Handler(Filterer): # 处理器也可以过滤日志 def __init__(self, level=NOTSET): Filterer.__init__(self) self._name = None self.level = _checkLevel(level) # handler也有日志级别 self.formatter = None _addHandlerRef(self) self.createLock() def handle(self, record): # 处理日志 rv = self.filter(record) # 过滤日志 if rv: self.acquire() # 申请锁 try: self.emit(record) # 提交记录,由不同子类实现 finally: self.release() # 释放锁 return rv
默认的console流 StreamHandler
class StreamHandler(Handler): terminator = '\n' # 自动换行 def __init__(self, stream=None): Handler.__init__(self) if stream is None: stream = sys.stderr # 默认使用stderr输出 self.stream = stream def emit(self, record): try: msg = self.format(record) # 格式化日志记录 stream = self.stream stream.write(msg + self.terminator) # 写日志 self.flush() # 刷新写缓存 except Exception: ... def format(self, record): if self.formatter: fmt = self.formatter else: fmt = _defaultFormatter return fmt.format(record) # 使用格式化器格式化日志记录
为什么使用stderr,可以看下面的测试中的输出都是到console:
print("haha") print("fatal error", file=sys.stderr) sys.stderr.write("fatal error\n")
格式化器主要使用Formatter和Style实现
class Formatter(object): def __init__(self, fmt=None, datefmt=None, style='%', validate=True): self._style = _STYLES[style][0](fmt) self._fmt = self._style._fmt self.datefmt = datefmt def format(self, record): record.message = record.getMessage() s = self.formatMessage(record) return s def formatMessage(self, record): return self._style.format(record) # 格式化
Style类
class PercentStyle(object): default_format = '%(message)s' asctime_format = '%(asctime)s' asctime_search = '%(asctime)' validation_pattern = re.compile(r'%\(\w+\)[#0+ -]*(\*|\d+)?(\.(\*|\d+))?[diouxefgcrsa%]', re.I) def __init__(self, fmt): self._fmt = fmt or self.default_format def usesTime(self): return self._fmt.find(self.asctime_search) >= 0 def validate(self): """Validate the input format, ensure it matches the correct style""" if not self.validation_pattern.search(self._fmt): raise ValueError("Invalid format '%s' for '%s' style" % (self._fmt, self.default_format[0])) def _format(self, record): return self._fmt % record.__dict__ # 格式化日志记录对象 def format(self, record): try: return self._format(record) except KeyError as e: raise ValueError('Formatting field not found in record: %s' % e)
线上的日志持续输出到一个文件的话,会让文件巨大,即有加剧了丢失的风险,也难以处理。通常有按照大小滚动或者按照日期滚动的方法,这个功能非常重要。先看滚动日志处理器模版:
class BaseRotatingHandler(logging.FileHandler): def emit(self, record): try: if self.shouldRollover(record): # 判断是否需要滚动 self.doRollover() # 滚动日志 logging.FileHandler.emit(self, record) # 输出日志 except Exception: self.handleError(record) def rotate(self, source, dest): if not callable(self.rotator): if os.path.exists(source): os.rename(source, dest) # 重命名日志文件 else: self.rotator(source, dest)
按照文件大小滚动的处理器:
class RotatingFileHandler(BaseRotatingHandler): def __init__(self, filename, mode='a', maxBytes=0, backupCount=0, encoding=None, delay=False): if maxBytes > 0: mode = 'a' BaseRotatingHandler.__init__(self, filename, mode, encoding, delay) self.maxBytes = maxBytes # 单个文件大小上限 self.backupCount = backupCount # 日志备份数量 def doRollover(self): # 执行滚动 if self.stream: self.stream.close() # 关闭当前的流 self.stream = None if self.backupCount > 0: for i in range(self.backupCount - 1, 0, -1): sfn = self.rotation_filename("%s.%d" % (self.baseFilename, i)) dfn = self.rotation_filename("%s.%d" % (self.baseFilename, i + 1)) if os.path.exists(sfn): if os.path.exists(dfn): os.remove(dfn) os.rename(sfn, dfn) dfn = self.rotation_filename(self.baseFilename + ".1") if os.path.exists(dfn): os.remove(dfn) self.rotate(self.baseFilename, dfn) # 重命名文件 if not self.delay: self.stream = self._open() # 如果shouldRollover延迟,可以打开新的流 def shouldRollover(self, record): # 判断是否需要滚动 if self.stream is None: # 立即打开流 self.stream = self._open() if self.maxBytes > 0: msg = "%s\n" % self.format(record) self.stream.seek(0, 2) #due to non-posix-compliant Windows feature if self.stream.tell() + len(msg) >= self.maxBytes: # 判断大小 return 1 return 0
文件大小滚动就是在记录日志时候判断文档是否超过上限,超过则重命名旧日志,生成新日志。
按照日期滚动的处理器:
class TimedRotatingFileHandler(BaseRotatingHandler): def __init__(self, filename, when='h', interval=1, backupCount=0, encoding=None, delay=False, utc=False, atTime=None): BaseRotatingHandler.__init__(self, filename, 'a', encoding, delay) self.when = when.upper() self.backupCount = backupCount self.utc = utc self.atTime = atTime # 日期设置,支持多种方式 if self.when == 'S': self.interval = 1 # one second self.suffix = "%Y-%m-%d_%H-%M-%S" self.extMatch = r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}(\.\w+)?$" ... self.extMatch = re.compile(self.extMatch, re.ASCII) self.interval = self.interval * interval # multiply by units requested filename = self.baseFilename if os.path.exists(filename): t = os.stat(filename)[ST_MTIME] # 最后修改时间 else: t = int(time.time()) self.rolloverAt = self.computeRollover(t) # 提前计算终止时间 def computeRollover(self, currentTime): # 判断的方法还是很长很复杂的,先pass def shouldRollover(self, record): t = int(time.time()) if t >= self.rolloverAt: # 判断是否到期 return 1 return 0 def doRollover(self): ... dfn = self.rotation_filename(self.baseFilename + "." + time.strftime(self.suffix, timeTuple)) # 滚动日志文件 if os.path.exists(dfn): os.remove(dfn) self.rotate(self.baseFilename, dfn) if self.backupCount > 0: for s in self.getFilesToDelete(): os.remove(s) ... # 计算下一个时间点 newRolloverAt = self.computeRollover(currentTime) ... self.rolloverAt = newRolloverAt
日期滚动就是计算最后时间点,超过时间点则重新生成新的日志文件。
logging的处理逻辑大概是这样的:
创建Logger对象,提供API,用来接收应用程序日志
Logger对象包括多个Handler
每个Handler有一个Formatter对象
每条日志都会生成一个LogRecord对象
使用不同的Handler对象将LogRecored对象提交到不同的流
每个日志对象通过Formatter格式化输出
可以使用按日期/文件大小的方式进行日志文件的滚动记录
覆盖对象的 __reduce__
方法,让对象支持 reduce 函数:
class RootLogger(Logger): def __init__(self, level): Logger.__init__(self, "root", level) def __reduce__(self): return getLogger, ()
线程锁的创建和释放:
_lock = threading.RLock() def _acquireLock(): if _lock: _lock.acquire() def _releaseLock(): if _lock: _lock.release()
线程锁的使用:
def addHandler(self, hdlr): _acquireLock() try: self.handlers.append(hdlr) finally: _releaseLock() def removeHandler(self, hdlr): _acquireLock() try: self.handlers.remove(hdlr) finally: _releaseLock()
Logging in Python https://realpython.com/python-logging/
日志操作手册 https://docs.python.org/zh-cn/3.8/howto/logging-cookbook.html#cookbook-rotator-namer
Python 的日志记录工具 https://docs.python.org/zh-cn/3.8/library/logging.html
“怎么用Python实现强大的 logging 模块”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流