python / 3.7.2rc1 / all / howto-logging-cookbook.html

Logging Cookbook

  • Author

    • Vinay Sajip <vinay_sajip at red-dove dot com>

该页面包含许多与日志记录有关的食谱,这些食谱在过去被发现有用。

使用登录多个模块

多次调用logging.getLogger('someLogger')会返回对同一 Logger 对象的引用。只要在同一 Python 解释器进程中,不仅在同一模块内,而且在各个模块之间都是如此。对于相同对象的引用是正确的。此外,应用程序代码可以在一个模块中定义和配置父 Logger,并在单独的模块中创建(但不配置)子 Logger,并且所有对子 Logger 的调用都将传递给父 Logger。这是一个主要模块:

import logging
import auxiliary_module

# create logger with 'spam_application'
logger = logging.getLogger('spam_application')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)

logger.info('creating an instance of auxiliary_module.Auxiliary')
a = auxiliary_module.Auxiliary()
logger.info('created an instance of auxiliary_module.Auxiliary')
logger.info('calling auxiliary_module.Auxiliary.do_something')
a.do_something()
logger.info('finished auxiliary_module.Auxiliary.do_something')
logger.info('calling auxiliary_module.some_function()')
auxiliary_module.some_function()
logger.info('done with auxiliary_module.some_function()')

这是辅助模块:

import logging

# create logger
module_logger = logging.getLogger('spam_application.auxiliary')

class Auxiliary:
    def __init__(self):
        self.logger = logging.getLogger('spam_application.auxiliary.Auxiliary')
        self.logger.info('creating an instance of Auxiliary')

    def do_something(self):
        self.logger.info('doing something')
        a = 1 + 1
        self.logger.info('done doing something')

def some_function():
    module_logger.info('received a call to "some_function"')

输出如下所示:

2005-03-23 23:47:11,663 - spam_application - INFO -
   creating an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,665 - spam_application.auxiliary.Auxiliary - INFO -
   creating an instance of Auxiliary
2005-03-23 23:47:11,665 - spam_application - INFO -
   created an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,668 - spam_application - INFO -
   calling auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,668 - spam_application.auxiliary.Auxiliary - INFO -
   doing something
2005-03-23 23:47:11,669 - spam_application.auxiliary.Auxiliary - INFO -
   done doing something
2005-03-23 23:47:11,670 - spam_application - INFO -
   finished auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,671 - spam_application - INFO -
   calling auxiliary_module.some_function()
2005-03-23 23:47:11,672 - spam_application.auxiliary - INFO -
   received a call to 'some_function'
2005-03-23 23:47:11,673 - spam_application - INFO -
   done with auxiliary_module.some_function()

从多个线程记录

从多个线程进行日志记录不需要任何特殊的工作。以下示例显示了从主(初始)线程和另一个线程进行的日志记录:

import logging
import threading
import time

def worker(arg):
    while not arg['stop']:
        logging.debug('Hi from myfunc')
        time.sleep(0.5)

def main():
    logging.basicConfig(level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
    info = {'stop': False}
    thread = threading.Thread(target=worker, args=(info,))
    thread.start()
    while True:
        try:
            logging.debug('Hello from main')
            time.sleep(0.75)
        except KeyboardInterrupt:
            info['stop'] = True
            break
    thread.join()

if __name__ == '__main__':
    main()

运行时,脚本应打印如下内容:

0 Thread-1 Hi from myfunc
   3 MainThread Hello from main
 505 Thread-1 Hi from myfunc
 755 MainThread Hello from main
1007 Thread-1 Hi from myfunc
1507 MainThread Hello from main
1508 Thread-1 Hi from myfunc
2010 Thread-1 Hi from myfunc
2258 MainThread Hello from main
2512 Thread-1 Hi from myfunc
3009 MainThread Hello from main
3013 Thread-1 Hi from myfunc
3515 Thread-1 Hi from myfunc
3761 MainThread Hello from main
4017 Thread-1 Hi from myfunc
4513 MainThread Hello from main
4518 Thread-1 Hi from myfunc

这显示了日志输出如预期的那样散布。当然,这种方法适用于比此处显示的线程更多的线程。

多个处理程序和格式化程序

Logger 是普通的 Python 对象。 addHandler()方法没有您可以添加的处理程序数量的最小或最大配额。有时,将应用程序将所有严重性的所有消息记录到文本文件,同时将错误或更高级别的消息记录到控制台将是有益的。要进行设置,只需配置适当的处理程序即可。应用程序代码中的日志记录调用将保持不变。这是对先前基于模块的简单配置示例的略微修改:

import logging

logger = logging.getLogger('simple_example')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
fh.setFormatter(formatter)
# add the handlers to logger
logger.addHandler(ch)
logger.addHandler(fh)

# 'application' code
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')

请注意,“应用程序”代码并不关心多个处理程序。所做的更改只是添加和配置了名为* fh *的新处理程序。

使用较高或较低严重性过滤器创建新的处理程序的能力在编写和测试应用程序时非常有用。而不是使用许多print语句进行调试,而使用logger.debug:与 print 语句不同,在稍后您将其删除或 Comments 掉之后,logger.debug 语句可以在源代码中保持不变并保持休眠状态,直到再次需要它们为止。那时,唯一需要进行的更改是修改 Logger 和/或处理程序的严重性级别以进行调试。

登录到多个目的地

假设您要使用不同的消息格式和在不同的情况下登录控制台和文件。假设您要记录 DEBUG 或更高级别的消息到文件,而 INFO 或更高级别的消息记录到控制台。我们还假设该文件应包含时间戳,但控制台消息不应包含时间戳。这是实现此目的的方法:

import logging

# set up logging to file - see previous section for more details
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
                    datefmt='%m-%d %H:%M',
                    filename='/temp/myapp.log',
                    filemode='w')
# define a Handler which writes INFO messages or higher to the sys.stderr
console = logging.StreamHandler()
console.setLevel(logging.INFO)
# set a format which is simpler for console use
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
# tell the handler to use this format
console.setFormatter(formatter)
# add the handler to the root logger
logging.getLogger('').addHandler(console)

# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')

# Now, define a couple of other loggers which might represent areas in your
# application:

logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')

logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')

运行此命令时,在控制台上,您将看到

root        : INFO     Jackdaws love my big sphinx of quartz.
myapp.area1 : INFO     How quickly daft jumping zebras vex.
myapp.area2 : WARNING  Jail zesty vixen who grabbed pay from quack.
myapp.area2 : ERROR    The five boxing wizards jump quickly.

在文件中,您会看到类似

10-22 22:19 root         INFO     Jackdaws love my big sphinx of quartz.
10-22 22:19 myapp.area1  DEBUG    Quick zephyrs blow, vexing daft Jim.
10-22 22:19 myapp.area1  INFO     How quickly daft jumping zebras vex.
10-22 22:19 myapp.area2  WARNING  Jail zesty vixen who grabbed pay from quack.
10-22 22:19 myapp.area2  ERROR    The five boxing wizards jump quickly.

如您所见,DEBUG 消息仅显示在文件中。其他消息发送到两个目的地。

本示例使用控制台和文件处理程序,但是您可以使用任意数量和所选处理程序的组合。

配置服务器示例

这是使用日志记录配置服务器的模块示例:

import logging
import logging.config
import time
import os

# read initial config file
logging.config.fileConfig('logging.conf')

# create and start listener on port 9999
t = logging.config.listen(9999)
t.start()

logger = logging.getLogger('simpleExample')

try:
    # loop through logging calls to see the difference
    # new configurations make, until Ctrl+C is pressed
    while True:
        logger.debug('debug message')
        logger.info('info message')
        logger.warning('warn message')
        logger.error('error message')
        logger.critical('critical message')
        time.sleep(5)
except KeyboardInterrupt:
    # cleanup
    logging.config.stopListening()
    t.join()

这是一个脚本,该脚本采用文件名并将该文件发送到服务器,并以新的日志记录配置正确地以二进制编码的长度开头:

#!/usr/bin/env python
import socket, sys, struct

with open(sys.argv[1], 'rb') as f:
    data_to_send = f.read()

HOST = 'localhost'
PORT = 9999
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print('connecting...')
s.connect((HOST, PORT))
print('sending config...')
s.send(struct.pack('>L', len(data_to_send)))
s.send(data_to_send)
s.close()
print('complete')

处理阻止的处理程序

有时,您必须让日志记录处理程序来执行其工作,而又不会阻塞正在从中进行日志记录的线程。这在 Web 应用程序中很常见,尽管当然在其他情况下也会发生。

表现迟钝的常见罪魁祸首是SMTPHandler:由于开发人员无法控制的多种原因(例如,性能不佳的邮件或网络基础结构),发送电子邮件可能会花费很长时间。但是几乎所有基于网络的处理程序都可以阻止:即使SocketHandler操作也可能在后台进行 DNS 查询,这太慢了(该查询可以深入到套接字库代码的深处,Python 层以下以及控件之外)。

一种解决方案是使用两部分方法。对于第一部分,仅将QueueHandler附加到从性能关键线程访问的那些 Logger 上。他们只是简单地写入队列,队列的大小可以调整为足够大的容量,也可以初始化而没有上限。尽管可能需要捕获queue.Full异常作为代码中的预防措施,但是通常会很快接受对队列的写操作。如果您是在代码中包含性能关键线程的库开发人员,请确保对此进行记录(以及仅将QueueHandlers附加到 Logger 的建议),以使其他使用您代码的开发人员受益。

解决方案的第二部分是QueueListener,它已被设计为QueueHandler的对应物。 QueueListener非常简单:它传递了一个队列和一些处理程序,并启动了一个内部线程,该线程侦听其队列中从QueueHandlers(或LogRecords的任何其他来源)发送的 LogRecords。 LogRecords从队列中删除,并传递给处理程序进行处理。

拥有单独的QueueListener类的优点是您可以使用同一实例为多个QueueHandlers服务。比起拥有现有处理程序类的线程版本,这将更加节省资源,后者将在每个处理程序中消耗一个线程,而没有任何特别的好处。

下面是使用这两个类的示例(Ellipsis了导入):

que = queue.Queue(-1)  # no limit on size
queue_handler = QueueHandler(que)
handler = logging.StreamHandler()
listener = QueueListener(que, handler)
root = logging.getLogger()
root.addHandler(queue_handler)
formatter = logging.Formatter('%(threadName)s: %(message)s')
handler.setFormatter(formatter)
listener.start()
# The log output will display the thread which generated
# the event (the main thread) rather than the internal
# thread which monitors the internal queue. This is what
# you want to happen.
root.warning('Look out!')
listener.stop()

