事务性 Kafka 生产者

2024-05-08

我正在尝试让我的卡夫卡生产者具有事务性。 我正在发送 10 条消息。如果发生任何错误,则不应向 kafka 发送任何消息,即不发送或全部消息。

我正在使用 Spring Boot KafkaTemplate。

@Configuration
@EnableKafka
public class KakfaConfiguration {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();

        // props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        // props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
        // appProps.getJksLocation());
        // props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
        // appProps.getJksPassword());
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.ACKS_CONFIG, acks);
        config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackOffMsConfig);
        config.put(ProducerConfig.RETRIES_CONFIG, retries);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-99");

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean(name = "ktm")
    public KafkaTransactionManager kafkaTransactionManager() {
        KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory());
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }

}

我正在发送 10 条消息,如下所示,如文档中所述。应发送 9 条消息,并且 I 消息大小超过 1MB,由于以下原因被 Kafka 代理拒绝RecordTooLargeException

https://docs.spring.io/spring-kafka/reference/html/#using-kafkatransactionmanager https://docs.spring.io/spring-kafka/reference/html/#using-kafkatransactionmanager

@Component
@EnableTransactionManagement
class Sender {

    @Autowired
    private KafkaTemplate<String, String> template;

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Transactional("ktm")
    public void sendThem(List<String> toSend) throws InterruptedException {
        List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
        CountDownLatch latch = new CountDownLatch(toSend.size());
        ListenableFutureCallback<SendResult<String, String>> callback = new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOG.info(" message sucess : " + result.getProducerRecord().value());
                latch.countDown();
            }

            @Override
            public void onFailure(Throwable ex) {
                LOG.error("Message Failed ");
                latch.countDown();
            }
        };

        toSend.forEach(str -> {
            ListenableFuture<SendResult<String, String>> future = template.send("t_101", str);
            future.addCallback(callback);
        });

        if (latch.await(12, TimeUnit.MINUTES)) {
            LOG.info("All sent ok");
        } else {
            for (int i = 0; i < toSend.size(); i++) {
                if (!futures.get(i).isDone()) {
                    LOG.error("No send result for " + toSend.get(i));
                }
            }
        }

但是当我看到主题 t_hello_world 9 消息时就在那里。我的期望是看到 0 条消息,因为我的生产者是事务性的。 我怎样才能实现它?

我收到以下日志

2020-04-30 18:04:36.036 ERROR 18688 --- [   scheduling-1] o.s.k.core.DefaultKafkaProducerFactory   : commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1eb5a312, txId=prod-990]

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:923) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager.lambda$beginCommit$2(TransactionManager.java:297) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1013) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:296) ~[kafka-clients-2.4.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:713) ~[kafka-clients-2.4.1.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.commitTransaction(DefaultKafkaProducerFactory.java



Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

2020-04-30 18:04:36.037  WARN 18688 --- [   scheduling-1] o.s.k.core.DefaultKafkaProducerFactory   : Error during transactional operation; producer removed from cache; possible cause: broker restarted during transaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1eb5a312, txId=prod-990]
2020-04-30 18:04:36.038  INFO 18688 --- [   scheduling-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-prod-990, transactionalId=prod-990] Closing the Kafka producer with timeoutMillis = 5000 **ms.
2020-04-30 18:04:36.038  INFO 18688 --- [oducer-prod-990] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-990, transactionalId=prod-990] Aborting incomplete transaction due to shutdown**

未提交的记录写入日志;当事务提交或回滚时,会在日志中写入一条额外的记录,其中包含事务的状态。

默认情况下,消费者可以看到所有记录,包括未提交的记录(但看不到特殊的提交/中止记录)。

对于控制台消费者,需要将隔离级别设置为read_committed。查看帮助:

--isolation-level <String>           Set to read_committed in order to      
                                       filter out transactional messages    
                                       which are not committed. Set to      
                                       read_uncommitted to read all          
                                       messages. (default: read_uncommitted)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

