我似乎无法理解这里的问题。我正在为我的渠道消费者编写测试文档中的描述 https://channels.readthedocs.io/en/latest/topics/testing.html。我通常会使用 Django 默认的单元测试,但由于 Channels 需要使用 pytest,我正在使用它,但仍然希望维护编写测试的类结构。
我在编写类似于 unittest 的设置方法时遇到问题,我将使用它来初始化测试属性。我试过方法夹具 https://docs.pytest.org/en/latest/xunit_setup.html?highlight=setup_class#method-and-function-level-setup-teardown,类灯具使用自动使用=真 https://docs.pytest.org/en/latest/fixture.html#autouse-fixtures-xunit-setup-on-steroids,但它们似乎都不起作用,因为我无法访问测试方法中的实例属性。最后我决定写set_up
实例方法并为每个类手动调用它。这是我到目前为止所拥有的:
@pytest.mark.django_db
@pytest.mark.asyncio
class TestChatConsumer(object):
@database_sync_to_async
def set_up(self):
self.user = UserFactory()
self.api_client = APIClientFactory()
self.token = TokenFactory(user=self.user)
self.credentials = {
'AUTHORIZATION': self.token.key,
...
}
self.credentials_params = '&'.join(['{}={}'.format(k, v) for k, v in self.credentials.items()])
self.authless_endpoint = '/api/v1/ws/'
self.endpoint = self.authless_endpoint + '?' + self.credentials_params
async def test_connect_without_credentials(self):
await self.set_up()
communicator = WebsocketCommunicator(application, self.authless_endpoint)
connected, subprotocol = await communicator.connect()
assert not connected
async def test_connect_with_credentials(self):
await self.set_up()
communicator = WebsocketCommunicator(application, self.endpoint)
connected, subprotocol = await communicator.connect()
assert connected
问题是我不断得到psycopg2.InterfaceError: connection already closed
当我的消费者尝试访问数据库中的代码时connect
方法。它用database_sync_to_async
当我手动测试它时效果很好。
具体来说,该行中connect
引发错误的方法如下所示:
await database_sync_to_async(user.inbox.increment_connections)().
不确定问题到底是什么database_sync_to_async
应正确清理旧连接,以便可以正确创建新连接。
任何帮助将非常感激。