Debezium Postgres Kafka 连接器心跳未提交 LSN

2023-12-30

我在 AWS RDS 上有一个 Postgres Db,并且有一个 kafka 连接器 (Debezium Postgres) 正在监听表。连接器的配置:

{
  "name": "my-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.dbname": "my_db",
    "database.user": "my_user",
    "max.queue.size": "32000",
    "slot.name": "my_slot",
    "tasks.max": "1",
    "publication.name": "my_publication",
    "database.server.name": "postgres",
    "heartbeat.interval.ms": "1000",
    "database.port": "my_port",
    "include.schema.changes": "false",
    "plugin.name": "pgoutput",
    "table.whitelist": "public.my_table",
    "tombstones.on.delete": "false",
    "database.hostname": "my_host",
    "database.password": "my_password",
    "name": "my-connector",
    "max.batch.size": "10000",
    "database.whitelist": "my_db",
    "snapshot.mode": "never"
  },
  "tasks": [
    {
      "connector": "my-connector",
      "task": 0
    }
  ],
  "type": "source"
}

该表不像其他表那样频繁更新,这最初导致了复制滞后,如下所示:

SELECT slot_name,
  pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as replicationSlotLag,
  pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) as confirmedLag,
  active
FROM pg_replication_slots;
           slot_name           | replicationslotlag | confirmedlag | active
-------------------------------+--------------------+--------------+--------
 my_slot                       | 1664 MB            | 1664 MB      | t

它会变得如此之大,以至于有可能耗尽所有磁盘空间。

我添加了一个心跳,如果我登录到 kafka 代理并设置一个控制台消费者,如下所示:./kafka-console-consumer.sh --bootstrap-server my.broker.address:9092 --topic __debezium-heartbeat.postgres --from-beginning --consumer.config=/etc/kafka/consumer.properties它会转储所有心跳消息,然后每 1000 毫秒显示一条新消息。

然而,插槽的尺寸仍在不断增大。如果我执行类似在表中插入一条虚拟记录之类的操作,它会将插槽设置​​回一个小滞后,这样就可以了。

不过,我想用心跳来做这件事。我不想插入定期消息,因为这听起来会增加复杂性。为什么心跳没有减少槽大小?


请看一下https://debezium.io/documentation/reference/1.0/connectors/postgresql.html#wal-disk-space https://debezium.io/documentation/reference/1.0/connectors/postgresql.html#wal-disk-space