运行时将产生:

MainThread: Look out!

在版本 3.5 中进行了更改:在 Python 3.5 之前,QueueListener总是将从队列中接收到的所有消息传递给初始化它的每个处理程序。 (这是因为假定级别过滤都是在填充队列的另一侧完成的.)从 3.5 开始,可以pass将关键字参数respect_handler_level=True传递给侦听器的构造函数来更改此行为。完成此操作后,侦听器会将每个消息的级别与处理程序的级别进行比较,并且仅在适当的情况下才将消息传递给处理程序。

pass网络发送和接收日志记录事件

假设您要pass网络发送日志记录事件,并在接收端进行处理。一种简单的方法是在发送端将SocketHandler实例附加到根 Logger:

import logging, logging.handlers

rootLogger = logging.getLogger('')
rootLogger.setLevel(logging.DEBUG)
socketHandler = logging.handlers.SocketHandler('localhost',
                    logging.handlers.DEFAULT_TCP_LOGGING_PORT)
# don't bother with a formatter, since a socket handler sends the event as
# an unformatted pickle
rootLogger.addHandler(socketHandler)

# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')

# Now, define a couple of other loggers which might represent areas in your
# application:

logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')

logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')

在接收端,您可以使用socketserver模块设置接收器。这是一个基本的工作示例:

import pickle
import logging
import logging.handlers
import socketserver
import struct

class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    """Handler for a streaming logging request.

    This basically logs the record using whatever logging policy is
    configured locally.
    """

    def handle(self):
        """
        Handle multiple requests - each expected to be a 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally.
        """
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack('>L', chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + self.connection.recv(slen - len(chunk))
            obj = self.unPickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handleLogRecord(record)

    def unPickle(self, data):
        return pickle.loads(data)

    def handleLogRecord(self, record):
        # if a name is specified, we use the named logger rather than the one
        # implied by the record.
        if self.server.logname is not None:
            name = self.server.logname
        else:
            name = record.name
        logger = logging.getLogger(name)
        # N.B. EVERY record gets logged. This is because Logger.handle
        # is normally called AFTER logger-level filtering. If you want
        # to do filtering, do it at the client end to save wasting
        # cycles and network bandwidth!
        logger.handle(record)

class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
    """
    Simple TCP socket-based logging receiver suitable for testing.
    """

    allow_reuse_address = True

    def __init__(self, host='localhost',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
        self.abort = 0
        self.timeout = 1
        self.logname = None

    def serve_until_stopped(self):
        import select
        abort = 0
        while not abort:
            rd, wr, ex = select.select([self.socket.fileno()],
                                       [], [],
                                       self.timeout)
            if rd:
                self.handle_request()
            abort = self.abort

def main():
    logging.basicConfig(
        format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
    tcpserver = LogRecordSocketReceiver()
    print('About to start TCP server...')
    tcpserver.serve_until_stopped()

if __name__ == '__main__':
    main()

首先运行服务器,然后运行 Client 端。在 Client 端,控制台上未打印任何内容。在服务器端,您应该看到类似以下内容:

About to start TCP server...
   59 root            INFO     Jackdaws love my big sphinx of quartz.
   59 myapp.area1     DEBUG    Quick zephyrs blow, vexing daft Jim.
   69 myapp.area1     INFO     How quickly daft jumping zebras vex.
   69 myapp.area2     WARNING  Jail zesty vixen who grabbed pay from quack.
   69 myapp.area2     ERROR    The five boxing wizards jump quickly.

请注意,在某些情况下,pickle 存在一些安全问题。如果这些影响您,您可以使用替代序列化方案,方法是重写makePickle()方法并在那里实现替代方案,并改编上述脚本以使用替代序列化方案。

将上下文信息添加到您的日志记录输出中

有时您希望日志输出除了传递给日志调用的参数外,还包含上下文信息。例如,在联网应用中,可能希望在日志中记录特定于 Client 端的信息(例如,远程 Client 端的用户名或 IP 地址)。尽管可以使用* extra *参数来实现此目的,但是以这种方式传递信息并不总是很方便。尽管可能很想在每个连接的基础上创建Logger个实例,但这不是一个好主意,因为这些实例不会被垃圾回收。尽管实际上这不是问题,但当Logger实例的数量取决于您要在记录应用程序时使用的粒度级别时,如果Logger实例的数量实际上变得无界,则可能很难 Management。

使用 LoggerAdapters 传递上下文信息

传递上下文信息与日志事件信息一起输出的一种简单方法是使用LoggerAdapter类。此类设计为看起来像Logger,因此您可以调用debug()info()warning()error()exception()critical()log()。这些方法与Logger中的方法具有相同的签名,因此您可以互换使用两种类型的实例。

创建LoggerAdapter的实例时,您将为其传递Logger实例和一个包含上下文信息的类似 dict 的对象。当您在LoggerAdapter的实例上调用其中一个日志记录方法时,它会将调用委派给传递给其构造函数的Logger的基础实例,并安排在委派的调用中传递上下文信息。这是LoggerAdapter的代码段:

def debug(self, msg, /, *args, **kwargs):
    """
    Delegate a debug call to the underlying logger, after adding
    contextual information from this adapter instance.
    """
    msg, kwargs = self.process(msg, kwargs)
    self.logger.debug(msg, *args, **kwargs)

LoggerAdapterprocess()方法是将上下文信息添加到日志输出的位置。它传递了日志记录调用的消息和关键字参数,并将它们的(可能)修改后的版本传递回用于基础 Logger 的调用中。该方法的默认实现不考虑消息,而是在关键字参数中插入“额外”键,其值是传递给构造函数的类似 dict 的对象。当然,如果您在对适配器的调用中传递了“ extra”关键字参数,则它将被静默覆盖。

使用“额外”的优点是,将类似 dict 的对象中的值合并到LogRecord实例的__dict_中,从而使您可以将自定义字符串与Formatter实例一起使用,这些字符串了解类似 dict 的对象的键。如果您需要其他方法,例如如果要在消息字符串前添加上下文信息或将上下文信息附加到消息字符串,则只需要子类LoggerAdapter并重写process()即可完成所需的操作。这是一个简单的示例:

class CustomAdapter(logging.LoggerAdapter):
    """
    This example adapter expects the passed in dict-like object to have a
    'connid' key, whose value in brackets is prepended to the log message.
    """
    def process(self, msg, kwargs):
        return '[%s] %s' % (self.extra['connid'], msg), kwargs

您可以这样使用:

logger = logging.getLogger(__name__)
adapter = CustomAdapter(logger, {'connid': some_conn_id})

然后,您登录到适配器的所有事件的值都将在日志消息之前加上some_conn_id的值。

使用除字典以外的对象传递上下文信息

您不需要将实际的 dict 传递给LoggerAdapter -您可以传递实现__getitem____iter__的类的实例,这样它看起来像是记录日志的 dict。如果要动态生成值(而 dict 中的值将是恒定的),这将很有用。

使用过滤器传递上下文信息

您还可以使用用户定义的Filter将上下文信息添加到日志输出中。允许Filter个实例修改传递给它们的LogRecords,包括添加其他属性,然后可以使用适当的格式字符串或需要的自定义Formatter输出这些属性。

例如,在 Web 应用程序中,可以将正在处理的请求(或至少是其中有趣的部分)存储在 threadlocal(threading.local)变量中,然后从Filter访问以添加例如来自请求的信息- ,即LogRecord的远程 IP 地址和远程用户的用户名-,使用属性名'ip'和'user',如上面的LoggerAdapter示例所示。在那种情况下,可以使用相同的格式字符串来获得与上面所示类似的输出。这是一个示例脚本:

import logging
from random import choice

class ContextFilter(logging.Filter):
    """
    This is a filter which injects contextual information into the log.

    Rather than use actual contextual information, we just use random
    data in this demo.
    """

    USERS = ['jim', 'fred', 'sheila']
    IPS = ['123.231.231.123', '127.0.0.1', '192.168.0.1']

    def filter(self, record):

        record.ip = choice(ContextFilter.IPS)
        record.user = choice(ContextFilter.USERS)
        return True

if __name__ == '__main__':
    levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL)
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)-15s %(name)-5s %(levelname)-8s IP: %(ip)-15s User: %(user)-8s %(message)s')
    a1 = logging.getLogger('a.b.c')
    a2 = logging.getLogger('d.e.f')

    f = ContextFilter()
    a1.addFilter(f)
    a2.addFilter(f)
    a1.debug('A debug message')
    a1.info('An info message with %s', 'some parameters')
    for x in range(10):
        lvl = choice(levels)
        lvlname = logging.getLevelName(lvl)
        a2.log(lvl, 'A message at %s level with %d %s', lvlname, 2, 'parameters')

运行时会产生以下内容:

2010-09-06 22:38:15,292 a.b.c DEBUG    IP: 123.231.231.123 User: fred     A debug message
2010-09-06 22:38:15,300 a.b.c INFO     IP: 192.168.0.1     User: sheila   An info message with some parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1       User: sheila   A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR    IP: 127.0.0.1       User: jim      A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG    IP: 127.0.0.1       User: sheila   A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR    IP: 123.231.231.123 User: fred     A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 192.168.0.1     User: jim      A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1       User: sheila   A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG    IP: 192.168.0.1     User: jim      A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f ERROR    IP: 127.0.0.1       User: sheila   A message at ERROR level with 2 parameters
2010-09-06 22:38:15,301 d.e.f DEBUG    IP: 123.231.231.123 User: fred     A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f INFO     IP: 123.231.231.123 User: fred     A message at INFO level with 2 parameters

从多个进程登录到单个文件

尽管日志记录是线程安全的,并且支持在单个进程中从多个线程登录到单个文件,但是支持从多个进程*登录到单个文件,因为没有序列化访问的标准方法跨 Python 中的多个进程复制到单个文件。如果需要从多个进程登录到单个文件,执行此操作的一种方法是使所有进程都登录到SocketHandler,并有一个单独的进程,该进程实现一个套接字服务器,该服务器从套接字读取并记录到文件。 (如果愿意,可以在现有进程之一中指定一个线程来执行此Function.)This section详细记录了此方法,并包括一个有效的套接字接收器,可以用作您适应自己的起点应用程序。

您还可以编写自己的处理程序,该处理程序使用multiprocessing模块中的Lock类来序列化对进程的文件访问。现有的FileHandler和子类目前不使用multiprocessing,尽管将来可能会使用。请注意,目前multiprocessing模块并非在所有平台上都提供工作锁定Function(请参见https://bugs.python.org/issue3770)。

或者,您可以使用QueueQueueHandler将所有日志记录事件发送到多进程应用程序中的某个进程。以下示例脚本演示了如何执行此操作;在示例中,一个单独的侦听器进程侦听其他进程发送的事件,并根据其自己的日志记录配置记录这些事件。尽管该示例仅演示了一种实现方式(例如,您可能要使用侦听器线程而不是单独的侦听器进程–实现将是类似的),但它确实为侦听器和其他进程提供了完全不同的日志记录配置在您的应用程序中,并且可以用作满足您自己特定要求的代码的基础:

# You'll need these imports in your own code
import logging
import logging.handlers
import multiprocessing

# Next two import lines for this demo only
from random import choice, random
import time

#
# Because you'll want to define the logging configurations for listener and workers, the
# listener and worker process functions take a configurer parameter which is a callable
# for configuring logging for that process. These functions are also passed the queue,
# which they use for communication.
#
# In practice, you can configure the listener however you want, but note that in this
# simple example, the listener does not apply level or filter logic to received records.
# In practice, you would probably want to do this logic in the worker processes, to avoid
# sending events which would be filtered out between processes.
#
# The size of the rotated files is made small so you can see the results easily.
def listener_configurer():
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

# This is the listener process top-level loop: wait for logging events
# (LogRecords)on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
    configurer()
    while True:
        try:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)

# Arrays used for random selections in this demo

LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
          logging.ERROR, logging.CRITICAL]

