Spring Boot中事务同步与Database+kafka示例

2023-12-28

我想使用 Spring boot 编写一个新应用程序,使用 MySQL + Mango 数据库和 Spring Kafka 进行消息传递。

我尝试使用 Many POC 来同步 Kafka 和 DB 之间的事务,但在某些情况下失败了,并且我还搜索了许多存储库、博客以获取至少一个示例。我现在还没有得到任何例子。

如果有人至少给出一个示例或配置,这对所有人来说都是一个很好的参考。


干得好...

@SpringBootApplication
public class So56170932Application {

    public static void main(String[] args) {
        SpringApplication.run(So56170932Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.executeInTransaction(t -> t.send("so56170932a", "foo"));
    }

    @Bean
    public ChainedKafkaTransactionManager<Object, Object> chainedTm(KafkaTransactionManager<String, String> ktm,
            DataSourceTransactionManager dstm) {

        return new ChainedKafkaTransactionManager<>(ktm, dstm);
    }

    @Bean
    public DataSourceTransactionManager dstm(DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            ChainedKafkaTransactionManager<Object, Object> ctm) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setTransactionManager(ctm);
        return factory;
    }

    @Component
    public static class Listener {

        private final JdbcTemplate jdbcTemplate;

        private final KafkaTemplate<String, String> kafkaTemplate;

        public Listener(JdbcTemplate jdbcTemplate, KafkaTemplate<String, String> kafkaTemplate) {
            this.jdbcTemplate = jdbcTemplate;
            this.kafkaTemplate = kafkaTemplate;
        }

        @KafkaListener(id = "so56170932a", topics = "so56170932a")
        public void listen1(String in) {
            this.kafkaTemplate.send("so56170932b", in.toUpperCase());
            this.jdbcTemplate.execute("insert into so56170932 (data) values ('" + in + "')");
        }

        @KafkaListener(id = "so56170932b", topics = "so56170932b")
        public void listen2(String in) {
            System.out.println(in);
        }

    }

    @Bean
    public NewTopic topicA() {
        return TopicBuilder.name("so56170932a").build();
    }

    @Bean
    public NewTopic topicB() {
        return TopicBuilder.name("so56170932b").build();
    }

}

and

spring.datasource.url=jdbc:mysql://localhost/integration?serverTimezone=UTC
spring.datasource.username=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.properties.isolation.level=read_committed

spring.kafka.producer.transaction-id-prefix=tx-

logging.level.org.springframework.transaction=trace
logging.level.org.springframework.kafka.transaction=debug
logging.level.org.springframework.jdbc=debug

and

mysql> select * from so56170932;
+------+
| data |
+------+
| foo  |
| foo  |
| foo  |
| foo  |
| foo  |
| foo  |
| foo  |
| foo  |
| foo  |
+------+
9 rows in set (0.00 sec)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spring Boot中事务同步与Database+kafka示例 的相关文章