事务性 Kafka 生产者 的相关文章

  • 通过 CMD 获取启用 SSL 的 Kafka 中的最新偏移量

    我一直在使用下面的 CMD 从打开纯文本端口的 Kafka 队列中获取最新的偏移量 kafka run class sh kafka tools GetOffsetShell broker list server 9092 topic sa
  • 安装 confluence-kafka 时“文件名或扩展名太长”?

    我在使用 pip install confluence kafka 安装 confluence kafka 时遇到一些问题 但我收到此错误 文件名或扩展名太长 详细信息如下 Collecting confluent kafka Using
  • 即使在kafka机器重新启动后,如何保留kafka保留字节和kafka保留段[重复]

    这个问题已经存在了 we set retention bytes价值 104857600对于主题 topic test root confluent01 kafka topics zookeeper localhost 2181 alter
  • 我们如何读取给定时间范围内的Kafka主题?

    我需要读取 Kafka 主题中给定时间范围内的消息 我能想到的解决方案是首先找出时间范围开始的最大偏移量 然后继续消费消息 直到所有分区上的偏移量超过时间范围的末尾 有没有更好的方法来解决这个问题 谢谢 好吧 您肯定必须首先搜索适合时间范围
  • 是否可以使用 Kafka Streams 访问消息头?

    随着添加Headers http apache spinellicreations com kafka 0 11 0 0 javadoc org apache kafka common header Header html到记录 生产者记录
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab
  • 如何复制或配置kafka connect插件文件?

    我已经从以下位置下载了插件文件https www confluence io connector kafka connect cdc microsoft sql https www confluent io connector kafka
  • 命名 kafka 主题的最佳实践是什么?

    我们是 kafka 的新手 我们有几个团队正在开发一些相互发布 订阅事件的应用程序 由于kafka主题名称将在团队之间共享 那么命名有什么最佳实践吗 基本上我们不希望看到 A 团队命名主题companyname appname events
  • 从副本消费

    Kafka 将主题的每个分区复制到指定的复制因子 据我所知 所有写入和读取请求都会路由到分区的领导者 有没有办法从追随者那里消费而不是从领导者那里消费 Kafka中的复制只是为了故障转移吗 在 Kafka 2 3 及更早版本中 您只能从领导
  • Kafka JDBC Sink Connector 对于具有可选字段的模式的消息给出空指针异常

    Kafka JDBC Sink Connector 对于具有可选字段 parentId 的模式的消息给出空指针异常 我错过了什么吗 我正在使用开箱即用的 JSONConverter 和 JDBC Sink Connector 关于 Kafk
  • Kafka Streams - 跳跃窗口 - 去重键

    我正在 4 小时窗口上进行跳跃窗口聚合 每 5 分钟前进一次 由于跳跃窗口重叠 我得到了具有不同聚合值的重复键 TimeWindows of 240 60 1000L advanceBy 5 60 1000L 如何消除具有重复数据的重复键或
  • Apache kafka - 消费者延迟选项

    我想在 Kafka 中为特定主题稍稍延迟启动一个消费者 具体来说 我希望消费者在从生成消息的时间起经过特定的时间延迟后开始使用该主题的消息 Kafka 中有任何属性或选项可以启用它吗 我们对火花流做了同样的事情 我希望 这种方法也适合您 这
  • 如何使用rest api设置kafka连接auto.offset.reset

    我创建了一个接收器 kafka 连接 将数据转换为其他存储 我想设置auto offset reset as latest当新连接器创建时kafka connect rest api 我已经设定consumer auto offset re
  • 连接到 Apache Kafka 多节点集群中的 Zookeeper

    我按照以下说明设置了多节点 kafka 集群 现在 如何连接到zookeeper 是否可以从 JAVA 中的生产者 消费者端仅连接到一个 ZooKeeper 或者是否有一种方法可以连接所有 ZooKeeper 节点 设置多节点 Apache
  • Spark shell (spark 3.0.0) 添加包 confluence kafka 5.5.1 javax.ws.rs-api 问题

    我本地的win10 WSL回到ubuntu 在ubuntu上 我安装了spark3 0 0 confluence平台5 5 1 手动下载 当我尝试运行spark shell或spark submit时 下面是shell示例 spark sh
  • 将数据从 Kafka 存储传输到 Kafka 主题

    我想在卡夫卡做这样的事情 继续将数据存储在 KStream Ktable Kafka store 中 当我的应用程序收到特定事件 数据时 仅将上述存储中的特定数据集发送到主题 我们可以在卡夫卡中做到这一点吗 我认为单独使用 Kafka 消费
  • 卡夫卡主题查看器? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我想调试一些 Kafka 主题 这样我就知道消费者或生产者是否有问题 Kafka 是否有一个 UI 我
  • 如何在 Python 中以编程方式检查 Kafka Broker 是否已启动并运行

    我正在尝试使用来自 Kafka 主题的消息 我正在使用包装器confluent kafka消费者 我需要在开始使用消息之前检查连接是否已建立 我读到消费者很懒 所以我需要执行一些操作才能建立连接 但我想检查连接建立而不执行consume o
  • 从 Apache Kafka 中的主题删除消息

    所以我是 Apache Kafka 的新手 我正在尝试创建一个简单的应用程序 以便我可以更好地理解 API 我知道这个问题在这里被问了很多 但是如何清除存储在主题上的消息 记录 我看到的大多数答案都说要更改消息保留时间或删除并重新创建主题
  • 了解Kafka流groupBy和window

    我无法理解 kafka 流中的 groupBy groupById 和窗口的概念 我的目标是聚合一段时间内 例如 5 秒 的流数据 我的流数据看起来像 value 0 time 1533875665509 value 10 time 153