LOGGERS = ['a.b.c', 'd.e.f']

MESSAGES = [
    'Random message #1',
    'Random message #2',
    'Random message #3',
]

# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)

# This is the worker process top-level loop, which just logs ten events with
# random intervening delays before terminating.
# The print messages are just so you know it's doing something!
def worker_process(queue, configurer):
    configurer(queue)
    name = multiprocessing.current_process().name
    print('Worker started: %s' % name)
    for i in range(10):
        time.sleep(random())
        logger = logging.getLogger(choice(LOGGERS))
        level = choice(LEVELS)
        message = choice(MESSAGES)
        logger.log(level, message)
    print('Worker finished: %s' % name)

# Here's where the demo gets orchestrated. Create the queue, create and start
# the listener, create ten workers and start them, wait for them to finish,
# then send a None to the queue to tell the listener to finish.
def main():
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    workers = []
    for i in range(10):
        worker = multiprocessing.Process(target=worker_process,
                                         args=(queue, worker_configurer))
        workers.append(worker)
        worker.start()
    for w in workers:
        w.join()
    queue.put_nowait(None)
    listener.join()

if __name__ == '__main__':
    main()

上述脚本的一个变体将日志记录保留在主进程中的单独线程中:

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time

def logger_thread(q):
    while True:
        record = q.get()
        if record is None:
            break
        logger = logging.getLogger(record.name)
        logger.handle(record)

def worker_process(q):
    qh = logging.handlers.QueueHandler(q)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(qh)
    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
              logging.CRITICAL]
    loggers = ['foo', 'foo.bar', 'foo.bar.baz',
               'spam', 'spam.ham', 'spam.ham.eggs']
    for i in range(100):
        lvl = random.choice(levels)
        logger = logging.getLogger(random.choice(loggers))
        logger.log(lvl, 'Message no. %d', i)

if __name__ == '__main__':
    q = Queue()
    d = {
        'version': 1,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'INFO',
            },
            'file': {
                'class': 'logging.FileHandler',
                'filename': 'mplog.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'foofile': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-foo.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'errors': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-errors.log',
                'mode': 'w',
                'level': 'ERROR',
                'formatter': 'detailed',
            },
        },
        'loggers': {
            'foo': {
                'handlers': ['foofile']
            }
        },
        'root': {
            'level': 'DEBUG',
            'handlers': ['console', 'file', 'errors']
        },
    }
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
        workers.append(wp)
        wp.start()
    logging.config.dictConfig(d)
    lp = threading.Thread(target=logger_thread, args=(q,))
    lp.start()
    # At this point, the main process could do some useful work of its own
    # Once it's done that, it can wait for the workers to terminate...
    for wp in workers:
        wp.join()
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()

此变体说明了如何为特定的 Logger 应用配置-例如fooLogger 具有一个特殊的处理程序,该处理程序将foo子系统中的所有事件存储在文件mplog-foo.log中。日志记录机制将在主进程中使用它(即使在工作进程中生成了日志记录事件)也可以将消息定向到适当的目的地。

Using concurrent.futures.ProcessPoolExecutor

如果要使用concurrent.futures.ProcessPoolExecutor启动工作进程,则需要稍微不同地创建队列。代替

queue = multiprocessing.Queue(-1)

你应该使用

queue = multiprocessing.Manager().Queue(-1)  # also works with the examples above

然后您可以从中替换工作程序创建:

workers = []
for i in range(10):
    worker = multiprocessing.Process(target=worker_process,
                                     args=(queue, worker_configurer))
    workers.append(worker)
    worker.start()
for w in workers:
    w.join()

为此(记住首先导入concurrent.futures):

with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    for i in range(10):
        executor.submit(worker_process, queue, worker_configurer)

使用文件旋转

有时您想让日志文件增长到一定大小,然后打开一个新文件并记录到该文件。您可能需要保留一定数量的这些文件,并且在创建了那么多文件之后,旋转文件,以使文件数和文件大小都保持有界。对于此使用模式,日志记录包提供了RotatingFileHandler

import glob
import logging
import logging.handlers

LOG_FILENAME = 'logging_rotatingfile_example.out'

# Set up a specific logger with our desired output level
my_logger = logging.getLogger('MyLogger')
my_logger.setLevel(logging.DEBUG)

# Add the log message handler to the logger
handler = logging.handlers.RotatingFileHandler(
              LOG_FILENAME, maxBytes=20, backupCount=5)

my_logger.addHandler(handler)

# Log some messages
for i in range(20):
    my_logger.debug('i = %d' % i)

# See what files are created
logfiles = glob.glob('%s*' % LOG_FILENAME)

for filename in logfiles:
    print(filename)

结果应该是 6 个单独的文件,每个文件都包含应用程序的日志历史记录的一部分:

logging_rotatingfile_example.out
logging_rotatingfile_example.out.1
logging_rotatingfile_example.out.2
logging_rotatingfile_example.out.3
logging_rotatingfile_example.out.4
logging_rotatingfile_example.out.5

最新文件始终为logging_rotatingfile_example.out,并且每次达到大小限制时都会使用后缀.1重命名。每个现有备份文件都被重命名以增加后缀(.1变为.2等),并且.6文件被删除。

显然,作为一个极端示例,此示例将日志长度设置得太小。您可能希望将* maxBytes *设置为适当的值。

使用其他格式样式

将日志记录添加到 Python 标准库后,格式化具有可变内容的消息的唯一方法是使用%-formatting 方法。从那时起,Python 获得了两种新的格式化方法:string.Template(在 Python 2.4 中添加)和str.format()(在 Python 2.6 中添加)。

日志记录(从 3.2 版开始)为这两种其他格式设置样式提供了改进的支持。增强了Formatter类,以使用名为style的附加可选关键字参数。默认为'%',但其他可能的值为'{''$',它们对应于其他两种格式样式。向后兼容性默认情况下保持(如您所愿),但是pass显式指定样式参数,您可以指定适用于str.format()string.Template的格式字符串。这是一个示例控制台会话,展示了各种可能性:

>>> import logging
>>> root = logging.getLogger()
>>> root.setLevel(logging.DEBUG)
>>> handler = logging.StreamHandler()
>>> bf = logging.Formatter('{asctime} {name} {levelname:8s} {message}',
...                        style='{')
>>> handler.setFormatter(bf)
>>> root.addHandler(handler)
>>> logger = logging.getLogger('foo.bar')
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:11:55,341 foo.bar DEBUG    This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:12:11,526 foo.bar CRITICAL This is a CRITICAL message
>>> df = logging.Formatter('$asctime $name ${levelname} $message',
...                        style='$')
>>> handler.setFormatter(df)
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:13:06,924 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:13:11,494 foo.bar CRITICAL This is a CRITICAL message
>>>

请注意,finally输出到日志的日志消息的格式完全独立于单个日志消息的构造方式。那仍然可以使用%格式,如下所示:

>>> logger.error('This is an%s %s %s', 'other,', 'ERROR,', 'message')
2010-10-28 15:19:29,833 foo.bar ERROR This is another, ERROR, message
>>>

记录调用(logger.debug()logger.info()等)仅采用实际记录消息本身的位置参数,而关键字参数仅用于确定如何处理实际记录调用的选项(例如exc_info关键字参数指示应记录回溯信息) ,或extra关键字参数来指示要添加到日志中的其他上下文信息)。因此,您不能使用str.format()string.Template语法直接进行日志记录调用,因为日志记录包在内部使用%-formatting 合并格式字符串和变量参数。在保留向后兼容性的同时,不会对此进行任何更改,因为现有代码中存在的所有日志记录调用都将使用%格式的字符串。

但是,有一种方法可以使用\ {}-和$-格式来构造您的单个日志消息。回想一下,对于一条消息,您可以使用任意对象作为消息格式字符串,并且日志记录程序包将对该对象调用str()以获取实际的格式字符串。请考虑以下两类:

class BraceMessage:
    def __init__(self, fmt, /, *args, **kwargs):
        self.fmt = fmt
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        return self.fmt.format(*self.args, **self.kwargs)

class DollarMessage:
    def __init__(self, fmt, /, **kwargs):
        self.fmt = fmt
        self.kwargs = kwargs

    def __str__(self):
        from string import Template
        return Template(self.fmt).substitute(**self.kwargs)

这些中的任何一个都可以代替格式字符串,以允许\ {}-或$ -formatting 用来构建实际的“消息”部分,该部分出现在格式化日志输出中,代替“%(message)s”或“{message}”或“ $ message”。每当您要记录某些内容时,都很难使用类名,但是如果您使用诸如__(双下划线的别名,请不要与混淆,单个下划线用作gettext.gettext()的别名/别名)的名称非常可口。或其弟兄)。

上面的类不包含在 Python 中,尽管它们很容易复制并粘贴到您自己的代码中。它们可以按如下方式使用(假设它们在名为wherever的模块中语句):

>>> from wherever import BraceMessage as __
>>> print(__('Message with {0} {name}', 2, name='placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})',
...       point=p))
Message with coordinates: (0.50, 0.50)
>>> from wherever import DollarMessage as __
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>

尽管上面的示例使用print()来显示格式化的工作原理,但是您当然会使用logger.debug()或类似的方法来实际记录这种格式。

需要注意的一件事是,使用这种方法不会对性能造成任何重大影响:实际的格式设置不是在进行日志记录调用时发生,而是在(如果有)记录的消息实际上将由处理程序输出到日志时发生。因此,可能会使您不满意的唯一不寻常的事情是括号会围绕格式字符串和参数,而不仅仅是格式字符串。这是因为_表示法只是对 XXXMessage 类之一的构造函数调用的语法糖。

如果愿意,可以使用LoggerAdapter来达到与上述类似的效果,如以下示例所示:

import logging

class Message:
    def __init__(self, fmt, args):
        self.fmt = fmt
        self.args = args

    def __str__(self):
        return self.fmt.format(*self.args)

class StyleAdapter(logging.LoggerAdapter):
    def __init__(self, logger, extra=None):
        super(StyleAdapter, self).__init__(logger, extra or {})

    def log(self, level, msg, /, *args, **kwargs):
        if self.isEnabledFor(level):
            msg, kwargs = self.process(msg, kwargs)
            self.logger._log(level, Message(msg, args), (), **kwargs)

logger = StyleAdapter(logging.getLogger(__name__))

def main():
    logger.debug('Hello, {}', 'world!')

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    main()

当在 Python 3.2 或更高版本上运行时,以上脚本应记录消息Hello, world!

Customizing LogRecord

每个日志记录事件都由一个LogRecord实例表示。当记录了事件且未按记录程序的级别过滤掉事件时,将创建LogRecord,并填充有关该事件的信息,然后将其传递给该记录程序的处理程序(及其祖先,直到并包括该记录程序,在该记录程序中进一步传播)层次结构已禁用)。在 Python 3.2 之前,只有两个地方完成了此创建:

  • Logger.makeRecord(),这是在记录事件的正常过程中调用的。这直接调用LogRecord来创建实例。

  • makeLogRecord(),用包含要添加到 LogRecord 的属性的字典来调用。通常在网络上已接收到合适的字典时调用该字典(例如,passSocketHandler以 pickle 的形式,或passHTTPHandler以 JSON 的形式)。

这通常意味着,如果您需要对LogRecord做任何特殊的事情,则必须执行以下一项操作。

  • 创建自己的Logger子类,该子类将覆盖Logger.makeRecord(),并在实例化您关心的所有 Logger 之前使用setLoggerClass()对其进行设置。

  • Filter添加到 Logger 或处理程序,该方法将在调用其filter()方法时执行所需的特殊操作。

在(例如)几个不同的库想做不同的事情的情况下,第一种方法会有些笨拙。每个人都会try设置自己的Logger子类,而最后一个这样做的人将获胜。

第二种方法在很多情况下都能很好地发挥作用,但不允许您例如使用专门的LogRecord子类。库开发人员可以在 Logger 上设置一个合适的过滤器,但是他们必须记住每次引入新 Logger 时都要这样做(他们只需添加新的软件包或模块并执行

logger = logging.getLogger(__name__)

在模块级别)。这可能是太多要考虑的事情。开发人员也可以将过滤器添加到附加到其顶层 Logger 的NullHandler上,但是如果应用程序开发人员将处理程序附加到较低层的库 Logger 上,则不会调用该过滤器-因此该处理程序的输出将无法反映该过滤器的意图。库开发人员。

在 Python 3.2 及更高版本中,LogRecord创建是pass工厂指定的,您可以指定该工厂。您可以使用setLogRecordFactory()设置工厂,也可以使用getLogRecordFactory()询问工厂。使用LogRecord构造函数相同的签名来调用工厂,因为LogRecord是工厂的默认设置。

这种方法允许自定义工厂控制 LogRecord 创建的所有方面。例如,您可以返回子类,或仅使用以下类似的模式将一些其他属性添加到创建后的 Logging:

old_factory = logging.getLogRecordFactory()

def record_factory(*args, **kwargs):
    record = old_factory(*args, **kwargs)
    record.custom_attribute = 0xdecafbad
    return record

logging.setLogRecordFactory(record_factory)

这种模式允许不同的库将工厂链接在一起,并且只要它们不覆盖彼此的属性或无意覆盖作为标准提供的属性,就不会感到惊讶。但是,应该记住,链中的每个链接都会为所有日志记录操作增加运行时开销,并且该技术仅应在使用Filter无法提供所需结果时使用。

子类化 QueueHandler-ZeroMQ 示例

您可以使用QueueHandler子类将消息发送到其他类型的队列,例如 ZeroMQ“发布”套接字。在下面的示例中,套接字是单独创建的,并传递给处理程序(作为其“队列”):

import zmq   # using pyzmq, the Python binding for ZeroMQ
import json  # for serializing records portably

ctx = zmq.Context()
sock = zmq.Socket(ctx, zmq.PUB)  # or zmq.PUSH, or other suitable value
sock.bind('tcp://*:5556')        # or wherever

class ZeroMQSocketHandler(QueueHandler):
    def enqueue(self, record):
        self.queue.send_json(record.__dict__)

handler = ZeroMQSocketHandler(sock)

当然,还有其他组织方式,例如,传递处理程序创建套接字所需的数据:

class ZeroMQSocketHandler(QueueHandler):
    def __init__(self, uri, socktype=zmq.PUB, ctx=None):
        self.ctx = ctx or zmq.Context()
        socket = zmq.Socket(self.ctx, socktype)
        socket.bind(uri)
        super().__init__(socket)

    def enqueue(self, record):
        self.queue.send_json(record.__dict__)

    def close(self):
        self.queue.close()

子类化 QueueListener-ZeroMQ 示例

您还可以子类QueueListener来从其他种类的队列中获取消息,例如 ZeroMQ'subscribe'套接字。这是一个例子:

class ZeroMQSocketListener(QueueListener):
    def __init__(self, uri, /, *handlers, **kwargs):
        self.ctx = kwargs.get('ctx') or zmq.Context()
        socket = zmq.Socket(self.ctx, zmq.SUB)
        socket.setsockopt_string(zmq.SUBSCRIBE, '')  # subscribe to everything
        socket.connect(uri)
        super().__init__(socket, *handlers, **kwargs)

    def dequeue(self):
        msg = self.queue.recv_json()
        return logging.makeLogRecord(msg)

See also

基本的日志记录教程

更高级的日志记录教程

示例基于字典的配置

以下是日志记录配置字典的示例-取自Django 项目的文档。该字典传递给dictConfig()以使配置生效:

LOGGING = {
    'version': 1,
    'disable_existing_loggers': True,
    'formatters': {
        'verbose': {
            'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s'
        },
        'simple': {
            'format': '%(levelname)s %(message)s'
        },
    },
    'filters': {
        'special': {
            '()': 'project.logging.SpecialFilter',
            'foo': 'bar',
        }
    },
    'handlers': {
        'null': {
            'level':'DEBUG',
            'class':'django.utils.log.NullHandler',
        },
        'console':{
            'level':'DEBUG',
            'class':'logging.StreamHandler',
            'formatter': 'simple'
        },
        'mail_admins': {
            'level': 'ERROR',
            'class': 'django.utils.log.AdminEmailHandler',
            'filters': ['special']
        }
    },
    'loggers': {
        'django': {
            'handlers':['null'],
            'propagate': True,
            'level':'INFO',
        },
        'django.request': {
            'handlers': ['mail_admins'],
            'level': 'ERROR',
            'propagate': False,
        },
        'myproject.custom': {
            'handlers': ['console', 'mail_admins'],
            'level': 'INFO',
            'filters': ['special']
        }
    }
}

有关此配置的更多信息,请参见 Django 文档的relevant section

使用旋转器和命名器来自定义日志旋转处理

以下代码段给出了如何定义命名器和旋转器的示例,该代码显示了基于 zlib 的日志文件压缩:

def namer(name):
    return name + ".gz"

def rotator(source, dest):
    with open(source, "rb") as sf:
        data = sf.read()
        compressed = zlib.compress(data, 9)
        with open(dest, "wb") as df:
            df.write(compressed)
    os.remove(source)

rh = logging.handlers.RotatingFileHandler(...)
rh.rotator = rotator
rh.namer = namer

这些不是“ true” .gz 文件,因为它们是裸压缩数据,没有像实际 gzip 文件中那样的“容器”。此代码段仅用于说明目的。

更详细的 multiprocessing 示例

以下工作示例显示了如何将日志记录与使用配置文件的多重处理一起使用。这些配置非常简单,但是可以说明在实际的 multiprocessing 方案中如何实现更复杂的配置。

在该示例中,主进程产生一个侦听器进程和一些辅助进程。每个主进程,侦听器和工作程序都有三个单独的配置(工作程序都共享相同的配置)。我们可以看到主要过程中的日志记录,工作人员如何记录到 QueueHandler 以及侦听器如何实现 QueueListener 和更复杂的日志记录配置,以及安排将pass队列接收的事件分派到配置中指定的处理程序。请注意,这些配置仅是说明性的,但是您应该能够使此示例适应您自己的方案。

这是脚本-文档字符串和 Comments 有望解释其工作原理:

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue, Event, current_process
import os
import random
import time

class MyHandler:
    """
    A simple handler for logging events. It runs in the listener process and
    dispatches events to loggers based on the name in the received record,
    which then get dispatched, by the logging system, to the handlers
    configured for those loggers.
    """

    def handle(self, record):
        if record.name == "root":
            logger = logging.getLogger()
        else:
            logger = logging.getLogger(record.name)

        if logger.isEnabledFor(record.levelno):
            # The process name is transformed just to show that it's the listener
            # doing the logging to files and console
            record.processName = '%s (for %s)' % (current_process().name, record.processName)
            logger.handle(record)

