Google PubSub Python 多个订阅者客户端接收重复消息

2024-01-08

我有一个非常简单的应用程序,可以启动 PubSub 订阅者 StreamingPull 客户端。我已将其部署在 Kubernetes 上,以便可以扩展。当我部署一个 Pod 时,一切都会按预期进行。当我扩展到 2 个容器时,我开始收到重复的消息。我知道会出现一些小的重复消息,但几乎一半的消息(有时更多)会被多次接收。

我的进程大约需要 600 毫秒来处理一条消息。订阅确认截止时间设置为 600 秒。我发布了 1000 条消息,订阅在不到一分钟的时间内就被清空了,但 recognize_message_operation 指标显示大约 1500 个调用,其中有少量的 response_code 已过期。我的过程中没有出现任何故障,所有消息均在处理时得到确认。日志显示两个容器在同一时间接收到相同的消息。处理所有消息的时间远低于订阅的确认截止日期,并且 Python 客户端应该处理租约管理,所以我不确定为什么会有过期的消息。我也不明白为什么同一条消息会同时发送到多个订阅者客户端。

最小工作示例:

import time

from google.cloud import pubsub_v1

PROJECT_ID = 'my-project'
PUBSUB_TOPIC_ID = 'duplicate-test'
PUBSUB_SUBSCRIPTION_ID = 'duplicate-test'

def subscribe(sleep_time=None):
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        PROJECT_ID, PUBSUB_SUBSCRIPTION_ID)

    def callback(message):
        print(message.data.decode())
        if sleep_time:
            time.sleep(sleep_time)
        print(f'acking {message.data.decode()}')
        message.ack()

    future = subscriber.subscribe(
        subscription_path, callback=callback)
    print(f'Listening for messages on {subscription_path}')
    future.result()


def publish(num_messages):
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT_ID, PUBSUB_TOPIC_ID)
    for i in range(num_messages):
        publisher.publish(topic_path, str(i).encode())

在两个终端中,运行 subscribe(1)。在第三个终端中,运行publish(200)。对我来说,这将在两个订户终端中提供重复的内容。


两个订阅者同时收到相同的消息是不常见的,除非:

  1. 由于重试,该消息发布了两次(因此就 Cloud Pub/Sub 而言,有两条消息)。在这种情况下,两条消息的内容将相同,但它们的消息 ID 将不同。因此,可能值得确保您正在查看服务提供的消息 ID,以确保消息确实是重复的。
  2. 订阅者处于不同的订阅状态,这意味着每个订阅者都会收到all的消息。

如果这两种情况都不是,那么重复的情况应该相对较少。有通过流式拉取处理大量小消息积压的边缘情况 https://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages(这是 Python 客户端库使用的)。基本上,如果非常小的消息以突发方式发布,然后订阅者消费该突发,则可以看到您所看到的行为。所有消息最终都会发送到两个订阅者之一,并在未完成消息数量的流量控制限制之后进行缓冲。这些消息可能会超过其确认截止时间,从而导致重新传递(可能会传递给其他订阅者)。第一个订阅者的缓冲区中仍然有这些消息,并且也会看到这些消息。

