Kafka中如何使用事务以及如何使用abortTransaction?

2024-05-01

我是 kafka 新手,我使用 Kafka Producer Java api。 面对Kafka的这个问题,Kafka: Invalid transition attempted from state COMMITTING_TRANSACTION to state ABORTING_TRANSACTION.

人们已经写过producer.abortTransaction()仅当没有正在进行的交易时才应调用...... 知道如何检查飞行中是否有交易吗?以及如何清除/停止它们?

这是我的代码:

try { 
  producer.send(record, new Callback() { 
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) { 
      if ( e != null){ 
        logger.info("Record was not sent due to kafka issue");
        throw new KafkaException("Record was not sent due to kafka issue");
      }
    }
  });
} catch (KafkaException e){
  producer.abortTransaction(); 
}

我需要实现的是检测kafka何时停止,在这种情况下清除所有缓冲区,以便当kafka再次启动时这些缓冲区中的记录不会出现在消费者端。

在这种情况下,您通常要做的是应用 Java 文档中描述的事务。Kafka生产者 https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html:

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("transactional.id", "my-transactional-id");
 Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

 producer.initTransactions();

 try {
     producer.beginTransaction();
     for (int i = 0; i < 100; i++)
         producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
     producer.commitTransaction();
 } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
     // We can't recover from these exceptions, so our only option is to close the producer and exit.
     producer.close();
 } catch (KafkaException e) {
     // For all other exceptions, just abort the transaction and try again.
     producer.abortTransaction();
 }
 producer.close();

这样,如果满足以下条件,则 100 条记录要么全部对消费者可见,要么全部不可见:isolation.level被设定为read_committed.

你正在关闭producer.close()对于不可恢复的异常,例如

  • 生产者防护异常:这个致命异常表明另一个生产者具有相同的transactional.id已经开始了。只能有一个生产者实例transactional.id在任何给定时间,最新启动的实例都会“隔离”先前的实例,以便它们无法再发出事务请求。当遇到此异常时,必须关闭生产者实例。

  • 乱序序列异常:此异常表明代理从生产者处收到了意外的序列号,这意味着数据可能已丢失。如果生产者仅配置为幂等性(即如果enable.idempotence已设置并且没有transactional.id已配置),可以继续使用相同的生产者实例发送,但这样做有重新排序发送记录的风险。对于事务性生产者来说,这是一个致命错误,您应该关闭生产者。

  • 授权异常:[不言自明]

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka中如何使用事务以及如何使用abortTransaction? 的相关文章