def listener_process(q, stop_event, config):
    """
    This could be done in the main process, but is just done in a separate
    process for illustrative purposes.

    This initialises logging according to the specified configuration,
    starts the listener and waits for the main process to signal completion
    via the event. The listener is then stopped, and the process exits.
    """
    logging.config.dictConfig(config)
    listener = logging.handlers.QueueListener(q, MyHandler())
    listener.start()
    if os.name == 'posix':
        # On POSIX, the setup logger will have been configured in the
        # parent process, but should have been disabled following the
        # dictConfig call.
        # On Windows, since fork isn't used, the setup logger won't
        # exist in the child, so it would be created and the message
        # would appear - hence the "if posix" clause.
        logger = logging.getLogger('setup')
        logger.critical('Should not appear, because of disabled logger ...')
    stop_event.wait()
    listener.stop()

def worker_process(config):
    """
    A number of these are spawned for the purpose of illustration. In
    practice, they could be a heterogeneous bunch of processes rather than
    ones which are identical to each other.

    This initialises logging according to the specified configuration,
    and logs a hundred messages with random levels to randomly selected
    loggers.

    A small sleep is added to allow other processes a chance to run. This
    is not strictly needed, but it mixes the output from the different
    processes a bit more than if it's left out.
    """
    logging.config.dictConfig(config)
    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
              logging.CRITICAL]
    loggers = ['foo', 'foo.bar', 'foo.bar.baz',
               'spam', 'spam.ham', 'spam.ham.eggs']
    if os.name == 'posix':
        # On POSIX, the setup logger will have been configured in the
        # parent process, but should have been disabled following the
        # dictConfig call.
        # On Windows, since fork isn't used, the setup logger won't
        # exist in the child, so it would be created and the message
        # would appear - hence the "if posix" clause.
        logger = logging.getLogger('setup')
        logger.critical('Should not appear, because of disabled logger ...')
    for i in range(100):
        lvl = random.choice(levels)
        logger = logging.getLogger(random.choice(loggers))
        logger.log(lvl, 'Message no. %d', i)
        time.sleep(0.01)

def main():
    q = Queue()
    # The main process gets a simple configuration which prints to the console.
    config_initial = {
        'version': 1,
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'INFO'
            }
        },
        'root': {
            'handlers': ['console'],
            'level': 'DEBUG'
        }
    }
    # The worker process configuration is just a QueueHandler attached to the
    # root logger, which allows all messages to be sent to the queue.
    # We disable existing loggers to disable the "setup" logger used in the
    # parent process. This is needed on POSIX because the logger will
    # be there in the child following a fork().
    config_worker = {
        'version': 1,
        'disable_existing_loggers': True,
        'handlers': {
            'queue': {
                'class': 'logging.handlers.QueueHandler',
                'queue': q
            }
        },
        'root': {
            'handlers': ['queue'],
            'level': 'DEBUG'
        }
    }
    # The listener process configuration shows that the full flexibility of
    # logging configuration is available to dispatch events to handlers however
    # you want.
    # We disable existing loggers to disable the "setup" logger used in the
    # parent process. This is needed on POSIX because the logger will
    # be there in the child following a fork().
    config_listener = {
        'version': 1,
        'disable_existing_loggers': True,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            },
            'simple': {
                'class': 'logging.Formatter',
                'format': '%(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'formatter': 'simple',
                'level': 'INFO'
            },
            'file': {
                'class': 'logging.FileHandler',
                'filename': 'mplog.log',
                'mode': 'w',
                'formatter': 'detailed'
            },
            'foofile': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-foo.log',
                'mode': 'w',
                'formatter': 'detailed'
            },
            'errors': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-errors.log',
                'mode': 'w',
                'formatter': 'detailed',
                'level': 'ERROR'
            }
        },
        'loggers': {
            'foo': {
                'handlers': ['foofile']
            }
        },
        'root': {
            'handlers': ['console', 'file', 'errors'],
            'level': 'DEBUG'
        }
    }
    # Log some initial events, just to show that logging in the parent works
    # normally.
    logging.config.dictConfig(config_initial)
    logger = logging.getLogger('setup')
    logger.info('About to create workers ...')
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1),
                     args=(config_worker,))
        workers.append(wp)
        wp.start()
        logger.info('Started worker: %s', wp.name)
    logger.info('About to create listener ...')
    stop_event = Event()
    lp = Process(target=listener_process, name='listener',
                 args=(q, stop_event, config_listener))
    lp.start()
    logger.info('Started listener')
    # We now hang around for the workers to finish their work.
    for wp in workers:
        wp.join()
    # Workers all done, listening can now stop.
    # Logging in the parent still works normally.
    logger.info('Telling listener to stop ...')
    stop_event.set()
    lp.join()
    logger.info('All done.')

if __name__ == '__main__':
    main()

将 BOM 插入发送到 SysLogHandler 的消息中

RFC 5424要求将 Unicode 消息作为一组字节发送到 syslog 守护程序,该字节具有以下结构:可选的纯 ASCII 组件,后跟 UTF-8 字节 Sequences 标记(BOM),然后是使用 UTF 编码的 Unicode -8. (请参见 规范的相关部分。)

在 Python 3.1 中,代码已添加到SysLogHandler中,以将 BOM 表插入消息中,但是不幸的是,它的实现不正确,因为 BOM 表出现在消息的开头,因此不允许任何纯 ASCII 组件出现在消息之前。

由于此行为被破坏,因此错误的 BOM 插入代码已从 Python 3.2.4 及更高版本中删除。但是,它不会被替换,并且如果要生成 RFC 5424兼容的消息,其中包括 BOM,使用 UTF-8 编码的 BOM 表,可选的纯 ASCII 序列以及其后的任意 Unicode 编码,则需要执行以下:

'ASCII section\ufeffUnicode section'

当使用 UTF-8 编码时,Unicode 代码点 U FEFF 将被编码为 UTF-8 BOM –字节串b'\xef\xbb\xbf'

  • 用任何喜欢的占位符替换 ASCII 节,但要确保替换后出现在其中的数据始终是 ASCII(这样,在 UTF-8 编码后,数据将保持不变)。

  • 用您喜欢的任何占位符替换 Unicode 部分;如果替换后出现的数据中包含的字符超出了 ASCII 范围,则可以-使用 UTF-8 进行编码。

格式化的消息passSysLogHandler使用 UTF-8 编码进行编码。如果您遵循上述规则,则应该能够产生 RFC 5424兼容的消息。如果您不这样做,则日志记录可能不会抱怨,但是您的消息将不符合 RFC 5424,并且您的 syslog 守护程序可能会抱怨。

实施结构化日志记录

尽管大多数日志记录消息都是供人阅读的,因此不容易pass机器解析,但是在某些情况下,您可能希望以结构化格式输出消息,该结构化格式能够被程序解析(而无需复杂的正则表达式)解析日志消息)。使用日志记录包可以轻松实现。有多种方法可以实现此目的,但是以下是一种简单的方法,该方法使用 JSON 以机器可解析的方式序列化事件:

import json
import logging

class StructuredMessage:
    def __init__(self, message, /, **kwargs):
        self.message = message
        self.kwargs = kwargs

    def __str__(self):
        return '%s >>> %s' % (self.message, json.dumps(self.kwargs))

_ = StructuredMessage   # optional, to improve readability

logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', foo='bar', bar='baz', num=123, fnum=123.456))

如果运行了上面的脚本,它将输出:

message 1 >>> {"fnum": 123.456, "num": 123, "bar": "baz", "foo": "bar"}

请注意,根据所使用的 Python 版本,项目的 Sequences 可能会有所不同。

如果需要更专业的处理,则可以使用自定义 JSON 编码器,如以下完整示例所示:

from __future__ import unicode_literals

import json
import logging

# This next bit is to ensure the script runs unchanged on 2.x and 3.x
try:
    unicode
except NameError:
    unicode = str

class Encoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, set):
            return tuple(o)
        elif isinstance(o, unicode):
            return o.encode('unicode_escape').decode('ascii')
        return super(Encoder, self).default(o)

class StructuredMessage:
    def __init__(self, message, /, **kwargs):
        self.message = message
        self.kwargs = kwargs

    def __str__(self):
        s = Encoder().encode(self.kwargs)
        return '%s >>> %s' % (self.message, s)

_ = StructuredMessage   # optional, to improve readability

def main():
    logging.basicConfig(level=logging.INFO, format='%(message)s')
    logging.info(_('message 1', set_value={1, 2, 3}, snowman='\u2603'))

if __name__ == '__main__':
    main()

运行上面的脚本时,它会打印:

message 1 >>> {"snowman": "\u2603", "set_value": [1, 2, 3]}

请注意,根据所使用的 Python 版本,项目的 Sequences 可能会有所不同。

使用 dictConfig()自定义处理程序

有时您想以特定方式自定义日志处理程序,如果您使用dictConfig(),则无需子类就可以做到这一点。例如,考虑您可能要设置日志文件的所有权。在 POSIX 上,可以使用shutil.chown()轻松完成此操作,但是 stdlib 中的文件处理程序不提供内置支持。您可以使用简单的函数来自定义处理程序的创建,例如:

def owned_file_handler(filename, mode='a', encoding=None, owner=None):
    if owner:
        if not os.path.exists(filename):
            open(filename, 'a').close()
        shutil.chown(filename, *owner)
    return logging.FileHandler(filename, mode, encoding)

然后,您可以在传递给dictConfig()的日志记录配置中指定pass调用此函数来创建日志记录处理程序:

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'default': {
            'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
        },
    },
    'handlers': {
        'file':{
            # The values below are popped from this dictionary and
            # used to create the handler, set the handler's level and
            # its formatter.
            '()': owned_file_handler,
            'level':'DEBUG',
            'formatter': 'default',
            # The values below are passed to the handler creator callable
            # as keyword arguments.
            'owner': ['pulse', 'pulse'],
            'filename': 'chowntest.log',
            'mode': 'w',
            'encoding': 'utf-8',
        },
    },
    'root': {
        'handlers': ['file'],
        'level': 'DEBUG',
    },
}

在此示例中,我仅出于说明目的使用pulse用户和组设置所有权。将其放到工作脚本chowntest.py中:

import logging, logging.config, os, shutil

def owned_file_handler(filename, mode='a', encoding=None, owner=None):
    if owner:
        if not os.path.exists(filename):
            open(filename, 'a').close()
        shutil.chown(filename, *owner)
    return logging.FileHandler(filename, mode, encoding)

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'default': {
            'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
        },
    },
    'handlers': {
        'file':{
            # The values below are popped from this dictionary and
            # used to create the handler, set the handler's level and
            # its formatter.
            '()': owned_file_handler,
            'level':'DEBUG',
            'formatter': 'default',
            # The values below are passed to the handler creator callable
            # as keyword arguments.
            'owner': ['pulse', 'pulse'],
            'filename': 'chowntest.log',
            'mode': 'w',
            'encoding': 'utf-8',
        },
    },
    'root': {
        'handlers': ['file'],
        'level': 'DEBUG',
    },
}

