我正在尝试建立对 Google Cloud PubSub 主题的长期运行的 Pull 订阅。
我使用的代码与文档中给出的示例非常相似here https://cloud.google.com/pubsub/docs/pull#pubsub-pull-messages-async-python, i.e.:
def receive_messages(project, subscription_name):
"""Receives messages from a pull subscription."""
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
def callback(message):
print('Received message: {}'.format(message))
message.ack()
subscriber.subscribe(subscription_path, callback=callback)
# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
time.sleep(60)
问题是我有时会收到以下回溯:
Exception in thread Consumer helper: consume bidirectional stream:
Traceback (most recent call last):
File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/usr/lib/python3.5/threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "/path/to/google/cloud/pubsub_v1/subscriber/_consumer.py", line 248, in _blocking_consume
self._policy.on_exception(exc)
File "/path/to/google/cloud/pubsub_v1/subscriber/policy/thread.py", line 135, in on_exception
raise exception
File "/path/to/google/cloud/pubsub_v1/subscriber/_consumer.py", line 234, in _blocking_consume
for response in response_generator:
File "/path/to/grpc/_channel.py", line 348, in __next__
return self._next()
File "/path/to/grpc/_channel.py", line 342, in _next
raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>
我看到这被引用于另一个问题 https://stackoverflow.com/questions/42638468/google-pubsub-error-code-8a75但在这里我想问如何在Python中正确处理它。我尝试将请求包装在异常中,但它似乎在后台运行,如果出现该错误,我无法重试。