我有一个用 python 编写的 API,可以调用 AWS 服务,特别是 sqs、s3 和 dynamodb。我正在尝试为 API 编写单元测试,并且想模拟对 AWS 的所有调用。我对 moto 作为模拟这些服务的一种方式进行了大量研究,但是我尝试过的每个实现都不会模拟我的调用并向 AWS 发送真实的请求。调查这个问题我发现人们讨论 https://github.com/spulec/moto/issues/1793使用 boto3>=1.8 时,boto 和 moto 之间存在一些不兼容性。有没有办法解决?我的最终问题是:当使用 boto3>=1.8 时,是否有一种简单的方法可以使用 moto 或其他库来模拟 boto3 对 sqs、s3 和 dynamodb 的调用?
以下是我当前使用的 boto3 和 moto 版本:
boto3 == 1.9.314
moto == 1.3.11
下面是我使用 moto 模拟对 sqs 的调用的最新尝试。我定义了一个 pytest 固定装置,在其中创建了一个 mock_sqs 会话和一个(希望是假的)队列。我使用这个装置来单元测试我的 get_queue_item 函数。
SQS脚本
# ptr_api.aws.sqs
import boto3
REGION = 'us-east-1'
sqs_r = boto3.resource('sqs', REGION)
sqs_c = boto3.client('sqs', REGION)
def get_queue_item(queue_name):
queue = sqs_r.get_queue_by_name(QueueName=queue_name)
queue_url = queue.url
response = sqs_c.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=1,
VisibilityTimeout=10,
WaitTimeSeconds=3
)
try:
message = response['Messages'][0]
receipt_handle = message['ReceiptHandle']
delete_response = sqs_c.delete_message(QueueUrl=queue_url,
ReceiptHandle=receipt_handle)
return message['Body']
except Exception as e:
print("error in get_queue_item: ")
print(e)
return False
测试SQS脚本
# test_sqs.py
import pytest
from moto import mock_sqs
import boto3
from ptr_api.aws.sqs import get_queue_item
@pytest.fixture
def sqs_mocker(scope='session', autouse=True):
mock = mock_sqs()
mock.start()
sqs_r = boto3.resource('sqs', 'us-east-1')
sqs_c = boto3.client('sqs', 'us-east-1')
queue_name = 'test_queue_please_dont_actually_exist'
queue_url = sqs_c.create_queue(
QueueName=queue_name
)['QueueUrl']
yield (sqs_c, queue_url, queue_name)
mock.stop()
def test_get_queue_item(sqs_mocker):
sqs_c, queue_url, queue_name = sqs_mocker
message_body = 'why hello there' # Create dummy message
sqs_c.send_message( # Send message to fake queue
QueueUrl=queue_url,
MessageBody=message_body,
)
res = get_queue_item(queue_name) # Test get_queue_item function
assert res == message_body
然而,当我检查控制台时,我看到队列实际上已创建。我也尝试过改变导入的顺序,但似乎没有任何效果。我尝试使用模拟装饰器,甚至短暂地尝试了 moto 的独立服务器模式。我是否做错了什么,或者这真的只是我听说的 boto3/moto 与较新版本的 boto3 不兼容?不幸的是,降级我的 boto3 版本不是一个选择。有没有其他方法可以通过另一个库获得我想要的结果?我对 localstack 进行了一些研究,但我想在完全放弃 moto 之前确保这是我唯一的选择。