您确实需要发出定期消息,但现在有一个帮助 -https://issues.redhat.com/browse/DBZ-1815 https://issues.redhat.com/browse/DBZ-1815

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Debezium Postgres Kafka 连接器心跳未提交 LSN 的相关文章

  • H2 和 PostgreSQL 兼容模式限制

    我使用 H2 数据库作为内存数据库进行测试 其中 PostgreSQL 在生产中使用 除了两者之间存在一些细微差别之外 此设置工作正常 我现在关心的一个问题是PostgreSQL 中标识符的长度限制为 64 https www postgr
  • Strimzi 运算符 Kafka 集群 ACL 未启用类型:简单

    我们知道要启用Kafka ACL属性authorizer class name kafka security auth SimpleAclAuthorizer要添加到server properties但是如果 Kafka 集群由 Strim
  • 找不到 io.confluence:kafka-protobuf-serializer:6.0.0

    直接的问题是 为什么 Gradle 没有解决我添加的这个依赖关系 dependencies kafka protobuf serializer implementation io confluent kafka protobuf seria
  • kafka Avro 多个主题的消息反序列化器

    我正在尝试以 avro 格式反序列化 kafka 消息 我使用以下代码 https github com ivangfr springboot kafka debezium ksql blob master kafka research c
  • 更改 IdentityServer4 实体框架表名称

    我正在尝试更改由 IdentityServer4 的 PersistedGrantDb 和 ConfigurationDb 创建的默认表名称 并让实体框架生成正确的 SQL 例如 而不是使用实体IdentityServer4 EntityF
  • 查找一列中具有相同值而另一列中具有其他值的行?

    我有一个 PostgreSQL 数据库 将用户存储在users他们参与的表格和对话conversation桌子 由于每个用户可以参与多个对话 并且每个对话可以涉及多个用户 因此我有一个conversation user链接表来跟踪哪些用户正
  • 使用 pg-promise 进行多行插入

    我想用一个插入多行INSERT查询 例如 INSERT INTO tmp col a col b VALUES a1 b1 a2 b2 有没有一种方法可以轻松地做到这一点 最好是对于像这样的对象数组 col a a1 col b b1 co
  • postgreSQL 将分区表(带插入触发器)从一台服务器转储和恢复到另一台服务器

    尝试将分区表从一台服务器转储到 PostgreSQL 9 4 5 中的另一台服务器 对 postgres 相当陌生 并继承了该项目 如果需要更多背景信息 请告诉我 dbname gt SELECT COUNT id FROM parent
  • Kafka Streams 内部数据管理

    在我的公司 我们广泛使用 Kafka 但出于容错的原因 我们一直使用关系数据库来存储多个中间转换和聚合的结果 现在我们正在探索 Kafka Streams 作为一种更自然的方式来做到这一点 通常 我们的需求非常简单 其中一个例子是 监听输入
  • 获取 Postgres 数据库中每个表的行数

    获取数据库中所有表的行数的最有效方法是什么 我正在使用 Postgres 数据库 结果示例 table name row count some table 1 234 foobar 5 678 another table 32 如果您想要特
  • org.postgresql.util.PSQLException:协议错误。会话设置失败

    我知道这些类型的问题已经存在 但提供的解决方案对我不起作用 在我的应用程序中 没有版本不匹配的黑白驱动程序和 PostgreSQL 服务器 我还没有找到任何其他解决方案 我正在使用 PostgreSQL 服务器 9 4 和 postgres
  • 从副本消费

    Kafka 将主题的每个分区复制到指定的复制因子 据我所知 所有写入和读取请求都会路由到分区的领导者 有没有办法从追随者那里消费而不是从领导者那里消费 Kafka中的复制只是为了故障转移吗 在 Kafka 2 3 及更早版本中 您只能从领导
  • 带回调或异步/等待的节点 postgres 事务?

    我正在运行 Node 7 6 0 它支持 async await node postgres 客户端池支持 async await 并且有一个很好的示例here https github com brianc node pg pool pl
  • 如何在 PostgreSQL 中克隆记录

    我想循环查询 但也保留下一个循环的实际记录 这样我就可以比较两个相邻的行 CREATE OR REPLACE FUNCTION public test RETURNS void AS body DECLARE previous RECORD
  • PSQL [错误] - 值被识别为列

    前几天刚开始学习数据库 我遇到了这个问题 我的值被识别为一列 并且它吐出了一个错误 这是我的News table id bodyText url createdAt updatedAt 这是我在 psql 中运行的命令 INSERT INT
  • 如何以编程方式使用包含多列的 where-in 子句执行 PostgreSQL 查询?

    我的查询是这样的 select from plat customs complex where code t code s in 01013090 10 01029010 90 它在 psql 控制台中运行良好 我的问题是如何在客户端代码中
  • 编辑 Kafka Listener Spring 应用程序以更改阶段/目标

    我可以利用另一个运行 Kafka 应用程序 代码库的团队来使用相同的数据 将其加载到我们的新暂存表中 而不是他们的 他们在 Messages 文件夹中有许多不同的 kafka 侦听器适配器 java 文件 每个文件消耗不同类型的数据 每个
  • Laravel 5.3 Eloquent 事务和外键限制

    我正在从事一个更大的项目 我们在一个 Postgres 数据库中有多个模式 我们在模式之间创建了外键 这是一个例子 gt 我们有公司模式和用户模式 公司模式有company users表 该表对user users表有外键限制 CREATE
  • PostgreSQL:有效地将 JSON 数组拆分为行

    我有一个表 表 A 其中包含一个包含 JSON 编码数据的文本列 JSON 数据始终是一个包含一到几千个普通对象的数组 我有另一个表 表 B 其中有几列 包括数据类型为 JSON 的列 我想从表 A 中选择所有行 将 json 数组拆分为其
  • 每个搜索词显示一行,如果未找到则替换默认值

    Query SELECT product id name FROM product WHERE barcode in 681027 8901030349379 679046 679047 679082 679228 679230 67923

随机推荐