随机推荐

  • 如何在android中直接从.zip文件读取文件而不解压它

    过去几个月我一直在研究 android 现在我的问题是读取放在 sdcard 上的 zip 文件 我已经成功完成了将 zip 文件下载到 SD 卡上的编码 我已将 img zip 文件下载到 SD 卡上 此 img zip 包含 5 个图像
  • C++ 求矩阵最小和最大元素之间的元素和

    因此 我的程序按其应有的方式工作 但前提是最小和最大元素位于对角 所以我的问题是如何将二维数组从一个特定元素迭代到另一个元素 也许摆脱一些嵌套循环 我应该将这个数组转换为一维吗 这是代码的正确工作方式 就在这时 出现了问题 元素 0 1 和
  • 当子列表视图在颤动中到达末尾时,有什么方法可以滚动父列表视图吗?

    假设我有一个可滚动页面 在该页面内我有另一个可滚动列表视图 垂直 所以我希望当子列表视图到达末尾时 可滚动页面开始移动到其末尾 此外 当子列表视图到达顶部时 可滚动页面开始移动到顶部 怎样才能做到这一点 这是我的代码 Widget Fres
  • P2P网络游戏/应用程序:类似“战网”匹配服务器的不错选择

    我正在制作一个网络游戏 1v1 游戏中是 p2p 不需要游戏服务器 然而 为了让玩家能够 找到彼此 而不需要在另一种媒介中协调并输入IP地址 类似于网络游戏的现代时代 我需要有一个协调 匹配服务器 我无法使用常规网络托管 因为 客户端将使用
  • 构建 jar 后无法运行 exe

    我制作了一个简单的实用应用程序 其中我有一个要运行的exe文件 我通过使用它来运行 Runtime getRuntime exec this getClass getResource filename exe getPath 当我从 ide
  • Java 中的 sscanf 等效项[重复]

    这个问题在这里已经有答案了 可能的重复 用于使用已知模式解析字符串中的值的 sscanf 的 Java 等效项是什么 https stackoverflow com questions 8430022 what is the java eq
  • 在 Java 构建过程中更改常量的最佳方法

    我继承了一个在 Tomcat 下运行的 Java 应用程序 servlet 由于历史原因 根据应用程序的部署位置 本质上是品牌问题 代码具有不同的 外观和感觉 选项 有几个常量控制这个品牌过程 它们具有不同的功能 不应压缩为单个常量 即 B
  • VT_DATE 类型的微秒支持

    VT DATE 变体类型是否支持微秒分辨率 请告诉我如何在VB中显示相同的内容 http msdn microsoft com en us library ms221646 aspx http msdn microsoft com en u
  • 回调和部分回发有什么区别?

    有区别吗 或者这些术语是同义词吗 抱歉 如果之前有人问过这个问题 我只能找到a之间的区别full回发和回调 我已经知道完整回发有何不同 在使用 ASP Net 2 0 时 如果这很重要的话 顺便问一下 这重要吗 或者这些术语对于任何基于 W
  • 用于将字符串与预定义字符混合/混淆的简单算法

    我有一个字符串如下 它的长度是10 它代表基数 36 因此包含数字和大写字母 字符串的来源是数据库生成的序列 即从 1 及以上 正在转换为基数 36 我的问题是转换为base 36转换的结果也是连续 顺序的 例如 ID 1402 gt 00
  • 获取已安装的 Windows 应用商店应用程序列表

    有多种方法可以获取控制面板中 添加 删除程序 中已安装应用程序的列表 但我也想从 Windows 应用商店获取已安装应用程序的列表 到目前为止我还没有得到任何东西 有什么方法可以获取从 Windows 应用商店安装的应用程序列表吗 您可以在
  • SQL Proc 从 varchar 到 int 的“转换失败”。为什么要转换?

    我的问题是 为什么它从 varchar 转换为 int 我不确定它想做什么 CREATE PROCEDURE myTestProcedure TransId VARCHAR 15 AS BEGIN DECLARE Result VARCHA
  • MyBatis Spring Boot 自定义类型处理程序

    我需要 Spring Boot 和 MyBatis 集成方面的帮助 我对自定义 BaseTypeHandler 有疑问 我创建了一个映射器 MappedTypes LocalDateTime class public class Local
  • 什么是“更便宜”的性能明智的 $broadcast 或 $watch

    我的应用程序中有一种情况 每次用户的角色发生变化时 我都需要重新加载菜单 一个用户可以在多个公司中拥有角色 我想知道解决这个问题的最佳方法是什么 目前我正在做以下事情 app controller menuLoadingCtrl funct
  • 如何在辅助显示器上全屏显示图像?

    如何使用 PyQt5 PySide 或任何其他 Python 库在辅助 显示器上以全屏模式显示所需的图像 过去 我使用帧缓冲区图像查看器 Fbi https manpages ubuntu com manpages bionic man1
  • 安卓多点触控?

    作为一名开发人员 我倾向于先编程 然后再研究 我试图实现一个可以处理多个用户输入的屏幕 基本上映射的不仅仅是一根手指 我尝试了两件事 我有一个实现 OnTouchListener 的 Activity 类 这里我有两个单独的子视图 它们将
  • ios6 UIImageView - 加载-568h 图像

    我看过一些关于 UIImage 自动加载的帖子文件名 568 png新的 iOS6 中的图像 但我似乎无法在 UIImageView 类中重新创建它 我正在使用故事板 不是我的应用程序 只需要做一些检查 并且我有一个简单的布局 仅缩放图像视
  • 我可以通过编程方式获取连接到手机的 wifi 的 MAC 地址吗?

    我的手机已连接到 wifi 我想获取我的 wifi 的 MAC 地址 BSSID 和 mac 地址是同一回事 您可以通过此函数获取 mac 地址 只需导入 SystemConfiguration CaptiveNetwork func ge
  • 易于使用的Python加密库/包装器?

    我想在Python中用密码加密任意长度的字符串 我会比较喜欢not处理填充 密钥生成和 IV 因为说实话 我对密码学还不太了解 而且我想避免搞砸 我还更喜欢使用众所周知的密码作为 AES 我理想的库 我们称之为 MagicCrypt 会像这
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab