On this page
Logging Cookbook
Author
- Vinay Sajip <vinay_sajip at red-dove dot com>
该页面包含许多与日志记录有关的食谱,这些食谱在过去被发现有用。
使用登录多个模块
多次调用logging.getLogger('someLogger')
会返回对同一 Logger 对象的引用。只要在同一 Python 解释器进程中,不仅在同一模块内,而且在各个模块之间都是如此。对于相同对象的引用是正确的。此外,应用程序代码可以在一个模块中定义和配置父 Logger,并在单独的模块中创建(但不配置)子 Logger,并且所有对子 Logger 的调用都将传递给父 Logger。这是一个主要模块:
import logging
import auxiliary_module
# create logger with 'spam_application'
logger = logging.getLogger('spam_application')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)
logger.info('creating an instance of auxiliary_module.Auxiliary')
a = auxiliary_module.Auxiliary()
logger.info('created an instance of auxiliary_module.Auxiliary')
logger.info('calling auxiliary_module.Auxiliary.do_something')
a.do_something()
logger.info('finished auxiliary_module.Auxiliary.do_something')
logger.info('calling auxiliary_module.some_function()')
auxiliary_module.some_function()
logger.info('done with auxiliary_module.some_function()')
这是辅助模块:
import logging
# create logger
module_logger = logging.getLogger('spam_application.auxiliary')
class Auxiliary:
def __init__(self):
self.logger = logging.getLogger('spam_application.auxiliary.Auxiliary')
self.logger.info('creating an instance of Auxiliary')
def do_something(self):
self.logger.info('doing something')
a = 1 + 1
self.logger.info('done doing something')
def some_function():
module_logger.info('received a call to "some_function"')
输出如下所示:
2005-03-23 23:47:11,663 - spam_application - INFO -
creating an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,665 - spam_application.auxiliary.Auxiliary - INFO -
creating an instance of Auxiliary
2005-03-23 23:47:11,665 - spam_application - INFO -
created an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,668 - spam_application - INFO -
calling auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,668 - spam_application.auxiliary.Auxiliary - INFO -
doing something
2005-03-23 23:47:11,669 - spam_application.auxiliary.Auxiliary - INFO -
done doing something
2005-03-23 23:47:11,670 - spam_application - INFO -
finished auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,671 - spam_application - INFO -
calling auxiliary_module.some_function()
2005-03-23 23:47:11,672 - spam_application.auxiliary - INFO -
received a call to 'some_function'
2005-03-23 23:47:11,673 - spam_application - INFO -
done with auxiliary_module.some_function()
从多个线程记录
从多个线程进行日志记录不需要任何特殊的工作。以下示例显示了从主(初始)线程和另一个线程进行的日志记录:
import logging
import threading
import time
def worker(arg):
while not arg['stop']:
logging.debug('Hi from myfunc')
time.sleep(0.5)
def main():
logging.basicConfig(level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
info = {'stop': False}
thread = threading.Thread(target=worker, args=(info,))
thread.start()
while True:
try:
logging.debug('Hello from main')
time.sleep(0.75)
except KeyboardInterrupt:
info['stop'] = True
break
thread.join()
if __name__ == '__main__':
main()
运行时,脚本应打印如下内容:
0 Thread-1 Hi from myfunc
3 MainThread Hello from main
505 Thread-1 Hi from myfunc
755 MainThread Hello from main
1007 Thread-1 Hi from myfunc
1507 MainThread Hello from main
1508 Thread-1 Hi from myfunc
2010 Thread-1 Hi from myfunc
2258 MainThread Hello from main
2512 Thread-1 Hi from myfunc
3009 MainThread Hello from main
3013 Thread-1 Hi from myfunc
3515 Thread-1 Hi from myfunc
3761 MainThread Hello from main
4017 Thread-1 Hi from myfunc
4513 MainThread Hello from main
4518 Thread-1 Hi from myfunc
这显示了日志输出如预期的那样散布。当然,这种方法适用于比此处显示的线程更多的线程。
多个处理程序和格式化程序
Logger 是普通的 Python 对象。 addHandler()方法没有您可以添加的处理程序数量的最小或最大配额。有时,将应用程序将所有严重性的所有消息记录到文本文件,同时将错误或更高级别的消息记录到控制台将是有益的。要进行设置,只需配置适当的处理程序即可。应用程序代码中的日志记录调用将保持不变。这是对先前基于模块的简单配置示例的略微修改:
import logging
logger = logging.getLogger('simple_example')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
fh.setFormatter(formatter)
# add the handlers to logger
logger.addHandler(ch)
logger.addHandler(fh)
# 'application' code
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')
请注意,“应用程序”代码并不关心多个处理程序。所做的更改只是添加和配置了名为* fh *的新处理程序。
使用较高或较低严重性过滤器创建新的处理程序的能力在编写和测试应用程序时非常有用。而不是使用许多print
语句进行调试,而使用logger.debug
:与 print 语句不同,在稍后您将其删除或 Comments 掉之后,logger.debug 语句可以在源代码中保持不变并保持休眠状态,直到再次需要它们为止。那时,唯一需要进行的更改是修改 Logger 和/或处理程序的严重性级别以进行调试。
登录到多个目的地
假设您要使用不同的消息格式和在不同的情况下登录控制台和文件。假设您要记录 DEBUG 或更高级别的消息到文件,而 INFO 或更高级别的消息记录到控制台。我们还假设该文件应包含时间戳,但控制台消息不应包含时间戳。这是实现此目的的方法:
import logging
# set up logging to file - see previous section for more details
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
datefmt='%m-%d %H:%M',
filename='/temp/myapp.log',
filemode='w')
# define a Handler which writes INFO messages or higher to the sys.stderr
console = logging.StreamHandler()
console.setLevel(logging.INFO)
# set a format which is simpler for console use
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
# tell the handler to use this format
console.setFormatter(formatter)
# add the handler to the root logger
logging.getLogger('').addHandler(console)
# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')
# Now, define a couple of other loggers which might represent areas in your
# application:
logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')
logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')
运行此命令时,在控制台上,您将看到
root : INFO Jackdaws love my big sphinx of quartz.
myapp.area1 : INFO How quickly daft jumping zebras vex.
myapp.area2 : WARNING Jail zesty vixen who grabbed pay from quack.
myapp.area2 : ERROR The five boxing wizards jump quickly.
在文件中,您会看到类似
10-22 22:19 root INFO Jackdaws love my big sphinx of quartz.
10-22 22:19 myapp.area1 DEBUG Quick zephyrs blow, vexing daft Jim.
10-22 22:19 myapp.area1 INFO How quickly daft jumping zebras vex.
10-22 22:19 myapp.area2 WARNING Jail zesty vixen who grabbed pay from quack.
10-22 22:19 myapp.area2 ERROR The five boxing wizards jump quickly.
如您所见,DEBUG 消息仅显示在文件中。其他消息发送到两个目的地。
本示例使用控制台和文件处理程序,但是您可以使用任意数量和所选处理程序的组合。
配置服务器示例
这是使用日志记录配置服务器的模块示例:
import logging
import logging.config
import time
import os
# read initial config file
logging.config.fileConfig('logging.conf')
# create and start listener on port 9999
t = logging.config.listen(9999)
t.start()
logger = logging.getLogger('simpleExample')
try:
# loop through logging calls to see the difference
# new configurations make, until Ctrl+C is pressed
while True:
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')
time.sleep(5)
except KeyboardInterrupt:
# cleanup
logging.config.stopListening()
t.join()
这是一个脚本,该脚本采用文件名并将该文件发送到服务器,并以新的日志记录配置正确地以二进制编码的长度开头:
#!/usr/bin/env python
import socket, sys, struct
with open(sys.argv[1], 'rb') as f:
data_to_send = f.read()
HOST = 'localhost'
PORT = 9999
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print('connecting...')
s.connect((HOST, PORT))
print('sending config...')
s.send(struct.pack('>L', len(data_to_send)))
s.send(data_to_send)
s.close()
print('complete')
处理阻止的处理程序
有时,您必须让日志记录处理程序来执行其工作,而又不会阻塞正在从中进行日志记录的线程。这在 Web 应用程序中很常见,尽管当然在其他情况下也会发生。
表现迟钝的常见罪魁祸首是SMTPHandler:由于开发人员无法控制的多种原因(例如,性能不佳的邮件或网络基础结构),发送电子邮件可能会花费很长时间。但是几乎所有基于网络的处理程序都可以阻止:即使SocketHandler操作也可能在后台进行 DNS 查询,这太慢了(该查询可以深入到套接字库代码的深处,Python 层以下以及控件之外)。
一种解决方案是使用两部分方法。对于第一部分,仅将QueueHandler附加到从性能关键线程访问的那些 Logger 上。他们只是简单地写入队列,队列的大小可以调整为足够大的容量,也可以初始化而没有上限。尽管可能需要捕获queue.Full异常作为代码中的预防措施,但是通常会很快接受对队列的写操作。如果您是在代码中包含性能关键线程的库开发人员,请确保对此进行记录(以及仅将QueueHandlers
附加到 Logger 的建议),以使其他使用您代码的开发人员受益。
解决方案的第二部分是QueueListener,它已被设计为QueueHandler的对应物。 QueueListener非常简单:它传递了一个队列和一些处理程序,并启动了一个内部线程,该线程侦听其队列中从QueueHandlers
(或LogRecords
的任何其他来源)发送的 LogRecords。 LogRecords
从队列中删除,并传递给处理程序进行处理。
拥有单独的QueueListener类的优点是您可以使用同一实例为多个QueueHandlers
服务。比起拥有现有处理程序类的线程版本,这将更加节省资源,后者将在每个处理程序中消耗一个线程,而没有任何特别的好处。
下面是使用这两个类的示例(Ellipsis了导入):
que = queue.Queue(-1) # no limit on size
queue_handler = QueueHandler(que)
handler = logging.StreamHandler()
listener = QueueListener(que, handler)
root = logging.getLogger()
root.addHandler(queue_handler)
formatter = logging.Formatter('%(threadName)s: %(message)s')
handler.setFormatter(formatter)
listener.start()
# The log output will display the thread which generated
# the event (the main thread) rather than the internal
# thread which monitors the internal queue. This is what
# you want to happen.
root.warning('Look out!')
listener.stop()
运行时将产生:
MainThread: Look out!
在版本 3.5 中进行了更改:在 Python 3.5 之前,QueueListener总是将从队列中接收到的所有消息传递给初始化它的每个处理程序。 (这是因为假定级别过滤都是在填充队列的另一侧完成的.)从 3.5 开始,可以pass将关键字参数respect_handler_level=True
传递给侦听器的构造函数来更改此行为。完成此操作后,侦听器会将每个消息的级别与处理程序的级别进行比较,并且仅在适当的情况下才将消息传递给处理程序。
pass网络发送和接收日志记录事件
假设您要pass网络发送日志记录事件,并在接收端进行处理。一种简单的方法是在发送端将SocketHandler实例附加到根 Logger:
import logging, logging.handlers
rootLogger = logging.getLogger('')
rootLogger.setLevel(logging.DEBUG)
socketHandler = logging.handlers.SocketHandler('localhost',
logging.handlers.DEFAULT_TCP_LOGGING_PORT)
# don't bother with a formatter, since a socket handler sends the event as
# an unformatted pickle
rootLogger.addHandler(socketHandler)
# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')
# Now, define a couple of other loggers which might represent areas in your
# application:
logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')
logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')
在接收端,您可以使用socketserver模块设置接收器。这是一个基本的工作示例:
import pickle
import logging
import logging.handlers
import socketserver
import struct
class LogRecordStreamHandler(socketserver.StreamRequestHandler):
"""Handler for a streaming logging request.
This basically logs the record using whatever logging policy is
configured locally.
"""
def handle(self):
"""
Handle multiple requests - each expected to be a 4-byte length,
followed by the LogRecord in pickle format. Logs the record
according to whatever policy is configured locally.
"""
while True:
chunk = self.connection.recv(4)
if len(chunk) < 4:
break
slen = struct.unpack('>L', chunk)[0]
chunk = self.connection.recv(slen)
while len(chunk) < slen:
chunk = chunk + self.connection.recv(slen - len(chunk))
obj = self.unPickle(chunk)
record = logging.makeLogRecord(obj)
self.handleLogRecord(record)
def unPickle(self, data):
return pickle.loads(data)
def handleLogRecord(self, record):
# if a name is specified, we use the named logger rather than the one
# implied by the record.
if self.server.logname is not None:
name = self.server.logname
else:
name = record.name
logger = logging.getLogger(name)
# N.B. EVERY record gets logged. This is because Logger.handle
# is normally called AFTER logger-level filtering. If you want
# to do filtering, do it at the client end to save wasting
# cycles and network bandwidth!
logger.handle(record)
class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
"""
Simple TCP socket-based logging receiver suitable for testing.
"""
allow_reuse_address = True
def __init__(self, host='localhost',
port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
handler=LogRecordStreamHandler):
socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
self.abort = 0
self.timeout = 1
self.logname = None
def serve_until_stopped(self):
import select
abort = 0
while not abort:
rd, wr, ex = select.select([self.socket.fileno()],
[], [],
self.timeout)
if rd:
self.handle_request()
abort = self.abort
def main():
logging.basicConfig(
format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
tcpserver = LogRecordSocketReceiver()
print('About to start TCP server...')
tcpserver.serve_until_stopped()
if __name__ == '__main__':
main()
首先运行服务器,然后运行 Client 端。在 Client 端,控制台上未打印任何内容。在服务器端,您应该看到类似以下内容:
About to start TCP server...
59 root INFO Jackdaws love my big sphinx of quartz.
59 myapp.area1 DEBUG Quick zephyrs blow, vexing daft Jim.
69 myapp.area1 INFO How quickly daft jumping zebras vex.
69 myapp.area2 WARNING Jail zesty vixen who grabbed pay from quack.
69 myapp.area2 ERROR The five boxing wizards jump quickly.
请注意,在某些情况下,pickle 存在一些安全问题。如果这些影响您,您可以使用替代序列化方案,方法是重写makePickle()
方法并在那里实现替代方案,并改编上述脚本以使用替代序列化方案。
将上下文信息添加到您的日志记录输出中
有时您希望日志输出除了传递给日志调用的参数外,还包含上下文信息。例如,在联网应用中,可能希望在日志中记录特定于 Client 端的信息(例如,远程 Client 端的用户名或 IP 地址)。尽管可以使用* extra *参数来实现此目的,但是以这种方式传递信息并不总是很方便。尽管可能很想在每个连接的基础上创建Logger
个实例,但这不是一个好主意,因为这些实例不会被垃圾回收。尽管实际上这不是问题,但当Logger
实例的数量取决于您要在记录应用程序时使用的粒度级别时,如果Logger
实例的数量实际上变得无界,则可能很难 Management。
使用 LoggerAdapters 传递上下文信息
传递上下文信息与日志事件信息一起输出的一种简单方法是使用LoggerAdapter
类。此类设计为看起来像Logger
,因此您可以调用debug()
,info()
,warning()
,error()
,exception()
,critical()
和log()
。这些方法与Logger
中的方法具有相同的签名,因此您可以互换使用两种类型的实例。
创建LoggerAdapter
的实例时,您将为其传递Logger
实例和一个包含上下文信息的类似 dict 的对象。当您在LoggerAdapter
的实例上调用其中一个日志记录方法时,它会将调用委派给传递给其构造函数的Logger
的基础实例,并安排在委派的调用中传递上下文信息。这是LoggerAdapter
的代码段:
def debug(self, msg, /, *args, **kwargs):
"""
Delegate a debug call to the underlying logger, after adding
contextual information from this adapter instance.
"""
msg, kwargs = self.process(msg, kwargs)
self.logger.debug(msg, *args, **kwargs)
LoggerAdapter
的process()
方法是将上下文信息添加到日志输出的位置。它传递了日志记录调用的消息和关键字参数,并将它们的(可能)修改后的版本传递回用于基础 Logger 的调用中。该方法的默认实现不考虑消息,而是在关键字参数中插入“额外”键,其值是传递给构造函数的类似 dict 的对象。当然,如果您在对适配器的调用中传递了“ extra”关键字参数,则它将被静默覆盖。
使用“额外”的优点是,将类似 dict 的对象中的值合并到LogRecord
实例的__dict_中,从而使您可以将自定义字符串与Formatter
实例一起使用,这些字符串了解类似 dict 的对象的键。如果您需要其他方法,例如如果要在消息字符串前添加上下文信息或将上下文信息附加到消息字符串,则只需要子类LoggerAdapter
并重写process()
即可完成所需的操作。这是一个简单的示例:
class CustomAdapter(logging.LoggerAdapter):
"""
This example adapter expects the passed in dict-like object to have a
'connid' key, whose value in brackets is prepended to the log message.
"""
def process(self, msg, kwargs):
return '[%s] %s' % (self.extra['connid'], msg), kwargs
您可以这样使用:
logger = logging.getLogger(__name__)
adapter = CustomAdapter(logger, {'connid': some_conn_id})
然后,您登录到适配器的所有事件的值都将在日志消息之前加上some_conn_id
的值。
使用除字典以外的对象传递上下文信息
您不需要将实际的 dict 传递给LoggerAdapter
-您可以传递实现__getitem__
和__iter__
的类的实例,这样它看起来像是记录日志的 dict。如果要动态生成值(而 dict 中的值将是恒定的),这将很有用。
使用过滤器传递上下文信息
您还可以使用用户定义的Filter
将上下文信息添加到日志输出中。允许Filter
个实例修改传递给它们的LogRecords
,包括添加其他属性,然后可以使用适当的格式字符串或需要的自定义Formatter
输出这些属性。
例如,在 Web 应用程序中,可以将正在处理的请求(或至少是其中有趣的部分)存储在 threadlocal(threading.local)变量中,然后从Filter
访问以添加例如来自请求的信息- ,即LogRecord
的远程 IP 地址和远程用户的用户名-,使用属性名'ip'和'user',如上面的LoggerAdapter
示例所示。在那种情况下,可以使用相同的格式字符串来获得与上面所示类似的输出。这是一个示例脚本:
import logging
from random import choice
class ContextFilter(logging.Filter):
"""
This is a filter which injects contextual information into the log.
Rather than use actual contextual information, we just use random
data in this demo.
"""
USERS = ['jim', 'fred', 'sheila']
IPS = ['123.231.231.123', '127.0.0.1', '192.168.0.1']
def filter(self, record):
record.ip = choice(ContextFilter.IPS)
record.user = choice(ContextFilter.USERS)
return True
if __name__ == '__main__':
levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL)
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)-15s %(name)-5s %(levelname)-8s IP: %(ip)-15s User: %(user)-8s %(message)s')
a1 = logging.getLogger('a.b.c')
a2 = logging.getLogger('d.e.f')
f = ContextFilter()
a1.addFilter(f)
a2.addFilter(f)
a1.debug('A debug message')
a1.info('An info message with %s', 'some parameters')
for x in range(10):
lvl = choice(levels)
lvlname = logging.getLevelName(lvl)
a2.log(lvl, 'A message at %s level with %d %s', lvlname, 2, 'parameters')
运行时会产生以下内容:
2010-09-06 22:38:15,292 a.b.c DEBUG IP: 123.231.231.123 User: fred A debug message
2010-09-06 22:38:15,300 a.b.c INFO IP: 192.168.0.1 User: sheila An info message with some parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1 User: sheila A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR IP: 127.0.0.1 User: jim A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG IP: 127.0.0.1 User: sheila A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR IP: 123.231.231.123 User: fred A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 192.168.0.1 User: jim A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1 User: sheila A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG IP: 192.168.0.1 User: jim A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f ERROR IP: 127.0.0.1 User: sheila A message at ERROR level with 2 parameters
2010-09-06 22:38:15,301 d.e.f DEBUG IP: 123.231.231.123 User: fred A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f INFO IP: 123.231.231.123 User: fred A message at INFO level with 2 parameters
从多个进程登录到单个文件
尽管日志记录是线程安全的,并且支持在单个进程中从多个线程登录到单个文件,但是不支持从多个进程*登录到单个文件,因为没有序列化访问的标准方法跨 Python 中的多个进程复制到单个文件。如果需要从多个进程登录到单个文件,执行此操作的一种方法是使所有进程都登录到SocketHandler
,并有一个单独的进程,该进程实现一个套接字服务器,该服务器从套接字读取并记录到文件。 (如果愿意,可以在现有进程之一中指定一个线程来执行此Function.)This section详细记录了此方法,并包括一个有效的套接字接收器,可以用作您适应自己的起点应用程序。
您还可以编写自己的处理程序,该处理程序使用multiprocessing模块中的Lock类来序列化对进程的文件访问。现有的FileHandler
和子类目前不使用multiprocessing,尽管将来可能会使用。请注意,目前multiprocessing模块并非在所有平台上都提供工作锁定Function(请参见https://bugs.python.org/issue3770)。
或者,您可以使用Queue
和QueueHandler将所有日志记录事件发送到多进程应用程序中的某个进程。以下示例脚本演示了如何执行此操作;在示例中,一个单独的侦听器进程侦听其他进程发送的事件,并根据其自己的日志记录配置记录这些事件。尽管该示例仅演示了一种实现方式(例如,您可能要使用侦听器线程而不是单独的侦听器进程–实现将是类似的),但它确实为侦听器和其他进程提供了完全不同的日志记录配置在您的应用程序中,并且可以用作满足您自己特定要求的代码的基础:
# You'll need these imports in your own code
import logging
import logging.handlers
import multiprocessing
# Next two import lines for this demo only
from random import choice, random
import time
#
# Because you'll want to define the logging configurations for listener and workers, the
# listener and worker process functions take a configurer parameter which is a callable
# for configuring logging for that process. These functions are also passed the queue,
# which they use for communication.
#
# In practice, you can configure the listener however you want, but note that in this
# simple example, the listener does not apply level or filter logic to received records.
# In practice, you would probably want to do this logic in the worker processes, to avoid
# sending events which would be filtered out between processes.
#
# The size of the rotated files is made small so you can see the results easily.
def listener_configurer():
root = logging.getLogger()
h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
h.setFormatter(f)
root.addHandler(h)
# This is the listener process top-level loop: wait for logging events
# (LogRecords)on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
configurer()
while True:
try:
record = queue.get()
if record is None: # We send this as a sentinel to tell the listener to quit.
break
logger = logging.getLogger(record.name)
logger.handle(record) # No level or filter logic applied - just do it!
except Exception:
import sys, traceback
print('Whoops! Problem:', file=sys.stderr)
traceback.print_exc(file=sys.stderr)
# Arrays used for random selections in this demo
LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL]
LOGGERS = ['a.b.c', 'd.e.f']
MESSAGES = [
'Random message #1',
'Random message #2',
'Random message #3',
]
# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def worker_configurer(queue):
h = logging.handlers.QueueHandler(queue) # Just the one handler needed
root = logging.getLogger()
root.addHandler(h)
# send all messages, for demo; no other level or filter logic applied.
root.setLevel(logging.DEBUG)
# This is the worker process top-level loop, which just logs ten events with
# random intervening delays before terminating.
# The print messages are just so you know it's doing something!
def worker_process(queue, configurer):
configurer(queue)
name = multiprocessing.current_process().name
print('Worker started: %s' % name)
for i in range(10):
time.sleep(random())
logger = logging.getLogger(choice(LOGGERS))
level = choice(LEVELS)
message = choice(MESSAGES)
logger.log(level, message)
print('Worker finished: %s' % name)
# Here's where the demo gets orchestrated. Create the queue, create and start
# the listener, create ten workers and start them, wait for them to finish,
# then send a None to the queue to tell the listener to finish.
def main():
queue = multiprocessing.Queue(-1)
listener = multiprocessing.Process(target=listener_process,
args=(queue, listener_configurer))
listener.start()
workers = []
for i in range(10):
worker = multiprocessing.Process(target=worker_process,
args=(queue, worker_configurer))
workers.append(worker)
worker.start()
for w in workers:
w.join()
queue.put_nowait(None)
listener.join()
if __name__ == '__main__':
main()
上述脚本的一个变体将日志记录保留在主进程中的单独线程中:
import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time
def logger_thread(q):
while True:
record = q.get()
if record is None:
break
logger = logging.getLogger(record.name)
logger.handle(record)
def worker_process(q):
qh = logging.handlers.QueueHandler(q)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(qh)
levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL]
loggers = ['foo', 'foo.bar', 'foo.bar.baz',
'spam', 'spam.ham', 'spam.ham.eggs']
for i in range(100):
lvl = random.choice(levels)
logger = logging.getLogger(random.choice(loggers))
logger.log(lvl, 'Message no. %d', i)
if __name__ == '__main__':
q = Queue()
d = {
'version': 1,
'formatters': {
'detailed': {
'class': 'logging.Formatter',
'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO',
},
'file': {
'class': 'logging.FileHandler',
'filename': 'mplog.log',
'mode': 'w',
'formatter': 'detailed',
},
'foofile': {
'class': 'logging.FileHandler',
'filename': 'mplog-foo.log',
'mode': 'w',
'formatter': 'detailed',
},
'errors': {
'class': 'logging.FileHandler',
'filename': 'mplog-errors.log',
'mode': 'w',
'level': 'ERROR',
'formatter': 'detailed',
},
},
'loggers': {
'foo': {
'handlers': ['foofile']
}
},
'root': {
'level': 'DEBUG',
'handlers': ['console', 'file', 'errors']
},
}
workers = []
for i in range(5):
wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
workers.append(wp)
wp.start()
logging.config.dictConfig(d)
lp = threading.Thread(target=logger_thread, args=(q,))
lp.start()
# At this point, the main process could do some useful work of its own
# Once it's done that, it can wait for the workers to terminate...
for wp in workers:
wp.join()
# And now tell the logging thread to finish up, too
q.put(None)
lp.join()
此变体说明了如何为特定的 Logger 应用配置-例如foo
Logger 具有一个特殊的处理程序,该处理程序将foo
子系统中的所有事件存储在文件mplog-foo.log
中。日志记录机制将在主进程中使用它(即使在工作进程中生成了日志记录事件)也可以将消息定向到适当的目的地。
Using concurrent.futures.ProcessPoolExecutor
如果要使用concurrent.futures.ProcessPoolExecutor启动工作进程,则需要稍微不同地创建队列。代替
queue = multiprocessing.Queue(-1)
你应该使用
queue = multiprocessing.Manager().Queue(-1) # also works with the examples above
然后您可以从中替换工作程序创建:
workers = []
for i in range(10):
worker = multiprocessing.Process(target=worker_process,
args=(queue, worker_configurer))
workers.append(worker)
worker.start()
for w in workers:
w.join()
为此(记住首先导入concurrent.futures):
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
for i in range(10):
executor.submit(worker_process, queue, worker_configurer)
使用文件旋转
有时您想让日志文件增长到一定大小,然后打开一个新文件并记录到该文件。您可能需要保留一定数量的这些文件,并且在创建了那么多文件之后,旋转文件,以使文件数和文件大小都保持有界。对于此使用模式,日志记录包提供了RotatingFileHandler
:
import glob
import logging
import logging.handlers
LOG_FILENAME = 'logging_rotatingfile_example.out'
# Set up a specific logger with our desired output level
my_logger = logging.getLogger('MyLogger')
my_logger.setLevel(logging.DEBUG)
# Add the log message handler to the logger
handler = logging.handlers.RotatingFileHandler(
LOG_FILENAME, maxBytes=20, backupCount=5)
my_logger.addHandler(handler)
# Log some messages
for i in range(20):
my_logger.debug('i = %d' % i)
# See what files are created
logfiles = glob.glob('%s*' % LOG_FILENAME)
for filename in logfiles:
print(filename)
结果应该是 6 个单独的文件,每个文件都包含应用程序的日志历史记录的一部分:
logging_rotatingfile_example.out
logging_rotatingfile_example.out.1
logging_rotatingfile_example.out.2
logging_rotatingfile_example.out.3
logging_rotatingfile_example.out.4
logging_rotatingfile_example.out.5
最新文件始终为logging_rotatingfile_example.out
,并且每次达到大小限制时都会使用后缀.1
重命名。每个现有备份文件都被重命名以增加后缀(.1
变为.2
等),并且.6
文件被删除。
显然,作为一个极端示例,此示例将日志长度设置得太小。您可能希望将* maxBytes *设置为适当的值。
使用其他格式样式
将日志记录添加到 Python 标准库后,格式化具有可变内容的消息的唯一方法是使用%-formatting 方法。从那时起,Python 获得了两种新的格式化方法:string.Template(在 Python 2.4 中添加)和str.format()(在 Python 2.6 中添加)。
日志记录(从 3.2 版开始)为这两种其他格式设置样式提供了改进的支持。增强了Formatter
类,以使用名为style
的附加可选关键字参数。默认为'%'
,但其他可能的值为'{'
和'$'
,它们对应于其他两种格式样式。向后兼容性默认情况下保持(如您所愿),但是pass显式指定样式参数,您可以指定适用于str.format()或string.Template的格式字符串。这是一个示例控制台会话,展示了各种可能性:
>>> import logging
>>> root = logging.getLogger()
>>> root.setLevel(logging.DEBUG)
>>> handler = logging.StreamHandler()
>>> bf = logging.Formatter('{asctime} {name} {levelname:8s} {message}',
... style='{')
>>> handler.setFormatter(bf)
>>> root.addHandler(handler)
>>> logger = logging.getLogger('foo.bar')
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:11:55,341 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:12:11,526 foo.bar CRITICAL This is a CRITICAL message
>>> df = logging.Formatter('$asctime $name ${levelname} $message',
... style='$')
>>> handler.setFormatter(df)
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:13:06,924 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:13:11,494 foo.bar CRITICAL This is a CRITICAL message
>>>
请注意,finally输出到日志的日志消息的格式完全独立于单个日志消息的构造方式。那仍然可以使用%格式,如下所示:
>>> logger.error('This is an%s %s %s', 'other,', 'ERROR,', 'message')
2010-10-28 15:19:29,833 foo.bar ERROR This is another, ERROR, message
>>>
记录调用(logger.debug()
,logger.info()
等)仅采用实际记录消息本身的位置参数,而关键字参数仅用于确定如何处理实际记录调用的选项(例如exc_info
关键字参数指示应记录回溯信息) ,或extra
关键字参数来指示要添加到日志中的其他上下文信息)。因此,您不能使用str.format()或string.Template语法直接进行日志记录调用,因为日志记录包在内部使用%-formatting 合并格式字符串和变量参数。在保留向后兼容性的同时,不会对此进行任何更改,因为现有代码中存在的所有日志记录调用都将使用%格式的字符串。
但是,有一种方法可以使用\ {}-和$-格式来构造您的单个日志消息。回想一下,对于一条消息,您可以使用任意对象作为消息格式字符串,并且日志记录程序包将对该对象调用str()
以获取实际的格式字符串。请考虑以下两类:
class BraceMessage:
def __init__(self, fmt, /, *args, **kwargs):
self.fmt = fmt
self.args = args
self.kwargs = kwargs
def __str__(self):
return self.fmt.format(*self.args, **self.kwargs)
class DollarMessage:
def __init__(self, fmt, /, **kwargs):
self.fmt = fmt
self.kwargs = kwargs
def __str__(self):
from string import Template
return Template(self.fmt).substitute(**self.kwargs)
这些中的任何一个都可以代替格式字符串,以允许\ {}-或$ -formatting 用来构建实际的“消息”部分,该部分出现在格式化日志输出中,代替“%(message)s”或“{message}”或“ $ message”。每当您要记录某些内容时,都很难使用类名,但是如果您使用诸如__(双下划线的别名,请不要与混淆,单个下划线用作gettext.gettext()的别名/别名)的名称非常可口。或其弟兄)。
上面的类不包含在 Python 中,尽管它们很容易复制并粘贴到您自己的代码中。它们可以按如下方式使用(假设它们在名为wherever
的模块中语句):
>>> from wherever import BraceMessage as __
>>> print(__('Message with {0} {name}', 2, name='placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})',
... point=p))
Message with coordinates: (0.50, 0.50)
>>> from wherever import DollarMessage as __
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>
尽管上面的示例使用print()
来显示格式化的工作原理,但是您当然会使用logger.debug()
或类似的方法来实际记录这种格式。
需要注意的一件事是,使用这种方法不会对性能造成任何重大影响:实际的格式设置不是在进行日志记录调用时发生,而是在(如果有)记录的消息实际上将由处理程序输出到日志时发生。因此,可能会使您不满意的唯一不寻常的事情是括号会围绕格式字符串和参数,而不仅仅是格式字符串。这是因为_表示法只是对 XXXMessage 类之一的构造函数调用的语法糖。
如果愿意,可以使用LoggerAdapter
来达到与上述类似的效果,如以下示例所示:
import logging
class Message:
def __init__(self, fmt, args):
self.fmt = fmt
self.args = args
def __str__(self):
return self.fmt.format(*self.args)
class StyleAdapter(logging.LoggerAdapter):
def __init__(self, logger, extra=None):
super(StyleAdapter, self).__init__(logger, extra or {})
def log(self, level, msg, /, *args, **kwargs):
if self.isEnabledFor(level):
msg, kwargs = self.process(msg, kwargs)
self.logger._log(level, Message(msg, args), (), **kwargs)
logger = StyleAdapter(logging.getLogger(__name__))
def main():
logger.debug('Hello, {}', 'world!')
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
main()
当在 Python 3.2 或更高版本上运行时,以上脚本应记录消息Hello, world!
。
Customizing LogRecord
每个日志记录事件都由一个LogRecord实例表示。当记录了事件且未按记录程序的级别过滤掉事件时,将创建LogRecord,并填充有关该事件的信息,然后将其传递给该记录程序的处理程序(及其祖先,直到并包括该记录程序,在该记录程序中进一步传播)层次结构已禁用)。在 Python 3.2 之前,只有两个地方完成了此创建:
Logger.makeRecord(),这是在记录事件的正常过程中调用的。这直接调用LogRecord来创建实例。
makeLogRecord(),用包含要添加到 LogRecord 的属性的字典来调用。通常在网络上已接收到合适的字典时调用该字典(例如,passSocketHandler以 pickle 的形式,或passHTTPHandler以 JSON 的形式)。
这通常意味着,如果您需要对LogRecord做任何特殊的事情,则必须执行以下一项操作。
创建自己的Logger子类,该子类将覆盖Logger.makeRecord(),并在实例化您关心的所有 Logger 之前使用setLoggerClass()对其进行设置。
在(例如)几个不同的库想做不同的事情的情况下,第一种方法会有些笨拙。每个人都会try设置自己的Logger子类,而最后一个这样做的人将获胜。
第二种方法在很多情况下都能很好地发挥作用,但不允许您例如使用专门的LogRecord子类。库开发人员可以在 Logger 上设置一个合适的过滤器,但是他们必须记住每次引入新 Logger 时都要这样做(他们只需添加新的软件包或模块并执行
logger = logging.getLogger(__name__)
在模块级别)。这可能是太多要考虑的事情。开发人员也可以将过滤器添加到附加到其顶层 Logger 的NullHandler上,但是如果应用程序开发人员将处理程序附加到较低层的库 Logger 上,则不会调用该过滤器-因此该处理程序的输出将无法反映该过滤器的意图。库开发人员。
在 Python 3.2 及更高版本中,LogRecord创建是pass工厂指定的,您可以指定该工厂。您可以使用setLogRecordFactory()设置工厂,也可以使用getLogRecordFactory()询问工厂。使用LogRecord构造函数相同的签名来调用工厂,因为LogRecord是工厂的默认设置。
这种方法允许自定义工厂控制 LogRecord 创建的所有方面。例如,您可以返回子类,或仅使用以下类似的模式将一些其他属性添加到创建后的 Logging:
old_factory = logging.getLogRecordFactory()
def record_factory(*args, **kwargs):
record = old_factory(*args, **kwargs)
record.custom_attribute = 0xdecafbad
return record
logging.setLogRecordFactory(record_factory)
这种模式允许不同的库将工厂链接在一起,并且只要它们不覆盖彼此的属性或无意覆盖作为标准提供的属性,就不会感到惊讶。但是,应该记住,链中的每个链接都会为所有日志记录操作增加运行时开销,并且该技术仅应在使用Filter无法提供所需结果时使用。
子类化 QueueHandler-ZeroMQ 示例
您可以使用QueueHandler
子类将消息发送到其他类型的队列,例如 ZeroMQ“发布”套接字。在下面的示例中,套接字是单独创建的,并传递给处理程序(作为其“队列”):
import zmq # using pyzmq, the Python binding for ZeroMQ
import json # for serializing records portably
ctx = zmq.Context()
sock = zmq.Socket(ctx, zmq.PUB) # or zmq.PUSH, or other suitable value
sock.bind('tcp://*:5556') # or wherever
class ZeroMQSocketHandler(QueueHandler):
def enqueue(self, record):
self.queue.send_json(record.__dict__)
handler = ZeroMQSocketHandler(sock)
当然,还有其他组织方式,例如,传递处理程序创建套接字所需的数据:
class ZeroMQSocketHandler(QueueHandler):
def __init__(self, uri, socktype=zmq.PUB, ctx=None):
self.ctx = ctx or zmq.Context()
socket = zmq.Socket(self.ctx, socktype)
socket.bind(uri)
super().__init__(socket)
def enqueue(self, record):
self.queue.send_json(record.__dict__)
def close(self):
self.queue.close()
子类化 QueueListener-ZeroMQ 示例
您还可以子类QueueListener
来从其他种类的队列中获取消息,例如 ZeroMQ'subscribe'套接字。这是一个例子:
class ZeroMQSocketListener(QueueListener):
def __init__(self, uri, /, *handlers, **kwargs):
self.ctx = kwargs.get('ctx') or zmq.Context()
socket = zmq.Socket(self.ctx, zmq.SUB)
socket.setsockopt_string(zmq.SUBSCRIBE, '') # subscribe to everything
socket.connect(uri)
super().__init__(socket, *handlers, **kwargs)
def dequeue(self):
msg = self.queue.recv_json()
return logging.makeLogRecord(msg)
See also
Module logging
日志记录模块的 API 参考。
Module logging.config
日志记录模块的配置 API。
Module logging.handlers
日志记录模块随附的有用处理程序。
示例基于字典的配置
以下是日志记录配置字典的示例-取自Django 项目的文档。该字典传递给dictConfig()以使配置生效:
LOGGING = {
'version': 1,
'disable_existing_loggers': True,
'formatters': {
'verbose': {
'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s'
},
'simple': {
'format': '%(levelname)s %(message)s'
},
},
'filters': {
'special': {
'()': 'project.logging.SpecialFilter',
'foo': 'bar',
}
},
'handlers': {
'null': {
'level':'DEBUG',
'class':'django.utils.log.NullHandler',
},
'console':{
'level':'DEBUG',
'class':'logging.StreamHandler',
'formatter': 'simple'
},
'mail_admins': {
'level': 'ERROR',
'class': 'django.utils.log.AdminEmailHandler',
'filters': ['special']
}
},
'loggers': {
'django': {
'handlers':['null'],
'propagate': True,
'level':'INFO',
},
'django.request': {
'handlers': ['mail_admins'],
'level': 'ERROR',
'propagate': False,
},
'myproject.custom': {
'handlers': ['console', 'mail_admins'],
'level': 'INFO',
'filters': ['special']
}
}
}
有关此配置的更多信息,请参见 Django 文档的relevant section。
使用旋转器和命名器来自定义日志旋转处理
以下代码段给出了如何定义命名器和旋转器的示例,该代码显示了基于 zlib 的日志文件压缩:
def namer(name):
return name + ".gz"
def rotator(source, dest):
with open(source, "rb") as sf:
data = sf.read()
compressed = zlib.compress(data, 9)
with open(dest, "wb") as df:
df.write(compressed)
os.remove(source)
rh = logging.handlers.RotatingFileHandler(...)
rh.rotator = rotator
rh.namer = namer
这些不是“ true” .gz 文件,因为它们是裸压缩数据,没有像实际 gzip 文件中那样的“容器”。此代码段仅用于说明目的。
更详细的 multiprocessing 示例
以下工作示例显示了如何将日志记录与使用配置文件的多重处理一起使用。这些配置非常简单,但是可以说明在实际的 multiprocessing 方案中如何实现更复杂的配置。
在该示例中,主进程产生一个侦听器进程和一些辅助进程。每个主进程,侦听器和工作程序都有三个单独的配置(工作程序都共享相同的配置)。我们可以看到主要过程中的日志记录,工作人员如何记录到 QueueHandler 以及侦听器如何实现 QueueListener 和更复杂的日志记录配置,以及安排将pass队列接收的事件分派到配置中指定的处理程序。请注意,这些配置仅是说明性的,但是您应该能够使此示例适应您自己的方案。
这是脚本-文档字符串和 Comments 有望解释其工作原理:
import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue, Event, current_process
import os
import random
import time
class MyHandler:
"""
A simple handler for logging events. It runs in the listener process and
dispatches events to loggers based on the name in the received record,
which then get dispatched, by the logging system, to the handlers
configured for those loggers.
"""
def handle(self, record):
if record.name == "root":
logger = logging.getLogger()
else:
logger = logging.getLogger(record.name)
if logger.isEnabledFor(record.levelno):
# The process name is transformed just to show that it's the listener
# doing the logging to files and console
record.processName = '%s (for %s)' % (current_process().name, record.processName)
logger.handle(record)
def listener_process(q, stop_event, config):
"""
This could be done in the main process, but is just done in a separate
process for illustrative purposes.
This initialises logging according to the specified configuration,
starts the listener and waits for the main process to signal completion
via the event. The listener is then stopped, and the process exits.
"""
logging.config.dictConfig(config)
listener = logging.handlers.QueueListener(q, MyHandler())
listener.start()
if os.name == 'posix':
# On POSIX, the setup logger will have been configured in the
# parent process, but should have been disabled following the
# dictConfig call.
# On Windows, since fork isn't used, the setup logger won't
# exist in the child, so it would be created and the message
# would appear - hence the "if posix" clause.
logger = logging.getLogger('setup')
logger.critical('Should not appear, because of disabled logger ...')
stop_event.wait()
listener.stop()
def worker_process(config):
"""
A number of these are spawned for the purpose of illustration. In
practice, they could be a heterogeneous bunch of processes rather than
ones which are identical to each other.
This initialises logging according to the specified configuration,
and logs a hundred messages with random levels to randomly selected
loggers.
A small sleep is added to allow other processes a chance to run. This
is not strictly needed, but it mixes the output from the different
processes a bit more than if it's left out.
"""
logging.config.dictConfig(config)
levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL]
loggers = ['foo', 'foo.bar', 'foo.bar.baz',
'spam', 'spam.ham', 'spam.ham.eggs']
if os.name == 'posix':
# On POSIX, the setup logger will have been configured in the
# parent process, but should have been disabled following the
# dictConfig call.
# On Windows, since fork isn't used, the setup logger won't
# exist in the child, so it would be created and the message
# would appear - hence the "if posix" clause.
logger = logging.getLogger('setup')
logger.critical('Should not appear, because of disabled logger ...')
for i in range(100):
lvl = random.choice(levels)
logger = logging.getLogger(random.choice(loggers))
logger.log(lvl, 'Message no. %d', i)
time.sleep(0.01)
def main():
q = Queue()
# The main process gets a simple configuration which prints to the console.
config_initial = {
'version': 1,
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO'
}
},
'root': {
'handlers': ['console'],
'level': 'DEBUG'
}
}
# The worker process configuration is just a QueueHandler attached to the
# root logger, which allows all messages to be sent to the queue.
# We disable existing loggers to disable the "setup" logger used in the
# parent process. This is needed on POSIX because the logger will
# be there in the child following a fork().
config_worker = {
'version': 1,
'disable_existing_loggers': True,
'handlers': {
'queue': {
'class': 'logging.handlers.QueueHandler',
'queue': q
}
},
'root': {
'handlers': ['queue'],
'level': 'DEBUG'
}
}
# The listener process configuration shows that the full flexibility of
# logging configuration is available to dispatch events to handlers however
# you want.
# We disable existing loggers to disable the "setup" logger used in the
# parent process. This is needed on POSIX because the logger will
# be there in the child following a fork().
config_listener = {
'version': 1,
'disable_existing_loggers': True,
'formatters': {
'detailed': {
'class': 'logging.Formatter',
'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
},
'simple': {
'class': 'logging.Formatter',
'format': '%(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'simple',
'level': 'INFO'
},
'file': {
'class': 'logging.FileHandler',
'filename': 'mplog.log',
'mode': 'w',
'formatter': 'detailed'
},
'foofile': {
'class': 'logging.FileHandler',
'filename': 'mplog-foo.log',
'mode': 'w',
'formatter': 'detailed'
},
'errors': {
'class': 'logging.FileHandler',
'filename': 'mplog-errors.log',
'mode': 'w',
'formatter': 'detailed',
'level': 'ERROR'
}
},
'loggers': {
'foo': {
'handlers': ['foofile']
}
},
'root': {
'handlers': ['console', 'file', 'errors'],
'level': 'DEBUG'
}
}
# Log some initial events, just to show that logging in the parent works
# normally.
logging.config.dictConfig(config_initial)
logger = logging.getLogger('setup')
logger.info('About to create workers ...')
workers = []
for i in range(5):
wp = Process(target=worker_process, name='worker %d' % (i + 1),
args=(config_worker,))
workers.append(wp)
wp.start()
logger.info('Started worker: %s', wp.name)
logger.info('About to create listener ...')
stop_event = Event()
lp = Process(target=listener_process, name='listener',
args=(q, stop_event, config_listener))
lp.start()
logger.info('Started listener')
# We now hang around for the workers to finish their work.
for wp in workers:
wp.join()
# Workers all done, listening can now stop.
# Logging in the parent still works normally.
logger.info('Telling listener to stop ...')
stop_event.set()
lp.join()
logger.info('All done.')
if __name__ == '__main__':
main()
将 BOM 插入发送到 SysLogHandler 的消息中
RFC 5424要求将 Unicode 消息作为一组字节发送到 syslog 守护程序,该字节具有以下结构:可选的纯 ASCII 组件,后跟 UTF-8 字节 Sequences 标记(BOM),然后是使用 UTF 编码的 Unicode -8. (请参见 规范的相关部分。)
在 Python 3.1 中,代码已添加到SysLogHandler中,以将 BOM 表插入消息中,但是不幸的是,它的实现不正确,因为 BOM 表出现在消息的开头,因此不允许任何纯 ASCII 组件出现在消息之前。
由于此行为被破坏,因此错误的 BOM 插入代码已从 Python 3.2.4 及更高版本中删除。但是,它不会被替换,并且如果要生成 RFC 5424兼容的消息,其中包括 BOM,使用 UTF-8 编码的 BOM 表,可选的纯 ASCII 序列以及其后的任意 Unicode 编码,则需要执行以下:
- 使用格式字符串将Formatter实例附加到SysLogHandler实例,例如:
'ASCII section\ufeffUnicode section'
当使用 UTF-8 编码时,Unicode 代码点 U FEFF 将被编码为 UTF-8 BOM –字节串b'\xef\xbb\xbf'
。
用任何喜欢的占位符替换 ASCII 节,但要确保替换后出现在其中的数据始终是 ASCII(这样,在 UTF-8 编码后,数据将保持不变)。
用您喜欢的任何占位符替换 Unicode 部分;如果替换后出现的数据中包含的字符超出了 ASCII 范围,则可以-使用 UTF-8 进行编码。
格式化的消息将passSysLogHandler
使用 UTF-8 编码进行编码。如果您遵循上述规则,则应该能够产生 RFC 5424兼容的消息。如果您不这样做,则日志记录可能不会抱怨,但是您的消息将不符合 RFC 5424,并且您的 syslog 守护程序可能会抱怨。
实施结构化日志记录
尽管大多数日志记录消息都是供人阅读的,因此不容易pass机器解析,但是在某些情况下,您可能希望以结构化格式输出消息,该结构化格式能够被程序解析(而无需复杂的正则表达式)解析日志消息)。使用日志记录包可以轻松实现。有多种方法可以实现此目的,但是以下是一种简单的方法,该方法使用 JSON 以机器可解析的方式序列化事件:
import json
import logging
class StructuredMessage:
def __init__(self, message, /, **kwargs):
self.message = message
self.kwargs = kwargs
def __str__(self):
return '%s >>> %s' % (self.message, json.dumps(self.kwargs))
_ = StructuredMessage # optional, to improve readability
logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', foo='bar', bar='baz', num=123, fnum=123.456))
如果运行了上面的脚本,它将输出:
message 1 >>> {"fnum": 123.456, "num": 123, "bar": "baz", "foo": "bar"}
请注意,根据所使用的 Python 版本,项目的 Sequences 可能会有所不同。
如果需要更专业的处理,则可以使用自定义 JSON 编码器,如以下完整示例所示:
from __future__ import unicode_literals
import json
import logging
# This next bit is to ensure the script runs unchanged on 2.x and 3.x
try:
unicode
except NameError:
unicode = str
class Encoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, set):
return tuple(o)
elif isinstance(o, unicode):
return o.encode('unicode_escape').decode('ascii')
return super(Encoder, self).default(o)
class StructuredMessage:
def __init__(self, message, /, **kwargs):
self.message = message
self.kwargs = kwargs
def __str__(self):
s = Encoder().encode(self.kwargs)
return '%s >>> %s' % (self.message, s)
_ = StructuredMessage # optional, to improve readability
def main():
logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', set_value={1, 2, 3}, snowman='\u2603'))
if __name__ == '__main__':
main()
运行上面的脚本时,它会打印:
message 1 >>> {"snowman": "\u2603", "set_value": [1, 2, 3]}
请注意,根据所使用的 Python 版本,项目的 Sequences 可能会有所不同。
使用 dictConfig()自定义处理程序
有时您想以特定方式自定义日志处理程序,如果您使用dictConfig(),则无需子类就可以做到这一点。例如,考虑您可能要设置日志文件的所有权。在 POSIX 上,可以使用shutil.chown()轻松完成此操作,但是 stdlib 中的文件处理程序不提供内置支持。您可以使用简单的函数来自定义处理程序的创建,例如:
def owned_file_handler(filename, mode='a', encoding=None, owner=None):
if owner:
if not os.path.exists(filename):
open(filename, 'a').close()
shutil.chown(filename, *owner)
return logging.FileHandler(filename, mode, encoding)
然后,您可以在传递给dictConfig()的日志记录配置中指定pass调用此函数来创建日志记录处理程序:
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'default': {
'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
},
},
'handlers': {
'file':{
# The values below are popped from this dictionary and
# used to create the handler, set the handler's level and
# its formatter.
'()': owned_file_handler,
'level':'DEBUG',
'formatter': 'default',
# The values below are passed to the handler creator callable
# as keyword arguments.
'owner': ['pulse', 'pulse'],
'filename': 'chowntest.log',
'mode': 'w',
'encoding': 'utf-8',
},
},
'root': {
'handlers': ['file'],
'level': 'DEBUG',
},
}
在此示例中,我仅出于说明目的使用pulse
用户和组设置所有权。将其放到工作脚本chowntest.py
中:
import logging, logging.config, os, shutil
def owned_file_handler(filename, mode='a', encoding=None, owner=None):
if owner:
if not os.path.exists(filename):
open(filename, 'a').close()
shutil.chown(filename, *owner)
return logging.FileHandler(filename, mode, encoding)
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'default': {
'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
},
},
'handlers': {
'file':{
# The values below are popped from this dictionary and
# used to create the handler, set the handler's level and
# its formatter.
'()': owned_file_handler,
'level':'DEBUG',
'formatter': 'default',
# The values below are passed to the handler creator callable
# as keyword arguments.
'owner': ['pulse', 'pulse'],
'filename': 'chowntest.log',
'mode': 'w',
'encoding': 'utf-8',
},
},
'root': {
'handlers': ['file'],
'level': 'DEBUG',
},
}
logging.config.dictConfig(LOGGING)
logger = logging.getLogger('mylogger')
logger.debug('A debug message')
要运行此程序,您可能需要以root
的身份运行:
$ sudo python3.3 chowntest.py
$ cat chowntest.log
2013-11-05 09:34:51,128 DEBUG mylogger A debug message
$ ls -l chowntest.log
-rw-r--r-- 1 pulse pulse 55 2013-11-05 09:34 chowntest.log
请注意,此示例使用 Python 3.3,因为这是shutil.chown()出现的地方。此方法应与任何支持dictConfig()的 Python 版本一起使用-即 Python 2.7、3.2 或更高版本。在 3.3 之前的版本中,您需要使用来实现实际的所有权更改。 os.chown()。
实际上,创建处理程序的Function可能位于项目中某个地方的 Util 模块中。代替配置中的行:
'()': owned_file_handler,
您可以使用例如:
'()': 'ext://project.util.owned_file_handler',
其中project.util
可以替换为函数所在包的实际名称。在上面的工作脚本中,应使用'ext://__main__.owned_file_handler'
。在这里,实际的可调用项由ext://
规范中的dictConfig()解析。
希望本示例还指出了如何实现其他类型的文件更改的方式-例如使用os.chmod()以相同的方式设置特定的 POSIX 权限位。
当然,该方法还可以扩展到除FileHandler之外的其他类型的处理程序-例如,旋转文件处理程序之一,或完全不同的处理程序类型。
在整个应用程序中使用特定的格式样式
在 Python 3.2 中,Formatter获得了style
关键字参数,虽然为了向后兼容而默认为%
,但允许{
或$
的规范支持str.format()和string.Template支持的格式化方法。请注意,这支配了finally输出到日志的日志消息的格式,并且与如何构造单个日志消息完全正交。
记录调用(debug(),info()等)仅采用实际记录消息本身的位置参数,而关键字参数仅用于确定如何处理记录调用的选项(例如exc_info
关键字参数指示应记录回溯信息,或extra
关键字参数表示要添加到日志中的其他上下文信息)。因此,您不能使用str.format()或string.Template语法直接进行日志记录调用,因为日志记录包在内部使用%格式合并格式字符串和变量参数。在保留向后兼容性的同时,不会对此进行任何更改,因为现有代码中存在的所有日志记录调用都将使用%格式的字符串。
有人建议将格式样式与特定的 Logger 相关联,但是这种方法也会遇到向后兼容性问题,因为任何现有代码都可能使用给定的 Logger 名称和%-formatting。
为了使日志能够在任何第三方库和您的代码之间互操作,必须在单个日志调用级别上决定格式。这开辟了几种方式,可以容纳其他格式的样式。
使用 LogRecord 工厂
在 Python 3.2 中,连同上面提到的Formatter更改,日志记录包获得了允许用户使用setLogRecordFactory()函数设置自己的LogRecord子类的Function。您可以使用它来设置自己的LogRecord子类,该子类pass覆盖getMessage()方法来执行 Right Thing。此方法的 Base Class 实现是msg % args
格式发生的地方,您可以替换替代格式;但是,您应该谨慎地支持所有格式样式,并允许%格式为默认格式,以确保与其他代码的互操作性。就像基本实现一样,也应注意调用str(self.msg)
。
有关更多信息,请参考setLogRecordFactory()和LogRecord上的参考文档。
使用自定义消息对象
还有另一种也许更简单的方法,您可以使用\ {}-和$-格式化来构造您的单个日志消息。您可能会记得(从使用任意对象作为消息),在记录日志时,可以使用任意对象作为消息格式字符串,并且日志记录包将对该对象调用str()以获取实际的格式字符串。请考虑以下两类:
class BraceMessage:
def __init__(self, fmt, /, *args, **kwargs):
self.fmt = fmt
self.args = args
self.kwargs = kwargs
def __str__(self):
return self.fmt.format(*self.args, **self.kwargs)
class DollarMessage:
def __init__(self, fmt, /, **kwargs):
self.fmt = fmt
self.kwargs = kwargs
def __str__(self):
from string import Template
return Template(self.fmt).substitute(**self.kwargs)
这些中的任何一个都可以代替格式字符串,以允许使用\ {}-或$格式来构建实际的“消息”部分,该部分将显示在格式化日志输出中,以代替“%(message)s”或“{message}”或“ $ message”。如果您发现每次要记录某事时都使用类名有点笨拙,则可以在消息中使用诸如M
或_
之类的别名(或者使用__
,如果使用_
进行本地化)则使其更加可口。 )。
下面给出了这种方法的示例。首先,使用str.format()进行格式化:
>>> __ = BraceMessage
>>> print(__('Message with {0} {1}', 2, 'placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})', point=p))
Message with coordinates: (0.50, 0.50)
其次,使用string.Template进行格式化:
>>> __ = DollarMessage
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>
需要注意的一件事是,使用这种方法不会对性能造成任何重大影响:实际的格式设置不是在进行日志记录调用时发生,而是在(如果有)记录的消息实际上将由处理程序输出到日志时发生。因此,可能会使您不满意的唯一不寻常的事情是括号会围绕格式字符串和参数,而不仅仅是格式字符串。这是因为_表示法只是对上面显示的XXXMessage
类之一进行构造函数调用的语法糖。
使用 dictConfig()配置过滤器
您可以*使用_ 配置过滤器,尽管乍看之下操作方法可能并不明显(因此该方法)。由于Filter是标准库中唯一包含的过滤器类,并且不太可能满足许多要求(仅作为 Base Class 存在),因此通常需要使用覆盖的filter()方法定义自己的Filter子类。为此,请在过滤器的配置字典中指定()
键,并指定将用于创建过滤器的 callable(一个类是最明显的,但是您可以提供任何返回Filter实例的 callable)。这是一个完整的示例:
import logging
import logging.config
import sys
class MyFilter(logging.Filter):
def __init__(self, param=None):
self.param = param
def filter(self, record):
if self.param is None:
allow = True
else:
allow = self.param not in record.msg
if allow:
record.msg = 'changed: ' + record.msg
return allow
LOGGING = {
'version': 1,
'filters': {
'myfilter': {
'()': MyFilter,
'param': 'noshow',
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'filters': ['myfilter']
}
},
'root': {
'level': 'DEBUG',
'handlers': ['console']
},
}
if __name__ == '__main__':
logging.config.dictConfig(LOGGING)
logging.debug('hello')
logging.debug('hello - noshow')
本示例说明如何以关键字参数的形式将配置数据传递给构造实例的可调用对象。运行时,以上脚本将打印:
changed: hello
这表明过滤器正在按配置工作。
还有两点需要注意:
如果您无法在配置中直接引用可调用对象(例如,如果它位于其他模块中,并且无法将其直接导入配置字典所在的位置),则可以使用访问外部对象中描述的
ext://...
形式。例如,您可以在上面的示例中使用文本'ext://__main__.MyFilter'
而不是MyFilter
。除过滤器外,此技术还可用于配置自定义处理程序和格式化程序。有关日志如何支持在配置中使用用户定义的对象的更多信息,请参见User-defined objects,请参阅上面的其他食谱使用 dictConfig()自定义处理程序。
自定义 exception 格式
有时您可能想要进行自定义的异常格式设置-出于参数的考虑,假设即使存在异常信息,每个记录的事件也只需要一行。您可以使用自定义格式器类来执行此操作,如以下示例所示:
import logging
class OneLineExceptionFormatter(logging.Formatter):
def formatException(self, exc_info):
"""
Format an exception so that it prints on a single line.
"""
result = super(OneLineExceptionFormatter, self).formatException(exc_info)
return repr(result) # or format into one line however you want to
def format(self, record):
s = super(OneLineExceptionFormatter, self).format(record)
if record.exc_text:
s = s.replace('\n', '') + '|'
return s
def configure_logging():
fh = logging.FileHandler('output.txt', 'w')
f = OneLineExceptionFormatter('%(asctime)s|%(levelname)s|%(message)s|',
'%d/%m/%Y %H:%M:%S')
fh.setFormatter(f)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(fh)
def main():
configure_logging()
logging.info('Sample message')
try:
x = 1 / 0
except ZeroDivisionError as e:
logging.exception('ZeroDivisionError: %s', e)
if __name__ == '__main__':
main()
运行时,这将产生一个包含两行的文件:
28/01/2015 07:21:23|INFO|Sample message|
28/01/2015 07:21:23|ERROR|ZeroDivisionError: integer division or modulo by zero|'Traceback (most recent call last):\n File "logtest7.py", line 30, in main\n x = 1 / 0\nZeroDivisionError: integer division or modulo by zero'|
尽管上面的处理很简单,但是它为如何根据自己的喜好格式化异常信息指明了方向。 traceback模块可能有助于满足更多特殊需求。
记录日志信息
在某些情况下,希望以可听而不是可见的格式呈现日志消息。如果您的系统中具有文本转语音(TTS)Function,即使它没有 Python 绑定,也很容易做到。大多数 TTS 系统都有您可以运行的命令行程序,可以使用subprocess从处理程序中调用该程序。这里假设 TTS 命令行程序不会与用户交互或花费很长时间才能完成,并且记录消息的频率不会很高,不会使用户充满消息,并且拥有一次而不是同时说出一条消息,下面的示例实现在处理另一条消息之前先 await 一条消息,这可能会使其他处理程序保持 await 状态。这是一个显示此方法的简短示例,它假定espeak
TTS 包可用:
import logging
import subprocess
import sys
class TTSHandler(logging.Handler):
def emit(self, record):
msg = self.format(record)
# Speak slowly in a female English voice
cmd = ['espeak', '-s150', '-ven+f3', msg]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
# wait for the program to finish
p.communicate()
def configure_logging():
h = TTSHandler()
root = logging.getLogger()
root.addHandler(h)
# the default formatter just returns the message
root.setLevel(logging.DEBUG)
def main():
logging.info('Hello')
logging.debug('Goodbye')
if __name__ == '__main__':
configure_logging()
sys.exit(main())
运行时,此脚本应以女性声音说“你好”,然后说“再见”。
当然,以上方法可以适用于其他 TTS 系统,甚至可以与其他可以pass命令行运行的外部程序处理消息的系统一起使用。
缓冲日志消息并有条件地输出
在某些情况下,您可能希望将消息记录在临时区域中,并且仅在发生特定情况时才输出消息。例如,您可能要开始在一个函数中记录调试事件,并且如果该函数完成而没有错误,则您不希望使用收集的调试信息使日志混乱,但是,如果有错误,则需要所有调试要输出的信息以及错误。
这是一个示例,显示了如何使用装饰器为希望日志记录以这种方式运行的函数执行此操作。它使用logging.handlers.MemoryHandler,它允许缓冲记录的事件,直到发生某种情况为止,此时将缓冲的事件flushed
-传递给另一个处理程序(target
处理程序)进行处理。默认情况下,MemoryHandler
在其缓冲区被填满或看到级别大于或等于指定阈值的事件时刷新。如果要自定义冲洗行为,可以将此配方与MemoryHandler
的更专门的子类一起使用。
该示例脚本具有一个简单的函数foo
,该函数仅循环浏览所有日志记录级别,写入sys.stderr
表示要登录的级别,然后实际在该级别记录消息。您可以将参数传递给foo
,如果为 true,它将以 ERROR 和 CRITICAL 级别记录-否则,它仅以 DEBUG,INFO 和 WARNING 级别记录。
该脚本只是安排用装饰器装饰foo
,它将执行所需的条件日志记录。装饰器将 Logger 作为参数,并在对装饰函数的调用期间附加一个内存处理程序。装饰器还可以使用目标处理程序,应该进行刷新的级别以及缓冲区的容量(缓冲的记录数)进行参数化。这些默认为StreamHandler,分别写入sys.stderr
,logging.ERROR
和100
。
这是脚本:
import logging
from logging.handlers import MemoryHandler
import sys
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
def log_if_errors(logger, target_handler=None, flush_level=None, capacity=None):
if target_handler is None:
target_handler = logging.StreamHandler()
if flush_level is None:
flush_level = logging.ERROR
if capacity is None:
capacity = 100
handler = MemoryHandler(capacity, flushLevel=flush_level, target=target_handler)
def decorator(fn):
def wrapper(*args, **kwargs):
logger.addHandler(handler)
try:
return fn(*args, **kwargs)
except Exception:
logger.exception('call failed')
raise
finally:
super(MemoryHandler, handler).flush()
logger.removeHandler(handler)
return wrapper
return decorator
def write_line(s):
sys.stderr.write('%s\n' % s)
def foo(fail=False):
write_line('about to log at DEBUG ...')
logger.debug('Actually logged at DEBUG')
write_line('about to log at INFO ...')
logger.info('Actually logged at INFO')
write_line('about to log at WARNING ...')
logger.warning('Actually logged at WARNING')
if fail:
write_line('about to log at ERROR ...')
logger.error('Actually logged at ERROR')
write_line('about to log at CRITICAL ...')
logger.critical('Actually logged at CRITICAL')
return fail
decorated_foo = log_if_errors(logger)(foo)
if __name__ == '__main__':
logger.setLevel(logging.DEBUG)
write_line('Calling undecorated foo with False')
assert not foo(False)
write_line('Calling undecorated foo with True')
assert foo(True)
write_line('Calling decorated foo with False')
assert not decorated_foo(False)
write_line('Calling decorated foo with True')
assert decorated_foo(True)
运行此脚本时,应观察到以下输出:
Calling undecorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling undecorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
about to log at CRITICAL ...
Calling decorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling decorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
Actually logged at DEBUG
Actually logged at INFO
Actually logged at WARNING
Actually logged at ERROR
about to log at CRITICAL ...
Actually logged at CRITICAL
如您所见,仅当记录严重性为 ERROR 或更高的事件时,才会发生实际的日志记录输出,但是在这种情况下,还会记录严重性较低的任何先前事件。
您当然可以使用传统的装饰方式:
@log_if_errors(logger)
def foo(fail=False):
...
pass配置使用 UTC(GMT)格式化时间
有时您想使用 UTC 格式化时间,可以使用类似 UTCFormatter 的类来完成,如下所示:
import logging
import time
class UTCFormatter(logging.Formatter):
converter = time.gmtime
然后您可以在代码中使用UTCFormatter
而不是Formatter。如果要pass配置完成此操作,则可以将dictConfig() API 与以下完整示例所示的方法结合使用:
import logging
import logging.config
import time
class UTCFormatter(logging.Formatter):
converter = time.gmtime
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'utc': {
'()': UTCFormatter,
'format': '%(asctime)s %(message)s',
},
'local': {
'format': '%(asctime)s %(message)s',
}
},
'handlers': {
'console1': {
'class': 'logging.StreamHandler',
'formatter': 'utc',
},
'console2': {
'class': 'logging.StreamHandler',
'formatter': 'local',
},
},
'root': {
'handlers': ['console1', 'console2'],
}
}
if __name__ == '__main__':
logging.config.dictConfig(LOGGING)
logging.warning('The local time is %s', time.asctime())
运行此脚本时,它应显示如下内容:
2015-10-17 12:53:29,501 The local time is Sat Oct 17 13:53:29 2015
2015-10-17 13:53:29,501 The local time is Sat Oct 17 13:53:29 2015
显示了如何将时间设置为本地时间和 UTC 格式,每个处理程序一个。
使用上下文 Management 器进行选择性日志记录
有时候,临时更改日志记录配置并在执行某些操作后将其还原会很有用。为此,上下文 Management 器是保存和还原日志上下文的最明显的方法。这是一个这样的上下文 Management 器的简单示例,它允许您有选择地更改日志记录级别并仅在上下文 Management 器的范围内添加日志记录处理程序:
import logging
import sys
class LoggingContext:
def __init__(self, logger, level=None, handler=None, close=True):
self.logger = logger
self.level = level
self.handler = handler
self.close = close
def __enter__(self):
if self.level is not None:
self.old_level = self.logger.level
self.logger.setLevel(self.level)
if self.handler:
self.logger.addHandler(self.handler)
def __exit__(self, et, ev, tb):
if self.level is not None:
self.logger.setLevel(self.old_level)
if self.handler:
self.logger.removeHandler(self.handler)
if self.handler and self.close:
self.handler.close()
# implicit return of None => don't swallow exceptions
如果指定级别值,那么 Logger 的级别在上下文 Management 器覆盖的 with 块的范围内设置为该值。如果指定处理程序,则在进入该块时将其添加到 Logger,并在退出该块时将其删除。您也可以要求 Manager 在代码块退出时为您关闭处理程序-如果您不再需要该处理程序,则可以执行此操作。
为了说明它是如何工作的,我们可以在上面添加以下代码块:
if __name__ == '__main__':
logger = logging.getLogger('foo')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
logger.info('1. This should appear just once on stderr.')
logger.debug('2. This should not appear.')
with LoggingContext(logger, level=logging.DEBUG):
logger.debug('3. This should appear once on stderr.')
logger.debug('4. This should not appear.')
h = logging.StreamHandler(sys.stdout)
with LoggingContext(logger, level=logging.DEBUG, handler=h, close=True):
logger.debug('5. This should appear twice - once on stderr and once on stdout.')
logger.info('6. This should appear just once on stderr.')
logger.debug('7. This should not appear.')
我们最初将 Logger 的级别设置为INFO
,因此出现消息#1,而没有消息#2.然后,在下面的with
块中将级别临时更改为DEBUG
,因此出现消息#3.块退出后,Logger 的级别恢复为INFO
,因此消息#4 不出现。在下一个with
块中,我们再次将级别设置为DEBUG
,但还添加了写入sys.stdout
的处理程序。因此,消息#5 在控制台上出现两次(一次passstderr
一次,一次passstdout
)。 with
语句完成后,状态与之前一样,因此出现消息 6(如消息 1),而消息 7 没有(如消息 2)。
如果我们运行生成的脚本,结果如下:
$ python logctx.py
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.
如果再次运行它,但是将stderr
传递到/dev/null
,则会看到以下内容,这是唯一写入stdout
的消息:
$ python logctx.py 2>/dev/null
5. This should appear twice - once on stderr and once on stdout.
再一次,但将stdout
输送到/dev/null
,我们得到:
$ python logctx.py >/dev/null
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.
在这种情况下,按预期不会出现打印到stdout
的消息#5.
当然,这里描述的方法可以通用化,例如临时附加日志记录过滤器。请注意,以上代码可在 Python 2 和 Python 3 中使用。
CLI 应用程序入门模板
这是一个示例,显示了如何进行:
使用基于命令行参数的日志记录级别
分派到单独文件中的多个子命令,所有子命令以一致的方式记录在同一级别
利用简单,最少的配置
假设我们有一个命令行应用程序,其任务是停止,启动或重新启动某些服务。为了说明起见,可以将其组织为文件app.py
,该文件是应用程序的主要脚本,并具有在start.py
,stop.py
和restart.py
中实现的单个命令。进一步假设我们要pass命令行参数控制应用程序的详细程度,默认为logging.INFO
。这是app.py
可以写的一种方式:
import argparse
import importlib
import logging
import os
import sys
def main(args=None):
scriptname = os.path.basename(__file__)
parser = argparse.ArgumentParser(scriptname)
levels = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
parser.add_argument('--log-level', default='INFO', choices=levels)
subparsers = parser.add_subparsers(dest='command',
help='Available commands:')
start_cmd = subparsers.add_parser('start', help='Start a service')
start_cmd.add_argument('name', metavar='NAME',
help='Name of service to start')
stop_cmd = subparsers.add_parser('stop',
help='Stop one or more services')
stop_cmd.add_argument('names', metavar='NAME', nargs='+',
help='Name of service to stop')
restart_cmd = subparsers.add_parser('restart',
help='Restart one or more services')
restart_cmd.add_argument('names', metavar='NAME', nargs='+',
help='Name of service to restart')
options = parser.parse_args()
# the code to dispatch commands could all be in this file. For the purposes
# of illustration only, we implement each command in a separate module.
try:
mod = importlib.import_module(options.command)
cmd = getattr(mod, 'command')
except (ImportError, AttributeError):
print('Unable to find the code for command \'%s\'' % options.command)
return 1
# Could get fancy here and load configuration from file or dictionary
logging.basicConfig(level=options.log_level,
format='%(levelname)s %(name)s %(message)s')
cmd(options)
if __name__ == '__main__':
sys.exit(main())
start
,stop
和restart
命令可以在单独的模块中实现,就像这样开始:
# start.py
import logging
logger = logging.getLogger(__name__)
def command(options):
logger.debug('About to start %s', options.name)
# actually do the command processing here ...
logger.info('Started the \'%s\' service.', options.name)
并因此停止:
# stop.py
import logging
logger = logging.getLogger(__name__)
def command(options):
n = len(options.names)
if n == 1:
plural = ''
services = '\'%s\'' % options.names[0]
else:
plural = 's'
services = ', '.join('\'%s\'' % name for name in options.names)
i = services.rfind(', ')
services = services[:i] + ' and ' + services[i + 2:]
logger.debug('About to stop %s', services)
# actually do the command processing here ...
logger.info('Stopped the %s service%s.', services, plural)
并类似地重新启动:
# restart.py
import logging
logger = logging.getLogger(__name__)
def command(options):
n = len(options.names)
if n == 1:
plural = ''
services = '\'%s\'' % options.names[0]
else:
plural = 's'
services = ', '.join('\'%s\'' % name for name in options.names)
i = services.rfind(', ')
services = services[:i] + ' and ' + services[i + 2:]
logger.debug('About to restart %s', services)
# actually do the command processing here ...
logger.info('Restarted the %s service%s.', services, plural)
如果我们使用默认日志级别运行此应用程序,则将获得如下输出:
$ python app.py start foo
INFO start Started the 'foo' service.
$ python app.py stop foo bar
INFO stop Stopped the 'foo' and 'bar' services.
$ python app.py restart foo bar baz
INFO restart Restarted the 'foo', 'bar' and 'baz' services.
第一个单词是日志记录级别,第二个单词是事件记录位置的模块或软件包名称。
如果我们更改日志记录级别,则可以更改发送到日志的信息。例如,如果我们需要更多信息:
$ python app.py --log-level DEBUG start foo
DEBUG start About to start foo
INFO start Started the 'foo' service.
$ python app.py --log-level DEBUG stop foo bar
DEBUG stop About to stop 'foo' and 'bar'
INFO stop Stopped the 'foo' and 'bar' services.
$ python app.py --log-level DEBUG restart foo bar baz
DEBUG restart About to restart 'foo', 'bar' and 'baz'
INFO restart Restarted the 'foo', 'bar' and 'baz' services.
如果我们想要更少:
$ python app.py --log-level WARNING start foo
$ python app.py --log-level WARNING stop foo bar
$ python app.py --log-level WARNING restart foo bar baz
在这种情况下,这些命令不会向控制台输出任何内容,因为它们不会记录WARNING
或更高级别的内容。
用于记录的 Qt GUI
时不时出现一个有关如何登录到 GUI 应用程序的问题。 Qt框架是流行的跨平台 UI 框架,具有使用PySide2或PyQt5库的 Python 绑定。
以下示例显示如何登录到 Qt GUI。这引入了一个简单的QtHandler
类,该类带有一个可调用对象,该类应该是进行 GUI 更新的主线程中的插槽。还创建了一个工作线程,以显示如何从 UI 本身(pass用于手动记录的按钮)以及在后台执行工作的工作线程(在这里,仅以随机级别随机记录消息)登录到 GUI 之间的短暂延迟)。
工作线程是使用 Qt 的QThread
类而不是threading模块实现的,因为在某些情况下必须使用QThread
,这可以更好地与其他Qt
组件集成。
该代码应与PySide2
或PyQt5
的最新版本一起使用。您应该能够使该方法适应 Qt 的早期版本。请参考代码片段中的 Comments 以获取更多详细信息。
import datetime
import logging
import random
import sys
import time
# Deal with minor differences between PySide2 and PyQt5
try:
from PySide2 import QtCore, QtGui, QtWidgets
Signal = QtCore.Signal
Slot = QtCore.Slot
except ImportError:
from PyQt5 import QtCore, QtGui, QtWidgets
Signal = QtCore.pyqtSignal
Slot = QtCore.pyqtSlot
logger = logging.getLogger(__name__)
#
# Signals need to be contained in a QObject or subclass in order to be correctly
# initialized.
#
class Signaller(QtCore.QObject):
signal = Signal(str, logging.LogRecord)
#
# Output to a Qt GUI is only supposed to happen on the main thread. So, this
# handler is designed to take a slot function which is set up to run in the main
# thread. In this example, the function takes a string argument which is a
# formatted log message, and the log record which generated it. The formatted
# string is just a convenience - you could format a string for output any way
# you like in the slot function itself.
#
# You specify the slot function to do whatever GUI updates you want. The handler
# doesn't know or care about specific UI elements.
#
class QtHandler(logging.Handler):
def __init__(self, slotfunc, *args, **kwargs):
super(QtHandler, self).__init__(*args, **kwargs)
self.signaller = Signaller()
self.signaller.signal.connect(slotfunc)
def emit(self, record):
s = self.format(record)
self.signaller.signal.emit(s, record)
#
# This example uses QThreads, which means that the threads at the Python level
# are named something like "Dummy-1". The function below gets the Qt name of the
# current thread.
#
def ctname():
return QtCore.QThread.currentThread().objectName()
#
# Used to generate random levels for logging.
#
LEVELS = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL)
#
# This worker class represents work that is done in a thread separate to the
# main thread. The way the thread is kicked off to do work is via a button press
# that connects to a slot in the worker.
#
# Because the default threadName value in the LogRecord isn't much use, we add
# a qThreadName which contains the QThread name as computed above, and pass that
# value in an "extra" dictionary which is used to update the LogRecord with the
# QThread name.
#
# This example worker just outputs messages sequentially, interspersed with
# random delays of the order of a few seconds.
#
class Worker(QtCore.QObject):
@Slot()
def start(self):
extra = {'qThreadName': ctname() }
logger.debug('Started work', extra=extra)
i = 1
# Let the thread run until interrupted. This allows reasonably clean
# thread termination.
while not QtCore.QThread.currentThread().isInterruptionRequested():
delay = 0.5 + random.random() * 2
time.sleep(delay)
level = random.choice(LEVELS)
logger.log(level, 'Message after delay of %3.1f: %d', delay, i, extra=extra)
i += 1
#
# Implement a simple UI for this cookbook example. This contains:
#
# * A read-only text edit window which holds formatted log messages
# * A button to start work and log stuff in a separate thread
# * A button to log something from the main thread
# * A button to clear the log window
#
class Window(QtWidgets.QWidget):
COLORS = {
logging.DEBUG: 'black',
logging.INFO: 'blue',
logging.WARNING: 'orange',
logging.ERROR: 'red',
logging.CRITICAL: 'purple',
}
def __init__(self, app):
super(Window, self).__init__()
self.app = app
self.textedit = te = QtWidgets.QPlainTextEdit(self)
# Set whatever the default monospace font is for the platform
f = QtGui.QFont('nosuchfont')
f.setStyleHint(f.Monospace)
te.setFont(f)
te.setReadOnly(True)
PB = QtWidgets.QPushButton
self.work_button = PB('Start background work', self)
self.log_button = PB('Log a message at a random level', self)
self.clear_button = PB('Clear log window', self)
self.handler = h = QtHandler(self.update_status)
# Remember to use qThreadName rather than threadName in the format string.
fs = '%(asctime)s %(qThreadName)-12s %(levelname)-8s %(message)s'
formatter = logging.Formatter(fs)
h.setFormatter(formatter)
logger.addHandler(h)
# Set up to terminate the QThread when we exit
app.aboutToQuit.connect(self.force_quit)
# Lay out all the widgets
layout = QtWidgets.QVBoxLayout(self)
layout.addWidget(te)
layout.addWidget(self.work_button)
layout.addWidget(self.log_button)
layout.addWidget(self.clear_button)
self.setFixedSize(900, 400)
# Connect the non-worker slots and signals
self.log_button.clicked.connect(self.manual_update)
self.clear_button.clicked.connect(self.clear_display)
# Start a new worker thread and connect the slots for the worker
self.start_thread()
self.work_button.clicked.connect(self.worker.start)
# Once started, the button should be disabled
self.work_button.clicked.connect(lambda : self.work_button.setEnabled(False))
def start_thread(self):
self.worker = Worker()
self.worker_thread = QtCore.QThread()
self.worker.setObjectName('Worker')
self.worker_thread.setObjectName('WorkerThread') # for qThreadName
self.worker.moveToThread(self.worker_thread)
# This will start an event loop in the worker thread
self.worker_thread.start()
def kill_thread(self):
# Just tell the worker to stop, then tell it to quit and wait for that
# to happen
self.worker_thread.requestInterruption()
if self.worker_thread.isRunning():
self.worker_thread.quit()
self.worker_thread.wait()
else:
print('worker has already exited.')
def force_quit(self):
# For use when the window is closed
if self.worker_thread.isRunning():
self.kill_thread()
# The functions below update the UI and run in the main thread because
# that's where the slots are set up
@Slot(str, logging.LogRecord)
def update_status(self, status, record):
color = self.COLORS.get(record.levelno, 'black')
s = '<pre><font color="%s">%s</font></pre>' % (color, status)
self.textedit.appendHtml(s)
@Slot()
def manual_update(self):
# This function uses the formatted message passed in, but also uses
# information from the record to format the message in an appropriate
# color according to its severity (level).
level = random.choice(LEVELS)
extra = {'qThreadName': ctname() }
logger.log(level, 'Manually logged!', extra=extra)
@Slot()
def clear_display(self):
self.textedit.clear()
def main():
QtCore.QThread.currentThread().setObjectName('MainThread')
logging.getLogger().setLevel(logging.DEBUG)
app = QtWidgets.QApplication(sys.argv)
example = Window(app)
example.show()
sys.exit(app.exec_())
if __name__=='__main__':
main()