我正在使用 log_cli=true 运行测试。
剧本:
import logging
import sys
from multiprocessing import Process
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
logger = logging.getLogger("leapp.actors.quagga_report")
class ActorContext:
def __init__(self):
self.log = logger
def run(self):
self.log.debug("Some msg")
current_actor_context = ActorContext()
def test_caplog_fails(caplog):
with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
p = Process(target=current_actor_context.run)
p.start()
p.join()
assert "Some msg" in caplog.text
def test_caplog_passes(caplog):
with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
current_actor_context.run()
assert "Some msg" in caplog.text
pytest log_cli 显示两个测试中的日志消息,
但是,caplog 只能看到第二次测试的消息。
第一次测试失败并出现以下回溯:
-------------------------------- live log call ---------------------------------
| 13:39:20 | 40212 | leapp.actors.quagga_report | DEBUG | test_logger_caplog_fails.py | Some msg
FAILED
tests/test_logger_caplog_fails.py:20 (test_caplog_fails)
Traceback (most recent call last):
File "/home/azhukov/Dropbox/code/lighting_talks/asyncio_subprocess_shells/tests/test_logger_caplog_fails.py", line 26, in test_caplog_fails
assert "Some msg" in caplog.text
AssertionError: assert 'Some msg' in ''
+ where '' = <_pytest.logging.LogCaptureFixture object at 0x7fb8a87f2370>.text
我正在寻找类似的问题Pytest 捕获不起作用 - caplog 和 capsys 为空 https://stackoverflow.com/questions/61702794/pytest-capture-not-working-caplog-and-capsys-are-empty,但是就我而言,该财产propagate=True
受到@hoefling 的启发,并且仍然愿意使用 caplog,有一个解决方案。这个想法是创建一个固定装置,它从 QueueHandler 处理程序获取队列并在主进程中重新发出日志,这些日志可由 caplog 捕获
import logging
import sys
from contextlib import contextmanager
from logging import handlers
from multiprocessing import Process, Queue
import pytest
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
logger = logging.getLogger(__name__)
class ActorContext:
def __init__(self):
self.log = logger
def run(self):
self.log.debug("Some msg")
current_actor_context = ActorContext()
@pytest.fixture()
def caplog_workaround():
@contextmanager
def ctx():
logger_queue = Queue()
logger = logging.getLogger()
logger.addHandler(handlers.QueueHandler(logger_queue))
yield
while not logger_queue.empty():
log_record: logging.LogRecord = logger_queue.get()
logger._log(
level=log_record.levelno,
msg=log_record.message,
args=log_record.args,
exc_info=log_record.exc_info,
)
return ctx
def test_caplog_already_not_fails(caplog, caplog_workaround):
with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
with caplog_workaround():
p = Process(target=current_actor_context.run)
p.start()
p.join()
assert "Some msg" in caplog.text
def test_caplog_passes(caplog, capsys):
with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
current_actor_context.run()
assert "Some msg" in caplog.text
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)