当日志在不同进程中发出时,caplog 中的消息为空

2024-05-07

我正在使用 log_cli=true 运行测试。 剧本:

import logging
import sys
from multiprocessing import Process

logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)

logger = logging.getLogger("leapp.actors.quagga_report")


class ActorContext:
    def __init__(self):
        self.log = logger

    def run(self):
        self.log.debug("Some msg")


current_actor_context = ActorContext()


def test_caplog_fails(caplog):
    with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
        p = Process(target=current_actor_context.run)
        p.start()
        p.join()
    assert "Some msg" in caplog.text


def test_caplog_passes(caplog):
    with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
        current_actor_context.run()
    assert "Some msg" in caplog.text

pytest log_cli 显示两个测试中的日志消息, 但是,caplog 只能看到第二次测试的消息。

第一次测试失败并出现以下回溯:

-------------------------------- live log call ---------------------------------
| 13:39:20 | 40212 | leapp.actors.quagga_report | DEBUG | test_logger_caplog_fails.py | Some msg
FAILED
tests/test_logger_caplog_fails.py:20 (test_caplog_fails)
Traceback (most recent call last):
  File "/home/azhukov/Dropbox/code/lighting_talks/asyncio_subprocess_shells/tests/test_logger_caplog_fails.py", line 26, in test_caplog_fails
    assert "Some msg" in caplog.text
AssertionError: assert 'Some msg' in ''
 +  where '' = <_pytest.logging.LogCaptureFixture object at 0x7fb8a87f2370>.text

我正在寻找类似的问题Pytest 捕获不起作用 - caplog 和 capsys 为空 https://stackoverflow.com/questions/61702794/pytest-capture-not-working-caplog-and-capsys-are-empty,但是就我而言,该财产propagate=True


受到@hoefling 的启发,并且仍然愿意使用 caplog,有一个解决方案。这个想法是创建一个固定装置,它从 QueueHandler 处理程序获取队列并在主进程中重新发出日志,这些日志可由 caplog 捕获

import logging
import sys
from contextlib import contextmanager
from logging import handlers
from multiprocessing import Process, Queue

import pytest

logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)


logger = logging.getLogger(__name__)


class ActorContext:
    def __init__(self):
        self.log = logger

    def run(self):
        self.log.debug("Some msg")


current_actor_context = ActorContext()


@pytest.fixture()
def caplog_workaround():
    @contextmanager
    def ctx():
        logger_queue = Queue()
        logger = logging.getLogger()
        logger.addHandler(handlers.QueueHandler(logger_queue))
        yield
        while not logger_queue.empty():
            log_record: logging.LogRecord = logger_queue.get()
            logger._log(
                level=log_record.levelno,
                msg=log_record.message,
                args=log_record.args,
                exc_info=log_record.exc_info,
            )

    return ctx


def test_caplog_already_not_fails(caplog, caplog_workaround):
    with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
        with caplog_workaround():
            p = Process(target=current_actor_context.run)
            p.start()
            p.join()
    assert "Some msg" in caplog.text


def test_caplog_passes(caplog, capsys):
    with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
        current_actor_context.run()
    assert "Some msg" in caplog.text
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

当日志在不同进程中发出时,caplog 中的消息为空 的相关文章

随机推荐