我有一个非常简单的应用程序,可以启动 PubSub 订阅者 StreamingPull 客户端。我已将其部署在 Kubernetes 上,以便可以扩展。当我部署一个 Pod 时,一切都会按预期进行。当我扩展到 2 个容器时,我开始收到重复的消息。我知道会出现一些小的重复消息,但几乎一半的消息(有时更多)会被多次接收。
我的进程大约需要 600 毫秒来处理一条消息。订阅确认截止时间设置为 600 秒。我发布了 1000 条消息,订阅在不到一分钟的时间内就被清空了,但 recognize_message_operation 指标显示大约 1500 个调用,其中有少量的 response_code 已过期。我的过程中没有出现任何故障,所有消息均在处理时得到确认。日志显示两个容器在同一时间接收到相同的消息。处理所有消息的时间远低于订阅的确认截止日期,并且 Python 客户端应该处理租约管理,所以我不确定为什么会有过期的消息。我也不明白为什么同一条消息会同时发送到多个订阅者客户端。
最小工作示例:
import time
from google.cloud import pubsub_v1
PROJECT_ID = 'my-project'
PUBSUB_TOPIC_ID = 'duplicate-test'
PUBSUB_SUBSCRIPTION_ID = 'duplicate-test'
def subscribe(sleep_time=None):
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
PROJECT_ID, PUBSUB_SUBSCRIPTION_ID)
def callback(message):
print(message.data.decode())
if sleep_time:
time.sleep(sleep_time)
print(f'acking {message.data.decode()}')
message.ack()
future = subscriber.subscribe(
subscription_path, callback=callback)
print(f'Listening for messages on {subscription_path}')
future.result()
def publish(num_messages):
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, PUBSUB_TOPIC_ID)
for i in range(num_messages):
publisher.publish(topic_path, str(i).encode())
在两个终端中,运行 subscribe(1)。在第三个终端中,运行publish(200)。对我来说,这将在两个订户终端中提供重复的内容。
两个订阅者同时收到相同的消息是不常见的,除非:
- 由于重试,该消息发布了两次(因此就 Cloud Pub/Sub 而言,有两条消息)。在这种情况下,两条消息的内容将相同,但它们的消息 ID 将不同。因此,可能值得确保您正在查看服务提供的消息 ID,以确保消息确实是重复的。
- 订阅者处于不同的订阅状态,这意味着每个订阅者都会收到all的消息。
如果这两种情况都不是,那么重复的情况应该相对较少。有通过流式拉取处理大量小消息积压的边缘情况 https://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages(这是 Python 客户端库使用的)。基本上,如果非常小的消息以突发方式发布,然后订阅者消费该突发,则可以看到您所看到的行为。所有消息最终都会发送到两个订阅者之一,并在未完成消息数量的流量控制限制之后进行缓冲。这些消息可能会超过其确认截止时间,从而导致重新传递(可能会传递给其他订阅者)。第一个订阅者的缓冲区中仍然有这些消息,并且也会看到这些消息。
但是,如果您始终看到两个新启动的订阅者立即收到具有相同消息 ID 的相同消息,那么您应该联系 Google Cloud 支持人员,并提供您的项目名称、订阅名称和消息 ID 示例。他们将能够更好地调查为什么会发生这种立即重复。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)