logging.config.dictConfig(LOGGING)
logger = logging.getLogger('mylogger')
logger.debug('A debug message')

要运行此程序,您可能需要以root的身份运行:

$ sudo python3.3 chowntest.py
$ cat chowntest.log
2013-11-05 09:34:51,128 DEBUG mylogger A debug message
$ ls -l chowntest.log
-rw-r--r-- 1 pulse pulse 55 2013-11-05 09:34 chowntest.log

请注意,此示例使用 Python 3.3,因为这是shutil.chown()出现的地方。此方法应与任何支持dictConfig()的 Python 版本一起使用-即 Python 2.7、3.2 或更高版本。在 3.3 之前的版本中,您需要使用来实现实际的所有权更改。 os.chown()

实际上,创建处理程序的Function可能位于项目中某个地方的 Util 模块中。代替配置中的行:

'()': owned_file_handler,

您可以使用例如:

'()': 'ext://project.util.owned_file_handler',

其中project.util可以替换为函数所在包的实际名称。在上面的工作脚本中,应使用'ext://__main__.owned_file_handler'。在这里,实际的可调用项由ext://规范中的dictConfig()解析。

希望本示例还指出了如何实现其他类型的文件更改的方式-例如使用os.chmod()以相同的方式设置特定的 POSIX 权限位。

当然,该方法还可以扩展到除FileHandler之外的其他类型的处理程序-例如,旋转文件处理程序之一,或完全不同的处理程序类型。

在整个应用程序中使用特定的格式样式

在 Python 3.2 中,Formatter获得了style关键字参数,虽然为了向后兼容而默认为%,但允许{$的规范支持str.format()string.Template支持的格式化方法。请注意,这支配了finally输出到日志的日志消息的格式,并且与如何构造单个日志消息完全正交。

记录调用(debug()info()等)仅采用实际记录消息本身的位置参数,而关键字参数仅用于确定如何处理记录调用的选项(例如exc_info关键字参数指示应记录回溯信息,或extra关键字参数表示要添加到日志中的其他上下文信息)。因此,您不能使用str.format()string.Template语法直接进行日志记录调用,因为日志记录包在内部使用%格式合并格式字符串和变量参数。在保留向后兼容性的同时,不会对此进行任何更改,因为现有代码中存在的所有日志记录调用都将使用%格式的字符串。

有人建议将格式样式与特定的 Logger 相关联,但是这种方法也会遇到向后兼容性问题,因为任何现有代码都可能使用给定的 Logger 名称和%-formatting。

为了使日志能够在任何第三方库和您的代码之间互操作,必须在单个日志调用级别上决定格式。这开辟了几种方式,可以容纳其他格式的样式。

使用 LogRecord 工厂

在 Python 3.2 中,连同上面提到的Formatter更改,日志记录包获得了允许用户使用setLogRecordFactory()函数设置自己的LogRecord子类的Function。您可以使用它来设置自己的LogRecord子类,该子类pass覆盖getMessage()方法来执行 Right Thing。此方法的 Base Class 实现是msg % args格式发生的地方,您可以替换替代格式;但是,您应该谨慎地支持所有格式样式,并允许%格式为默认格式,以确保与其他代码的互操作性。就像基本实现一样,也应注意调用str(self.msg)

有关更多信息,请参考setLogRecordFactory()LogRecord上的参考文档。

使用自定义消息对象

还有另一种也许更简单的方法,您可以使用\ {}-和$-格式化来构造您的单个日志消息。您可能会记得(从使用任意对象作为消息),在记录日志时,可以使用任意对象作为消息格式字符串,并且日志记录包将对该对象调用str()以获取实际的格式字符串。请考虑以下两类:

class BraceMessage:
    def __init__(self, fmt, /, *args, **kwargs):
        self.fmt = fmt
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        return self.fmt.format(*self.args, **self.kwargs)

class DollarMessage:
    def __init__(self, fmt, /, **kwargs):
        self.fmt = fmt
        self.kwargs = kwargs

    def __str__(self):
        from string import Template
        return Template(self.fmt).substitute(**self.kwargs)

这些中的任何一个都可以代替格式字符串,以允许使用\ {}-或$格式来构建实际的“消息”部分,该部分将显示在格式化日志输出中,以代替“%(message)s”或“{message}”或“ $ message”。如果您发现每次要记录某事时都使用类名有点笨拙,则可以在消息中使用诸如M_之类的别名(或者使用__,如果使用_进行本地化)则使其更加可口。 )。

下面给出了这种方法的示例。首先,使用str.format()进行格式化:

>>> __ = BraceMessage
>>> print(__('Message with {0} {1}', 2, 'placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})', point=p))
Message with coordinates: (0.50, 0.50)

其次,使用string.Template进行格式化:

>>> __ = DollarMessage
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>

需要注意的一件事是,使用这种方法不会对性能造成任何重大影响:实际的格式设置不是在进行日志记录调用时发生,而是在(如果有)记录的消息实际上将由处理程序输出到日志时发生。因此,可能会使您不满意的唯一不寻常的事情是括号会围绕格式字符串和参数,而不仅仅是格式字符串。这是因为_表示法只是对上面显示的XXXMessage类之一进行构造函数调用的语法糖。

使用 dictConfig()配置过滤器

您可以*使用_ 配置过滤器,尽管乍看之下操作方法可能并不明显(因此该方法)。由于Filter是标准库中唯一包含的过滤器类,并且不太可能满足许多要求(仅作为 Base Class 存在),因此通常需要使用覆盖的filter()方法定义自己的Filter子类。为此,请在过滤器的配置字典中指定()键,并指定将用于创建过滤器的 callable(一个类是最明显的,但是您可以提供任何返回Filter实例的 callable)。这是一个完整的示例:

import logging
import logging.config
import sys

class MyFilter(logging.Filter):
    def __init__(self, param=None):
        self.param = param

    def filter(self, record):
        if self.param is None:
            allow = True
        else:
            allow = self.param not in record.msg
        if allow:
            record.msg = 'changed: ' + record.msg
        return allow

LOGGING = {
    'version': 1,
    'filters': {
        'myfilter': {
            '()': MyFilter,
            'param': 'noshow',
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'filters': ['myfilter']
        }
    },
    'root': {
        'level': 'DEBUG',
        'handlers': ['console']
    },
}

if __name__ == '__main__':
    logging.config.dictConfig(LOGGING)
    logging.debug('hello')
    logging.debug('hello - noshow')

本示例说明如何以关键字参数的形式将配置数据传递给构造实例的可调用对象。运行时,以上脚本将打印:

changed: hello

这表明过滤器正在按配置工作。

还有两点需要注意:

  • 如果您无法在配置中直接引用可调用对象(例如,如果它位于其他模块中,并且无法将其直接导入配置字典所在的位置),则可以使用访问外部对象中描述的ext://...形式。例如,您可以在上面的示例中使用文本'ext://__main__.MyFilter'而不是MyFilter

  • 除过滤器外,此技术还可用于配置自定义处理程序和格式化程序。有关日志如何支持在配置中使用用户定义的对象的更多信息,请参见User-defined objects,请参阅上面的其他食谱使用 dictConfig()自定义处理程序

自定义 exception 格式

有时您可能想要进行自定义的异常格式设置-出于参数的考虑,假设即使存在异常信息,每个记录的事件也只需要一行。您可以使用自定义格式器类来执行此操作,如以下示例所示:

import logging

class OneLineExceptionFormatter(logging.Formatter):
    def formatException(self, exc_info):
        """
        Format an exception so that it prints on a single line.
        """
        result = super(OneLineExceptionFormatter, self).formatException(exc_info)
        return repr(result)  # or format into one line however you want to

    def format(self, record):
        s = super(OneLineExceptionFormatter, self).format(record)
        if record.exc_text:
            s = s.replace('\n', '') + '|'
        return s

def configure_logging():
    fh = logging.FileHandler('output.txt', 'w')
    f = OneLineExceptionFormatter('%(asctime)s|%(levelname)s|%(message)s|',
                                  '%d/%m/%Y %H:%M:%S')
    fh.setFormatter(f)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(fh)

def main():
    configure_logging()
    logging.info('Sample message')
    try:
        x = 1 / 0
    except ZeroDivisionError as e:
        logging.exception('ZeroDivisionError: %s', e)

if __name__ == '__main__':
    main()

运行时,这将产生一个包含两行的文件:

28/01/2015 07:21:23|INFO|Sample message|
28/01/2015 07:21:23|ERROR|ZeroDivisionError: integer division or modulo by zero|'Traceback (most recent call last):\n  File "logtest7.py", line 30, in main\n    x = 1 / 0\nZeroDivisionError: integer division or modulo by zero'|

尽管上面的处理很简单,但是它为如何根据自己的喜好格式化异常信息指明了方向。 traceback模块可能有助于满足更多特殊需求。

记录日志信息

在某些情况下,希望以可听而不是可见的格式呈现日志消息。如果您的系统中具有文本转语音(TTS)Function,即使它没有 Python 绑定,也很容易做到。大多数 TTS 系统都有您可以运行的命令行程序,可以使用subprocess从处理程序中调用该程序。这里假设 TTS 命令行程序不会与用户交互或花费很长时间才能完成,并且记录消息的频率不会很高,不会使用户充满消息,并且拥有一次而不是同时说出一条消息,下面的示例实现在处理另一条消息之前先 await 一条消息,这可能会使其他处理程序保持 await 状态。这是一个显示此方法的简短示例,它假定espeak TTS 包可用:

import logging
import subprocess
import sys

class TTSHandler(logging.Handler):
    def emit(self, record):
        msg = self.format(record)
        # Speak slowly in a female English voice
        cmd = ['espeak', '-s150', '-ven+f3', msg]
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        # wait for the program to finish
        p.communicate()

def configure_logging():
    h = TTSHandler()
    root = logging.getLogger()
    root.addHandler(h)
    # the default formatter just returns the message
    root.setLevel(logging.DEBUG)

def main():
    logging.info('Hello')
    logging.debug('Goodbye')

if __name__ == '__main__':
    configure_logging()
    sys.exit(main())

运行时,此脚本应以女性声音说“你好”,然后说“再见”。

当然,以上方法可以适用于其他 TTS 系统,甚至可以与其他可以pass命令行运行的外部程序处理消息的系统一起使用。

缓冲日志消息并有条件地输出

在某些情况下,您可能希望将消息记录在临时区域中,并且仅在发生特定情况时才输出消息。例如,您可能要开始在一个函数中记录调试事件,并且如果该函数完成而没有错误,则您不希望使用收集的调试信息使日志混乱,但是,如果有错误,则需要所有调试要输出的信息以及错误。

这是一个示例,显示了如何使用装饰器为希望日志记录以这种方式运行的函数执行此操作。它使用logging.handlers.MemoryHandler,它允许缓冲记录的事件,直到发生某种情况为止,此时将缓冲的事件flushed-传递给另一个处理程序(target处理程序)进行处理。默认情况下,MemoryHandler在其缓冲区被填满或看到级别大于或等于指定阈值的事件时刷新。如果要自定义冲洗行为,可以将此配方与MemoryHandler的更专门的子类一起使用。

该示例脚本具有一个简单的函数foo,该函数仅循环浏览所有日志记录级别,写入sys.stderr表示要登录的级别,然后实际在该级别记录消息。您可以将参数传递给foo,如果为 true,它将以 ERROR 和 CRITICAL 级别记录-否则,它仅以 DEBUG,INFO 和 WARNING 级别记录。

该脚本只是安排用装饰器装饰foo,它将执行所需的条件日志记录。装饰器将 Logger 作为参数,并在对装饰函数的调用期间附加一个内存处理程序。装饰器还可以使用目标处理程序,应该进行刷新的级别以及缓冲区的容量(缓冲的记录数)进行参数化。这些默认为StreamHandler,分别写入sys.stderrlogging.ERROR100

这是脚本:

import logging
from logging.handlers import MemoryHandler
import sys

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

def log_if_errors(logger, target_handler=None, flush_level=None, capacity=None):
    if target_handler is None:
        target_handler = logging.StreamHandler()
    if flush_level is None:
        flush_level = logging.ERROR
    if capacity is None:
        capacity = 100
    handler = MemoryHandler(capacity, flushLevel=flush_level, target=target_handler)

    def decorator(fn):
        def wrapper(*args, **kwargs):
            logger.addHandler(handler)
            try:
                return fn(*args, **kwargs)
            except Exception:
                logger.exception('call failed')
                raise
            finally:
                super(MemoryHandler, handler).flush()
                logger.removeHandler(handler)
        return wrapper

    return decorator

def write_line(s):
    sys.stderr.write('%s\n' % s)

def foo(fail=False):
    write_line('about to log at DEBUG ...')
    logger.debug('Actually logged at DEBUG')
    write_line('about to log at INFO ...')
    logger.info('Actually logged at INFO')
    write_line('about to log at WARNING ...')
    logger.warning('Actually logged at WARNING')
    if fail:
        write_line('about to log at ERROR ...')
        logger.error('Actually logged at ERROR')
        write_line('about to log at CRITICAL ...')
        logger.critical('Actually logged at CRITICAL')
    return fail

decorated_foo = log_if_errors(logger)(foo)

if __name__ == '__main__':
    logger.setLevel(logging.DEBUG)
    write_line('Calling undecorated foo with False')
    assert not foo(False)
    write_line('Calling undecorated foo with True')
    assert foo(True)
    write_line('Calling decorated foo with False')
    assert not decorated_foo(False)
    write_line('Calling decorated foo with True')
    assert decorated_foo(True)

运行此脚本时,应观察到以下输出:

Calling undecorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling undecorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
about to log at CRITICAL ...
Calling decorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling decorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
Actually logged at DEBUG
Actually logged at INFO
Actually logged at WARNING
Actually logged at ERROR
about to log at CRITICAL ...
Actually logged at CRITICAL

如您所见,仅当记录严重性为 ERROR 或更高的事件时,才会发生实际的日志记录输出,但是在这种情况下,还会记录严重性较低的任何先前事件。

您当然可以使用传统的装饰方式:

@log_if_errors(logger)
def foo(fail=False):
    ...

pass配置使用 UTC(GMT)格式化时间

有时您想使用 UTC 格式化时间,可以使用类似 UTCFormatter 的类来完成,如下所示:

import logging
import time

class UTCFormatter(logging.Formatter):
    converter = time.gmtime

然后您可以在代码中使用UTCFormatter而不是Formatter。如果要pass配置完成此操作,则可以将dictConfig() API 与以下完整示例所示的方法结合使用:

import logging
import logging.config
import time

class UTCFormatter(logging.Formatter):
    converter = time.gmtime

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'utc': {
            '()': UTCFormatter,
            'format': '%(asctime)s %(message)s',
        },
        'local': {
            'format': '%(asctime)s %(message)s',
        }
    },
    'handlers': {
        'console1': {
            'class': 'logging.StreamHandler',
            'formatter': 'utc',
        },
        'console2': {
            'class': 'logging.StreamHandler',
            'formatter': 'local',
        },
    },
    'root': {
        'handlers': ['console1', 'console2'],
   }
}

if __name__ == '__main__':
    logging.config.dictConfig(LOGGING)
    logging.warning('The local time is %s', time.asctime())

运行此脚本时,它应显示如下内容:

2015-10-17 12:53:29,501 The local time is Sat Oct 17 13:53:29 2015
2015-10-17 13:53:29,501 The local time is Sat Oct 17 13:53:29 2015

显示了如何将时间设置为本地时间和 UTC 格式,每个处理程序一个。

使用上下文 Management 器进行选择性日志记录

有时候,临时更改日志记录配置并在执行某些操作后将其还原会很有用。为此,上下文 Management 器是保存和还原日志上下文的最明显的方法。这是一个这样的上下文 Management 器的简单示例,它允许您有选择地更改日志记录级别并仅在上下文 Management 器的范围内添加日志记录处理程序:

import logging
import sys

class LoggingContext:
    def __init__(self, logger, level=None, handler=None, close=True):
        self.logger = logger
        self.level = level
        self.handler = handler
        self.close = close

    def __enter__(self):
        if self.level is not None:
            self.old_level = self.logger.level
            self.logger.setLevel(self.level)
        if self.handler:
            self.logger.addHandler(self.handler)

    def __exit__(self, et, ev, tb):
        if self.level is not None:
            self.logger.setLevel(self.old_level)
        if self.handler:
            self.logger.removeHandler(self.handler)
        if self.handler and self.close:
            self.handler.close()
        # implicit return of None => don't swallow exceptions

如果指定级别值,那么 Logger 的级别在上下文 Management 器覆盖的 with 块的范围内设置为该值。如果指定处理程序,则在进入该块时将其添加到 Logger,并在退出该块时将其删除。您也可以要求 Manager 在代码块退出时为您关闭处理程序-如果您不再需要该处理程序,则可以执行此操作。

为了说明它是如何工作的,我们可以在上面添加以下代码块:

if __name__ == '__main__':
    logger = logging.getLogger('foo')
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.INFO)
    logger.info('1. This should appear just once on stderr.')
    logger.debug('2. This should not appear.')
    with LoggingContext(logger, level=logging.DEBUG):
        logger.debug('3. This should appear once on stderr.')
    logger.debug('4. This should not appear.')
    h = logging.StreamHandler(sys.stdout)
    with LoggingContext(logger, level=logging.DEBUG, handler=h, close=True):
        logger.debug('5. This should appear twice - once on stderr and once on stdout.')
    logger.info('6. This should appear just once on stderr.')
    logger.debug('7. This should not appear.')

我们最初将 Logger 的级别设置为INFO,因此出现消息#1,而没有消息#2.然后,在下面的with块中将级别临时更改为DEBUG,因此出现消息#3.块退出后,Logger 的级别恢复为INFO,因此消息#4 不出现。在下一个with块中,我们再次将级别设置为DEBUG,但还添加了写入sys.stdout的处理程序。因此,消息#5 在控制台上出现两次(一次passstderr一次,一次passstdout)。 with语句完成后,状态与之前一样,因此出现消息 6(如消息 1),而消息 7 没有(如消息 2)。

如果我们运行生成的脚本,结果如下:

$ python logctx.py
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.

如果再次运行它,但是将stderr传递到/dev/null,则会看到以下内容,这是唯一写入stdout的消息:

$ python logctx.py 2>/dev/null
5. This should appear twice - once on stderr and once on stdout.

再一次,但将stdout输送到/dev/null,我们得到:

$ python logctx.py >/dev/null
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.

在这种情况下,按预期不会出现打印到stdout的消息#5.

当然,这里描述的方法可以通用化,例如临时附加日志记录过滤器。请注意,以上代码可在 Python 2 和 Python 3 中使用。

CLI 应用程序入门模板

这是一个示例,显示了如何进行:

  • 使用基于命令行参数的日志记录级别

  • 分派到单独文件中的多个子命令,所有子命令以一致的方式记录在同一级别

  • 利用简单,最少的配置

假设我们有一个命令行应用程序,其任务是停止,启动或重新启动某些服务。为了说明起见,可以将其组织为文件app.py,该文件是应用程序的主要脚本,并具有在start.pystop.pyrestart.py中实现的单个命令。进一步假设我们要pass命令行参数控制应用程序的详细程度,默认为logging.INFO。这是app.py可以写的一种方式:

import argparse
import importlib
import logging
import os
import sys

def main(args=None):
    scriptname = os.path.basename(__file__)
    parser = argparse.ArgumentParser(scriptname)
    levels = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
    parser.add_argument('--log-level', default='INFO', choices=levels)
    subparsers = parser.add_subparsers(dest='command',
                                       help='Available commands:')
    start_cmd = subparsers.add_parser('start', help='Start a service')
    start_cmd.add_argument('name', metavar='NAME',
                           help='Name of service to start')
    stop_cmd = subparsers.add_parser('stop',
                                     help='Stop one or more services')
    stop_cmd.add_argument('names', metavar='NAME', nargs='+',
                          help='Name of service to stop')
    restart_cmd = subparsers.add_parser('restart',
                                        help='Restart one or more services')
    restart_cmd.add_argument('names', metavar='NAME', nargs='+',
                             help='Name of service to restart')
    options = parser.parse_args()
    # the code to dispatch commands could all be in this file. For the purposes
    # of illustration only, we implement each command in a separate module.
    try:
        mod = importlib.import_module(options.command)
        cmd = getattr(mod, 'command')
    except (ImportError, AttributeError):
        print('Unable to find the code for command \'%s\'' % options.command)
        return 1
    # Could get fancy here and load configuration from file or dictionary
    logging.basicConfig(level=options.log_level,
                        format='%(levelname)s %(name)s %(message)s')
    cmd(options)

