我正在使用这个问题底部的 Python 代码监听 Google PubSub 消息。它实际上是来自 Google 的异步拉取示例。
我运行我的程序并输出到文件:
python my_script.py | tee log.txt
如果我在接收消息时运行程序,则两个print()
语句输出并且一切都按预期进行。
但是,如果我在发布消息之前运行该程序,则不会有任何输出。他们俩print()
语句不输出,代码只是阻塞。我期待看到两人print()
声明。
I tried python my_script.py > log.txt
但这没有什么区别。
If I Ctrl + C
程序的堆栈跟踪显示了这一点:
^CTraceback (most recent call last):
File "my_script.py", line 58, in <module>
streaming_pull_future.result()
File "python3-3.8.6-env/lib/python3.8/site-packages/google/cloud/pubsub_v1/futures.py", line 102, in result
err = self.exception(timeout=timeout)
File "python3-3.8.6-env/lib/python3.8/site-packages/google/cloud/pubsub_v1/futures.py", line 121, in exception
if not self._completed.wait(timeout=timeout):
File "python3-3.8.6/lib/python3.8/threading.py", line 558, in wait
signaled = self._cond.wait(timeout)
File "python3-3.8.6/lib/python3.8/threading.py", line 302, in wait
waiter.acquire()
KeyboardInterrupt
Exception ignored in: <_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>
BrokenPipeError: [Errno 32] Broken pipe
我的猜测是阻塞调用result()
是否阻止将日志输出刷新到文件?
无论当前是否正在发布消息,是否都可以运行程序并查看日志记录?
如果不是,当消息最终发布时,日志记录最终会被“刷新”吗?
Code:
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id )
print(f"Subscribing to {subscription_path}..\n")
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
with subscriber:
try:
streaming_pull_future.result()
except TimeoutError:
streaming_pull_future.cancel()