随机推荐

  • Spring Boot应用程序启动后立即关闭

    我正在尝试构建一个简单的 Spring Boot 应用程序 当我运行 Spring Boot 应用程序时 它在启动后立即关闭 下面是控制台日志 Spring Boot v1 4 1 BUILD SNAP
  • 符号导数和积分

    我想集成功能f4关于x然后求新函数的导数t 我可以用数值方法计算积分 有没有办法在R中以符号方式计算这个积分和导数 lambda 1 ci 1 aa lt function u k t f4 lt function x f1 lt func
  • 如何让实验性 ngTemplateOutlet 发挥作用?

    我正在尝试在 Angular2 中构建一个列表组件 该组件从组件的用户那里获取项目 列和项目字段的模板 所以我正在尝试使用ngTemplateOutlet and ngOutletContext 我读过的都是实验性的 但我无法让它发挥作用
  • 使用 RXJava 和 Retrofit 获取标头信息

    我正在尝试将当前使用 Retrofit 的应用程序转换为使用 RX Java 为了处理分页 我传统上是从响应标头中获取 nextPage URL Override public void success Assignment assignm
  • 具有可选属性的 JSON 类型提供程序的数据示例

    我正在尝试使用 JSON 类型提供程序通过 API 访问 StackOverflow StackExchange 数据 它效果很好 但有一点需要注意 API 有一个节流阀 它由 退避 字段发出信号 其中包含您应该退避到下一个请求之前的秒数
  • Python 语音比较

    我有两个 wav 文件 我需要比较并确定它们是否包含相同的单词 顺序也相同 一段时间以来我一直在寻找最好的方法 我不知道如何让 pyspeech 使用文件作为输入 我尝试过让 CMU sphinx 项目正常工作 但我似乎无法让 GStrea
  • NGINX:“客户端在读取客户端请求行时发送了无效方法”

    为 Nginx 设置 SSL 并发出请求后 我收到带有神秘消息的乱码响应client sent invalid method while reading client request line 我在 Alpine Docker 容器中使用
  • 以编程方式获取 iOS 设备的 IMEI 或 UDID

    1 Apple 是否允许开发者检索用户设备的 IMEI 号码和 UDID 2 如何以编程方式获取这些值 3 如果 Apple 不允许开发人员收集 IMEI 号码 他们是否会为设备提供任何其他唯一号码 Apple 不再允许开发人员以编程方式获
  • 模板、内部结构、局部类型和纯虚函数,天哪

    考虑一个示例 其中方法是纯虚拟的 采用模板类型的参数 从外部类型注入 并且该模板类型是本地类型 在函数体中定义 这种情况会导致 g 下的编译时错误 诚然 这是一个相当极端的情况 但它确实源自真实的代码 这是一个可编译 可重现的示例 incl
  • 如何使用 Xcode 和 Git 处理不同的库搜索路径?

    我正在和朋友一起开发一个应用程序 我们使用 git 私有 作为版本控制系统 我们面临的问题是 Xcode 5 1 DP2 给出此警告 Apple Mach O 链接器警告 未找到目录 出现此警告的原因是 Xcode 在我的硬盘上找不到我朋友
  • 在原则 2 中指定表类型/存储引擎

    那么如何在原则 2 中指定用于给定实体的存储引擎呢 我正在创建一个需要全文索引的表 并且只有 MyISAM 存储引擎支持 MySQL 中的全文索引 另一方面 看起来 Doctrine 2 不支持开箱即用的全文索引 也没有全文搜索 那是对的吗
  • ColdFusion 变量竞争条件?

    我需要一些帮助来确定为什么这个特定的代码在极少数情况下会产生竞争条件 我找到了一个解决方案 我也会概述它 但我真的很想理解它 我们有一个基于 CMS 的系统 由许多松散地基于保险丝盒模型的模块组成 一切都通过单个index cfm 运行 在
  • 为 docker 容器提供可路由的 IP 地址

    我在 ubuntu 14 04 上运行它并且已经设置docker0到静态 IP 然后通过防火墙从公共 IP 路由到该静态 IP 我正在尝试将后端 API 设置为在 docker 容器中运行 但对一些事情感到困惑 1 我将如何映射docker
  • 了解 C++ 中随机数生成器的种子是什么

    我有一个非托管 C 控制台应用程序 其中使用 srand 和 rand 我不需要这个来解决特定问题 但很好奇 传递给 srand 的原始种子是否存储在我可以查询的内存中的某个位置 有什么办法可以知道种子是什么吗 不需要存储种子 只需要存储最
  • Vuetify 中的颜色主题更改不起作用

    我将 vuejs 与 vuetify 一起使用 我放置了基本 vuetify 模板并尝试更改颜色主题 但颜色不会切换 我的控制台中没有出现任何错误 并且我的缓存也被清除 main js代码 import Vue from vue impor
  • VM 快照期间 BizTalk 与 SQL 的连接问题

    我们有一个用于 BizTalk 的虚拟机和一个用于 SQL 后端的单独虚拟机 我们使用 Veeam 进行备份 这基本上会启动虚拟机的快照 当此快照在 SQL VM 上最终确定时 应用程序服务器上的 BizTalk 服务将失败 通常它们会自动
  • 检测是否开启“蓝牙扫描”定位

    从 Android M 开始 如果您启用了蓝牙设备 即使全球定位已关闭 也可以在后台扫描蓝牙设备 蓝牙扫描位置设置中的选项 参见屏幕截图 为了扫描 BLE 设备 必须满足以下条件 COARSE LOCATION or FINE LOCATI
  • 使用 C# 将布尔数组转换为字符串

    我有一个看起来像这样的数组 status 0 true status 1 true status 2 false status 3 true 实际上它更大 但仍然小于 20 我需要将其转换为 ABD 其中每个 true 代表字母表中的一个有
  • 使用芯片输入选择不显示所选值

    我有一个选择 输入采用芯片格式 我尝试了所选值的控制台日志 效果很好 但由于某种原因 它没有显示在选择框中 我在这里做错了什么 handleChange event gt this setState badge event target v
  • Spring Boot中事务同步与Database+kafka示例

    我想使用 Spring boot 编写一个新应用程序 使用 MySQL Mango 数据库和 Spring Kafka 进行消息传递 我尝试使用 Many POC 来同步 Kafka 和 DB 之间的事务 但在某些情况下失败了 并且我还搜索