随机推荐

  • 如何将变量传递到 Azure 数据工厂 REST url 查询字符串

    我是 Azure 数据工厂的新手 我有一个链接到 REST api 的源数据集 该 API 的 url 有一个查询字符串 我有一个将数据从 REST 复制到数据库的活动 但我必须在查询字符串中传递不同的值 并对不同的值运行相同的活动 如何在
  • 使用 jQuery 获得第一堂课和最后一堂课

    可能是新手问题 我有这样的代码行 div class template active 我需要为自己准备每一堂课 我尝试了这段代码 this attr class 从该代码中我得到 模板处于活动状态 我需要的是一个带有 template 的字
  • Java 中的 lambda 目标类型和目标类型上下文是什么意思?

    我正在阅读 Herbert Schildt 的 Java 完整参考 中关于 lambda 的一章 其中有很多对 lambda 目标类型 和 目标类型上下文 的引用 函数式接口定义了目标类型的一个 拉姆达表达式 这里有一个关键点 只能使用 l
  • 如何为webpack使用自己的jade文件?

    我是 webpack 的新手 并试图弄清楚如何在 webpack dev server 中使用我自己的 html 文件以及我的 webpack 构建 在我的 app js 中我有 require jade index jade 但这并不意味
  • 通过页面打开 mysql 连接是完全鲁莽的吗?

    当查询数据库时 是否会感到极度偏执 每次必须完成新查询时 我都会打开和关闭 mysql 连接 我担心 尤其是启用 ajax 的页面 这会导致性能大幅下降 我应该继续使用此方法 还是至少在每个页面 而不是每个查询 中打开和关闭连接一次 顺便说
  • 如何在vala中使用gettext?

    当我尝试在 vala 中使用 gettext 时 我没有收到来自 vala 的错误或警告 但我从 c 编译器收到以下错误 usr include glib 2 0 glib gi18n lib h 29 2 error error You
  • 单击时显示数组中的下一个图像

    我正在努力设置可以通过屏幕箭头点击的图像 目前 我的图像全部通过循环和数组显示 我已经能够进行设置 以便当您将鼠标悬停在小图像预览上时 主图像将更改为该图像 也就是说 您可以将鼠标悬停在它们上以查看更大的版本 我的数组位于 mongo 模型
  • Unity构建错误

    所以我制作了我的游戏并尝试构建它 我收到一些对我来说毫无意义的错误 这是错误 UnityEditor BuildPlayerWindow BuildMethodException 2 个错误 在 UnityEditor BuildPlaye
  • C++ static constexpr 成员在类外重新声明

    对于以下代码 为什么 main 中的第一个案例无需重新声明 Foo bar 就可以正常工作 而带有该函数的第二个案例则需要它 struct Foo static constexpr int bar 30 Declaration of Foo
  • 从 ScheduledExecutorService 中删除 Runnable

    情况是这样的 代码如下 用户填写 3 个字段并按 添加 按钮 gt 创建一个 ToDoBean 并将其添加到 ToDoModel 扩展 AbstractTableModel 并使用模型中 ToDoBean 的索引创建一个 Runnable
  • 同一应用服务中的 Azure 函数是否在同一实例中运行

    我有一个场景 我的一个类有一个静态成员 我可以从函数应用程序设置其值 假设属于同一应用服务计划的另一个功能应用也使用同一个类 并且也设置 依赖静态成员的值 现在 如果两个功能应用程序从不同时运行 我们就没有问题了 另外 如果它们不在同一个实
  • 组合 scep 和 mdm 有效负载时出错 - 注册服务器未提供有效的身份证书

    我正在致力于推出自己的 MDM 服务 并尝试按照 Apple 的 MDM 协议文档的建议将 SCEP 和 MDM 有效负载结合起来 我在 C Net 中创建了自己的 SCEP Web 服务 并且我知道当我发送 SCEP 有效负载时 设备可以
  • 禅德导航。带有自定义选项的子菜单

    在我的布局脚本中 我也需要生成 渲染我的菜单 如果菜单项有一个子菜单 我会更改我的菜单项 以便它会呈现 li 原因是我会有一张图片 li li 网页元素存在 ul li a href da front news Nyt a li li cl
  • attributeError:模块“pyproj”没有属性“CRS”

    地图数据库扫描 import geopandas as gpd import contextily as ctx import pyproj from mpl toolkits axes grid1 inset locator import
  • Python:从源代码安装模块

    当然你们都知道答案 而且很容易 但我是 python 新手 我在网上找到了一段代码 读取雅虎财经的股价 usr bin env python Copyright c 2007 2008 Corey Goldberg email protec
  • 无需使用abs函数或if语句即可获取绝对值

    我在想如何在不使用的情况下获得整数的绝对值if声明也不abs 起初我使用的是左移位 lt lt 试图将负号移出范围 然后将位右移回原来的位置 但不幸的是它对我不起作用 请让我知道为什么它不起作用以及其他替代方法 From 位摆弄黑客 htt
  • NSMutableAttributedString 的自动换行

    我有 NSMutableAttributedString 并且字符串很长 我想在 UIlabel 上显示它时进行自动换行 如果是 NSString 我会继续做这样的事情 动态 UILabel 截断文本 https stackoverflow
  • C++:如何通过时间和本地时间获取实际时间?

    我正在寻找一种在 C 中以 HH MM SS 方式节省时间的方法 我在这里看到它们有很多解决方案 经过一番研究后我选择了time and localtime 然而 似乎localtime函数有点棘手 因为它says http rabbit
  • 如何编写javadoc链接?

    如何将链接写入 javadoc 目前 我有类似的东西 link java lang Math sqrt double Math sqrt 生成文本Math sqrt应该链接到java lang Math sqrt double 然而 API
  • Kafka中如何使用事务以及如何使用abortTransaction?

    我是 kafka 新手 我使用 Kafka Producer Java api 面对Kafka的这个问题 Kafka Invalid transition attempted from state COMMITTING TRANSACTIO