但是,如果您始终看到两个新启动的订阅者立即收到具有相同消息 ID 的相同消息,那么您应该联系 Google Cloud 支持人员,并提供您的项目名称、订阅名称和消息 ID 示例。他们将能够更好地调查为什么会发生这种立即重复。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Google PubSub Python 多个订阅者客户端接收重复消息 的相关文章

  • 从 Dataflow 中的 BigQuery 读取时设置 MaximumBillingTier

    当我从 BigQuery 读取数据作为查询结果时 我正在运行 GCP Dataflow 作业 我正在使用 google cloud dataflow java sdk all 版本 1 9 0 设置管道的代码片段如下所示 PCollecti
  • kubernetes nginx ingress 无法将 HTTP 重定向到 HTTPS

    我有一个托管在 Google Cloud 平台中的网络应用程序 该应用程序位于负载均衡器后面 而负载均衡器本身位于入口后面 入口设置了 SSL 证书 并按预期接受 HTTPS 连接 但有一个问题 我无法让它将非 HTTPS 连接重定向到 H
  • 在同一事务上读取和修改 - Bigtable

    我正在构建一个优惠券系统 并且正在使用 Bigtable 我的架构有两列 客户 ID 和优惠券代码 我想查询表以检查客户是否已存在 如果为真 则返回代码 如果为假 则使用 ID 修改客户 ID 单元格并返回代码 我看到 Bigtable 中
  • gRPC(HTTP/2) 比使用 HTTP/2 的 REST 更快吗?

    目标是引入一种性能更好的传输和应用层协议latency and 网络吞吐量 目前 该应用程序使用REST with HTTP 1 1并且我们遇到了很高的延迟 我需要解决这个延迟问题并且我愿意使用gRPC HTTP 2 or 休息 HTTP2
  • 计算一次 GroupBy,然后将其传递给 Google DataFlow (Python SDK) 中的多个转换

    我正在使用适用于 Apache Beam 的 Python SDK 在 Google DataFlow 上运行特征提取管道 我需要运行多个转换 所有这些转换都希望项目按键分组 基于这个答案question https stackoverfl
  • 防止 Firebase 中的待处理写入事务不起作用

    我的目标是在单击按钮时将名称插入 Cloud Firestore 中 但如果用户未连接到互联网 我不希望保存处于挂起状态 我不喜欢 Firebase 保存待处理写入的行为 即使互联网连接已恢复 我研究发现Firebase 开发人员建议使用事
  • 如何使用google AI平台在线预测?

    我创建了一个自定义张量流模型并部署到谷歌云人工智能平台 但是 当我向在线预测 API 发送发布请求时 https ml googleapis com v1 projects my project models my model versio
  • 如何退出项目?

    我找不到离开该项目的方法 该项目的所有者不是我 任何通过 IAM 执行此操作的尝试都不会成功 我希望该项目停止显示在我的帐户中 获得许可的项目成员resourcemanager projects setIamPolicy需要在项目上添加 删
  • bq cmd 查询 Google Sheet 表出现“访问被拒绝:BigQuery BigQuery:未找到具有 Google Drive 范围的 OAuth 令牌”错误

    我有一个与Google Sheet连接的表 使用WebUI查询该表成功 但是如果我使用bq cmd查询 它将回显错误消息 访问被拒绝 BigQuery BigQuery Google Drive 没有 OAuth 令牌 范围已找到 我假设你
  • 是否可以使用 Anaconda 包作为 Google Cloud Functions 的依赖项?

    我正在使用 Python 运行时编写 Google Cloud Function 我需要包含一些无法使用的依赖项pip 如文档中所述here https cloud google com functions docs writing spe
  • 如何在 2 个项目之间移动 Google Cloud DNS 条目?

    我想做一些非常简单的事情 但看起来几乎不可能做到 我的 Google Cloud DNS 帐户中有不同的项目 我想将一些条目 域 从一个项目移动 迁移 到另一个项目 我不想走删除 重新创建路径 因为所有域都是活动的 并且我不希望有任何停机时
  • Cloud Run:429:请求被中止,因为没有可用的实例

    我们 作为一家公司 每天都会经历巨大的峰值 我们使用 Pub Sub gt Cloud Run 组合 我们遇到的问题是 当高流量到来时 Pub Sub 会尝试在没有任何流量控制的情况下同时将消息推送到 Cloud Run 结果 429 由于
  • Google Pub/Sub Java 示例

    我无法找到使用 java 从 pub sub 读取消息的方法 我在我的 pom 中使用这个 Maven 依赖项
  • 计算引擎启动脚本无法以非 root 用户身份执行

    将我的问题归结为最简单的情况 我使用带有以下启动脚本的 Compute Engine bin bash sudo useradd m drupal su drupal cd home drupal touch test txt 我可以确认
  • 从 Cloud Run 实例调用 Google Cloud API 的延迟

    当我出于某种原因从 Cloud Run 实例调用其他云 API 时 响应会出现巨大的延迟 一切都在 1 个项目内进行 即使从本地计算机调用也更快 几秒钟 但部署在云中 某些请求需要几分钟才能完成 据我所知 它与所有 API 相关 除了 Fi
  • 我可以在私有 GCP 网络子网中启动 Google 容器引擎 (GKE) 吗?

    我正在尝试在私有 GCP 网络子网中启动 Google 容器引擎 GKE 我创建了自定义 Google Cloud VPC 然后我也在该 VPC 下创建了自定义专用网络访问子网 1 当我使用私有子网创建 GKE 集群时 我的 Kuberne
  • 如何解密 BigQuery 中的列?

    我在 BigQuery 中有一些加密列 我想使用 BigQuery 函数对其进行解密 用于加密它们的机制是 AES 256 使用的向量以 UTF8 编码 生成的数据采用 Base64 加密 我想要做的是使用函数解密 BigQuery 中传递
  • GKE 上的 cloudrun 支持 websocket 吗?

    这是我第一次使用 GCP 我正在尝试将我的项目投入生产 但在让 websocket 通信正常工作时遇到了问题 我一直在谷歌上搜索 我非常不清楚 GKE 上运行的云是否支持入站 出站 Websocket 连接 限制文档指出 完全托管的 clo
  • Google 计算负载均衡器在 DELETE 时抛出 400 Bad Request

    我通过实例模板创建了一个实例组 并将该实例组与 http 负载均衡器使用的后端服务对齐 现在 当我从我创建的实例组中打开实例虚拟机的 URL 时 我可以执行以下操作GET POST and DELETE请求和所有请求都很快 一切都按预期进行
  • App Engine Flex 服务正在运行但找不到

    如何关闭我已删除且无法访问的 App Engine Flex 服务 昨天 我部署了一项新的 App Engine Flex 服务 然后将其删除 无论出于何种原因 该服务在一天后仍在运行 我可以通过查看日志查看器来确认它仍在运行并生成日志 当

随机推荐

  • 使用脚本而不是麦克风向谷歌助手发送命令

    我已经在 Raspberry Pi 3 中配置了 Google Assistant SDK 演示应用程序工作正常 有没有办法使用Python脚本将 OK Google 示例命令 发送到Google Assistant SDK 或者它只接受来
  • 安装时自动启动 Windows 服务

    我有一个 Windows 服务和一个 MSI 安装程序 安装项目 安装项目具有用于安装和卸载的自定义操作 参数分别为 install 和 uninstall 我希望该服务在安装后立即启动 我的服务所做的就是启动一个进程 当服务停止时 它会p
  • 如何在R中拟合受限VAR模型?

    我试图了解如何拟合特定的 VAR 模型 不一般 据我了解 拟合诸如一般 VAR 1 之类的模型是通过以下方式完成的 从 Cran 导入 vars 包 例如 考虑 y 是一个 10 x 2 的矩阵 然后我在导入 vars 包后这样做 y df
  • Django-2.x 中的文件上传安全性

    有关安全文件上传的文档 https docs djangoproject com en 2 0 ref models fields file upload security https docs djangoproject com en 2
  • 如何使用 python 3.2 生成(并标记)随机整数?

    好吧 我承认我是编程新手 但我无法确定如何让 python v3 2 在我给定的参数之间生成随机正整数 为了让您能够理解上下文 我尝试创建一个猜谜游戏 其中用户输入参数 例如 1 到 50 然后计算机在给定数字之间生成一个随机数 然后用户必
  • NumPy 中具有非常大矩阵的线性回归 - 如何节省内存?

    所以我有这些巨大的矩阵 X 和 Y X 和 Y 都有 1 亿行 X 有 10 列 我正在尝试用这些矩阵实现线性回归 并且我需要数量 X T X 1 X T Y 我怎样才能尽可能节省空间地计算它 现在我有 X readMatrix fileX
  • 重写 Java 泛型方法

    我想创建一个接口 用于将对象复制到同一类的目标对象 简单的方法是使用强制转换 import org junit Test import org junit internal runners JUnit4ClassRunner import
  • 智能(?)数据库缓存

    我见过几个数据库缓存引擎 它们都非常愚蠢 即 keep this query cached for X minutes 并要求您在一次之后手动删除整个缓存存储库INSERT UPDATE DELETE查询已执行 大约两三年前 我为我正在从事
  • C# WinAPI 单击菜单项

    我试图单击名为 Media Subtitler 的程序中的菜单项 但无论我尝试做什么 它都不起作用 首先 我尝试使用函数 GetMenu 但它返回 IntPtr Zero 然后 我尝试使用 ALT 键 使用菜单的第一个字母 F 代表文件 但
  • 从 Laravel 中的多对多关系中获取 Auth 用户的特定数据

    如何在 Laravel 多对多关系中获取特定经过身份验证的用户的数据 我有一个页面 它将显示所有社区的所有最新主题 但是 我想确保它只显示当前登录用户所属社区的线程 不知道我说的有没有道理 User php public function
  • 有什么方法可以捕获代码级别的 AWS lambda 超时错误吗?

    有什么办法可以捕捉到AWS lambda timed out代码级别的错误 以便我有机会在退出之前处理错误lambda功能 虽然 lambda 环境不会触发 超时 事件 但您可以自己轻松完成此操作 每种语言都有一个由context目的 到获
  • 如何用VBA获取Excel中用户的语言?

    我需要通过 VBA 获取 Excel 中的用户语言 因为 Excel 不会自动翻译数据透视表 并且我正在数据透视表中进行搜索VBA 刚刚发现德语中的枢轴看起来像这样 而在英语中它是这样的 我可以考虑一个选项 写一个 VLOOKUP Exce
  • 不使用OpenGL的Linux基础图形编程

    对于学习者来说 开始使用 C C 2D 和 3D 进行基本图形编程有哪些好的选择 我想尝试一下书中给出的东西 3D 数学入门 https rads stackoverflow com amzn click com 1568817231 显然
  • 根据另一列中自动完成的结果更改 jqGrid 文本列的可编辑属性

    我正在使用带有内联编辑功能的 jqGrid 4 4 0 为了这个问题 我的网格有四列 一个 ID 列 SomeGridRowId 一个带有 jQ uery 自动完成功能的文本列 Autocomplete 一个单字符文本列 SingleCha
  • Formsheet ios 8 约束与 iphone 约束相同

    我遇到这个问题 ios 8 中的表单采用 紧凑 宽度常规 高度 即所有 iPhone 约束 设置的约束 而不是 任何 任何 或 常规 宽度常规 高度 我对 iPhone 和 iPad 有两种不同的设计 因为表单正在消耗 iPhone 的约束
  • Ember.js:折叠/推迟昂贵的观察者或计算属性

    在 Ember 应用程序中 假设您有一个观察者或一个观察数组的属性 如下所示 topContributor function loop over articles hence slow property email protected cd
  • 如何获取我的应用程序的“可用”处理器数量?

    我知道如何获取计算机上的物理处理器数量和逻辑处理器数量 但我想知道我的应用程序可以访问多少个逻辑处理器 例如 我在四核机器上进行开发 但我有许多单核用户 并且在许多情况下我 简化 了界面 或者遇到了多核系统从未遇到过的锁定问题 因此 为此
  • flutter 中 UniqueKey 的范围是什么?

    我是一个独特的人 我的意思是我与其他人不同在这世上 当我在flutter中创建UniqueKey时 什么是world它与其他的有何不同 A UniqueKey其唯一性与每个实例化对象都是一个单独的对象完全相同 并没有什么神奇之处实施Uniq
  • 在模拟器 Swift 上运行自定义键盘时出错

    我正在尝试制作自定义键盘 对于简单的自定义键盘 它运行良好 但是在单击表情符号时 它显示以下错误 viewServiceDidTerminateWithError 错误域 UIViewServiceInterfaceErrorDomain
  • Google PubSub Python 多个订阅者客户端接收重复消息

    我有一个非常简单的应用程序 可以启动 PubSub 订阅者 StreamingPull 客户端 我已将其部署在 Kubernetes 上 以便可以扩展 当我部署一个 Pod 时 一切都会按预期进行 当我扩展到 2 个容器时 我开始收到重复的