if __name__ == '__main__':
    sys.exit(main())

startstoprestart命令可以在单独的模块中实现,就像这样开始:

# start.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    logger.debug('About to start %s', options.name)
    # actually do the command processing here ...
    logger.info('Started the \'%s\' service.', options.name)

并因此停止:

# stop.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    n = len(options.names)
    if n == 1:
        plural = ''
        services = '\'%s\'' % options.names[0]
    else:
        plural = 's'
        services = ', '.join('\'%s\'' % name for name in options.names)
        i = services.rfind(', ')
        services = services[:i] + ' and ' + services[i + 2:]
    logger.debug('About to stop %s', services)
    # actually do the command processing here ...
    logger.info('Stopped the %s service%s.', services, plural)

并类似地重新启动:

# restart.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    n = len(options.names)
    if n == 1:
        plural = ''
        services = '\'%s\'' % options.names[0]
    else:
        plural = 's'
        services = ', '.join('\'%s\'' % name for name in options.names)
        i = services.rfind(', ')
        services = services[:i] + ' and ' + services[i + 2:]
    logger.debug('About to restart %s', services)
    # actually do the command processing here ...
    logger.info('Restarted the %s service%s.', services, plural)

如果我们使用默认日志级别运行此应用程序,则将获得如下输出:

$ python app.py start foo
INFO start Started the 'foo' service.

$ python app.py stop foo bar
INFO stop Stopped the 'foo' and 'bar' services.

$ python app.py restart foo bar baz
INFO restart Restarted the 'foo', 'bar' and 'baz' services.

第一个单词是日志记录级别,第二个单词是事件记录位置的模块或软件包名称。

如果我们更改日志记录级别,则可以更改发送到日志的信息。例如,如果我们需要更多信息:

$ python app.py --log-level DEBUG start foo
DEBUG start About to start foo
INFO start Started the 'foo' service.

$ python app.py --log-level DEBUG stop foo bar
DEBUG stop About to stop 'foo' and 'bar'
INFO stop Stopped the 'foo' and 'bar' services.

$ python app.py --log-level DEBUG restart foo bar baz
DEBUG restart About to restart 'foo', 'bar' and 'baz'
INFO restart Restarted the 'foo', 'bar' and 'baz' services.

如果我们想要更少:

$ python app.py --log-level WARNING start foo
$ python app.py --log-level WARNING stop foo bar
$ python app.py --log-level WARNING restart foo bar baz

在这种情况下,这些命令不会向控制台输出任何内容,因为它们不会记录WARNING或更高级别的内容。

用于记录的 Qt GUI

时不时出现一个有关如何登录到 GUI 应用程序的问题。 Qt框架是流行的跨平台 UI 框架,具有使用PySide2PyQt5库的 Python 绑定。

以下示例显示如何登录到 Qt GUI。这引入了一个简单的QtHandler类,该类带有一个可调用对象,该类应该是进行 GUI 更新的主线程中的插槽。还创建了一个工作线程,以显示如何从 UI 本身(pass用于手动记录的按钮)以及在后台执行工作的工作线程(在这里,仅以随机级别随机记录消息)登录到 GUI 之间的短暂延迟)。

工作线程是使用 Qt 的QThread类而不是threading模块实现的,因为在某些情况下必须使用QThread,这可以更好地与其他Qt组件集成。

该代码应与PySide2PyQt5的最新版本一起使用。您应该能够使该方法适应 Qt 的早期版本。请参考代码片段中的 Comments 以获取更多详细信息。

import datetime
import logging
import random
import sys
import time

# Deal with minor differences between PySide2 and PyQt5
try:
    from PySide2 import QtCore, QtGui, QtWidgets
    Signal = QtCore.Signal
    Slot = QtCore.Slot
except ImportError:
    from PyQt5 import QtCore, QtGui, QtWidgets
    Signal = QtCore.pyqtSignal
    Slot = QtCore.pyqtSlot

logger = logging.getLogger(__name__)

#
# Signals need to be contained in a QObject or subclass in order to be correctly
# initialized.
#
class Signaller(QtCore.QObject):
    signal = Signal(str, logging.LogRecord)

#
# Output to a Qt GUI is only supposed to happen on the main thread. So, this
# handler is designed to take a slot function which is set up to run in the main
# thread. In this example, the function takes a string argument which is a
# formatted log message, and the log record which generated it. The formatted
# string is just a convenience - you could format a string for output any way
# you like in the slot function itself.
#
# You specify the slot function to do whatever GUI updates you want. The handler
# doesn't know or care about specific UI elements.
#
class QtHandler(logging.Handler):
    def __init__(self, slotfunc, *args, **kwargs):
        super(QtHandler, self).__init__(*args, **kwargs)
        self.signaller = Signaller()
        self.signaller.signal.connect(slotfunc)

    def emit(self, record):
        s = self.format(record)
        self.signaller.signal.emit(s, record)

#
# This example uses QThreads, which means that the threads at the Python level
# are named something like "Dummy-1". The function below gets the Qt name of the
# current thread.
#
def ctname():
    return QtCore.QThread.currentThread().objectName()

#
# Used to generate random levels for logging.
#
LEVELS = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
          logging.CRITICAL)

#
# This worker class represents work that is done in a thread separate to the
# main thread. The way the thread is kicked off to do work is via a button press
# that connects to a slot in the worker.
#
# Because the default threadName value in the LogRecord isn't much use, we add
# a qThreadName which contains the QThread name as computed above, and pass that
# value in an "extra" dictionary which is used to update the LogRecord with the
# QThread name.
#
# This example worker just outputs messages sequentially, interspersed with
# random delays of the order of a few seconds.
#
class Worker(QtCore.QObject):
    @Slot()
    def start(self):
        extra = {'qThreadName': ctname() }
        logger.debug('Started work', extra=extra)
        i = 1
        # Let the thread run until interrupted. This allows reasonably clean
        # thread termination.
        while not QtCore.QThread.currentThread().isInterruptionRequested():
            delay = 0.5 + random.random() * 2
            time.sleep(delay)
            level = random.choice(LEVELS)
            logger.log(level, 'Message after delay of %3.1f: %d', delay, i, extra=extra)
            i += 1

#
# Implement a simple UI for this cookbook example. This contains:
#
# * A read-only text edit window which holds formatted log messages
# * A button to start work and log stuff in a separate thread
# * A button to log something from the main thread
# * A button to clear the log window
#
class Window(QtWidgets.QWidget):

    COLORS = {
        logging.DEBUG: 'black',
        logging.INFO: 'blue',
        logging.WARNING: 'orange',
        logging.ERROR: 'red',
        logging.CRITICAL: 'purple',
    }

    def __init__(self, app):
        super(Window, self).__init__()
        self.app = app
        self.textedit = te = QtWidgets.QPlainTextEdit(self)
        # Set whatever the default monospace font is for the platform
        f = QtGui.QFont('nosuchfont')
        f.setStyleHint(f.Monospace)
        te.setFont(f)
        te.setReadOnly(True)
        PB = QtWidgets.QPushButton
        self.work_button = PB('Start background work', self)
        self.log_button = PB('Log a message at a random level', self)
        self.clear_button = PB('Clear log window', self)
        self.handler = h = QtHandler(self.update_status)
        # Remember to use qThreadName rather than threadName in the format string.
        fs = '%(asctime)s %(qThreadName)-12s %(levelname)-8s %(message)s'
        formatter = logging.Formatter(fs)
        h.setFormatter(formatter)
        logger.addHandler(h)
        # Set up to terminate the QThread when we exit
        app.aboutToQuit.connect(self.force_quit)

        # Lay out all the widgets
        layout = QtWidgets.QVBoxLayout(self)
        layout.addWidget(te)
        layout.addWidget(self.work_button)
        layout.addWidget(self.log_button)
        layout.addWidget(self.clear_button)
        self.setFixedSize(900, 400)

        # Connect the non-worker slots and signals
        self.log_button.clicked.connect(self.manual_update)
        self.clear_button.clicked.connect(self.clear_display)

        # Start a new worker thread and connect the slots for the worker
        self.start_thread()
        self.work_button.clicked.connect(self.worker.start)
        # Once started, the button should be disabled
        self.work_button.clicked.connect(lambda : self.work_button.setEnabled(False))

    def start_thread(self):
        self.worker = Worker()
        self.worker_thread = QtCore.QThread()
        self.worker.setObjectName('Worker')
        self.worker_thread.setObjectName('WorkerThread')  # for qThreadName
        self.worker.moveToThread(self.worker_thread)
        # This will start an event loop in the worker thread
        self.worker_thread.start()

    def kill_thread(self):
        # Just tell the worker to stop, then tell it to quit and wait for that
        # to happen
        self.worker_thread.requestInterruption()
        if self.worker_thread.isRunning():
            self.worker_thread.quit()
            self.worker_thread.wait()
        else:
            print('worker has already exited.')

    def force_quit(self):
        # For use when the window is closed
        if self.worker_thread.isRunning():
            self.kill_thread()

    # The functions below update the UI and run in the main thread because
    # that's where the slots are set up

    @Slot(str, logging.LogRecord)
    def update_status(self, status, record):
        color = self.COLORS.get(record.levelno, 'black')
        s = '<pre><font color="%s">%s</font></pre>' % (color, status)
        self.textedit.appendHtml(s)

    @Slot()
    def manual_update(self):
        # This function uses the formatted message passed in, but also uses
        # information from the record to format the message in an appropriate
        # color according to its severity (level).
        level = random.choice(LEVELS)
        extra = {'qThreadName': ctname() }
        logger.log(level, 'Manually logged!', extra=extra)

    @Slot()
    def clear_display(self):
        self.textedit.clear()

def main():
    QtCore.QThread.currentThread().setObjectName('MainThread')
    logging.getLogger().setLevel(logging.DEBUG)
    app = QtWidgets.QApplication(sys.argv)
    example = Window(app)
    example.show()
    sys.exit(app.exec_())

if __name__=='__main__':
    main()