编辑 Kafka Listener Spring 应用程序以更改阶段/目标

2024-05-14

我可以利用另一个运行 Kafka 应用程序/代码库的团队来使用相同的数据/将其加载到我们的新暂存表中,而不是他们的。他们在“Messages”文件夹中有许多不同的 kafka 侦听器适配器 .java 文件,每个文件消耗不同类型的数据。

每个 kafka 侦听器都有许多导入、一个公共类、实际插入数据的 try 语句等。我需要为每个 kafka 侦听器 adapter.java 文件编辑什么,才能将阶段/目标表从其他团队更改为我的?我发现我只需要更改“groupid”和“topic”变量+(查找具有此格式的文件来配置/添加新的groupid/id: spring.kafka.consumer.group-id=new-groupid spring.kafka.listener .id=新id)。这是两个变量在 .java 文件中的使用方式:

   @KafkaListener(topics = "#{new java.util.HashMap(${dataflow.consumer.props.assets.crypto}).keySet()}", groupId = "${dataflow.consumer.assets.crypto.group}",containerFactory = "cryptoListenerContainerFactory", id = "${dataflow.consumer.assets.crypto.group}")

每个 kafka 侦听器适配器都有一个存储库变量,所以我不确定。看起来数据实际上被插入到引用“Repository”的代码块中:

   try {
        List<FundEntity> cryptoFilteredList = cryptoConsumerRecordList.stream()
                .filter(cryptoConsumerRecord -> recordIsNotFilterable(cryptoConsumerRecord, FundEntity.class))
                .map(ConsumerRecord::value)
                .collect(Collectors.toList());

        if (!cryptoFilteredList.isEmpty()) {

            this.Repository.storecryptoCollection(cryptoFilteredList, topic, firstOffset, partition, cryptoGroupId);

            kafkaDetailsSummary.setStatus(KafkaMessageSummary.KafkaStatus.PROCESSED);
            log.info(LogHelper.generateCryptoReconMessage(cryptoFilteredList), keyValue(KAFKA_DETAILS, kafkaDetailsSummary));
        }

spring.kafka.listener.id: 没有这样的财产,并且spring.kafka.consumer.group-id仅当没有时才使用groupId注释上的属性。

由于它们在注释中使用属性占位符,因此您只需设置dataflow.consumer.assets.crypto.group属性到您想要的值。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

编辑 Kafka Listener Spring 应用程序以更改阶段/目标 的相关文章

  • 通过 CMD 获取启用 SSL 的 Kafka 中的最新偏移量

    我一直在使用下面的 CMD 从打开纯文本端口的 Kafka 队列中获取最新的偏移量 kafka run class sh kafka tools GetOffsetShell broker list server 9092 topic sa
  • Apache Kafka 消费者组的偏移量如何过期?

    当我注意到一些奇怪的行为时 我正在对一个旧主题进行一些测试 阅读 Kafka 的日志时 我注意到这条 删除了 8 个过期的偏移量 消息 GroupCoordinator 1001 Stabilized group GROUP NAME ge
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab
  • Kafka REST 代理 API 有哪些好处?

    我不知道Kafka REST Proxy API的优点 它是一个 REST API 所以我知道它对于管理来说很方便 人们为什么使用 Kafka REST 代理 API 添加对生产者或消费者的 Maven 依赖是否很麻烦 另外 我知道kafk
  • 是否可以使用Kafka传输文件?

    我每天都会生成数千个文件 我想使用 Kafka 进行流式传输 当我尝试读取该文件时 每一行都被视为一条单独的消息 我想知道如何将每个文件的内容作为 Kafka 主题中的单个消息 以及消费者如何将 Kafka 主题中的每条消息写入单独的文件中
  • 如何更改主题的起始偏移量?

    是否可以更改新主题的起始偏移量 我想创建一个新主题并从偏移量开始阅读10000 How 自从卡夫卡0 11 0 0 https issues apache org jira browse KAFKA 4743你可以使用脚本kafka con
  • Kafka Streams 内部数据管理

    在我的公司 我们广泛使用 Kafka 但出于容错的原因 我们一直使用关系数据库来存储多个中间转换和聚合的结果 现在我们正在探索 Kafka Streams 作为一种更自然的方式来做到这一点 通常 我们的需求非常简单 其中一个例子是 监听输入
  • 从副本消费

    Kafka 将主题的每个分区复制到指定的复制因子 据我所知 所有写入和读取请求都会路由到分区的领导者 有没有办法从追随者那里消费而不是从领导者那里消费 Kafka中的复制只是为了故障转移吗 在 Kafka 2 3 及更早版本中 您只能从领导
  • Kafka JDBC Sink Connector 对于具有可选字段的模式的消息给出空指针异常

    Kafka JDBC Sink Connector 对于具有可选字段 parentId 的模式的消息给出空指针异常 我错过了什么吗 我正在使用开箱即用的 JSONConverter 和 JDBC Sink Connector 关于 Kafk
  • Kafka Streams - 跳跃窗口 - 去重键

    我正在 4 小时窗口上进行跳跃窗口聚合 每 5 分钟前进一次 由于跳跃窗口重叠 我得到了具有不同聚合值的重复键 TimeWindows of 240 60 1000L advanceBy 5 60 1000L 如何消除具有重复数据的重复键或
  • 调试自定义 Kafka 连接器的简单有效的方法是什么?

    我正在使用几个 Kafka 连接器 在控制台输出中没有看到它们的创建 部署有任何错误 但是我没有得到我正在寻找的结果 没有任何结果 无论是期望的还是否则 我基于 Kafka 的示例 FileStream 连接器制作了这些连接器 因此我的调试
  • Kafka Consumer 无法加载任何密钥库类型和路径的 SSL 密钥库(Logstash ArcSight 模块)

    我需要为 Kafka Consumer 提供客户端身份验证证书 但是 它总是失败并出现以下异常 无法加载 SSL 密钥库 ssl cipher suites null ssl enabled protocols TLSv1 2 TLSv1
  • 连接到 Apache Kafka 多节点集群中的 Zookeeper

    我按照以下说明设置了多节点 kafka 集群 现在 如何连接到zookeeper 是否可以从 JAVA 中的生产者 消费者端仅连接到一个 ZooKeeper 或者是否有一种方法可以连接所有 ZooKeeper 节点 设置多节点 Apache
  • Apache Kafka 是否提供异步订阅回调 API?

    我的项目正在将 Apache Kafka 视为老化的基于 JMS 的消息传递方法的潜在替代品 为了让这个过渡尽可能的顺利 如果替代的排队系统 Kafka 有一个异步订阅机制那就更理想了 类似于我们当前项目使用的JMS机制MessageLis
  • 将数据从 Kafka 存储传输到 Kafka 主题

    我想在卡夫卡做这样的事情 继续将数据存储在 KStream Ktable Kafka store 中 当我的应用程序收到特定事件 数据时 仅将上述存储中的特定数据集发送到主题 我们可以在卡夫卡中做到这一点吗 我认为单独使用 Kafka 消费
  • 卡夫卡主题查看器? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我想调试一些 Kafka 主题 这样我就知道消费者或生产者是否有问题 Kafka 是否有一个 UI 我
  • 为每个键使用主题中的最新值

    我有一个 Kafka 生产者 它正在以高速率生成消息 消息键是用户名 值是他在游戏中的当前分数 Kafka消费者处理消费消息的速度相对较慢 在这里 我的要求是显示最新的分数并避免显示陈旧的数据 但代价是某些分数可能永远不会显示 本质上 对于
  • 使用 offsets_for_times 从时间戳消费

    尝试使用 confluence kafka AvroConsumer 来消费给定时间戳的消息 if flag creating a list topic partitons to search list map lambda p Topic
  • 使用 Spring Boot 进行 Kafka 流

    我想在我的 Spring Boot 项目中使用 Kafka Streams 实时处理 所以我需要 Kafka Streams 配置或者我想使用 KStreams 或 KTable 但我在互联网上找不到示例 我做了生产者和消费者 现在我想实时
  • 如何使用 Kafka 发送大消息(超过 15MB)?

    我发送字符串消息到Kafka V 0 8使用 Java Producer API 如果消息大小约为 15 MB 我会得到MessageSizeTooLargeException 我尝试过设置message max bytes到 40 MB

随机推荐

  • 使用 AutoMapper 展开 DTO

    我一直在尝试使用 AutoMapper 来节省从 DTO 到域对象的时间 但是我在配置地图以使其正常工作时遇到了麻烦 并且我开始怀疑 AutoMapper 是否可能是错误的工具工作 考虑这个域对象的示例 一个实体和一个值 public cl
  • 即使 Excel 中存在多条记录,CopyFromRecordset 也仅复制并粘贴第一行

    我有一个包含表格数据的 Excel 工作表 strSQL SELECT S FIELD NAME1 S FIELD NAME2 S FIELD NAME3 from SourceData A1 IV6 S Dim cn as ADODB C
  • 基于 JavaScript 的 iPhone UI 框架

    我们有一个基于推送的网络应用程序 最近 我们计划为其制作一个 iPhone 应用程序 就像 Facebook 拥有 iPhone 应用程序和网站一样 我们正在寻找一个可以让我们快速前进的 UI 框架 我翻阅过PhoneGap http ww
  • 数学 - 映射数字

    如何将 a 和 b 之间的数字线性映射到 c 和 d 之间 也就是说 我希望 2 到 6 之间的数字映射到 10 到 20 之间的数字 但我需要广义的情况 我的脑子炸了 如果您的数字 X 位于 A 和 B 之间 并且您希望 Y 位于 C 和
  • PHPMailer:如何将 Content-Type 设置为 multipart/alternative

    我正在使用 phpmailer 发送电子邮件 但消息的标题中带有 Content Type text html 我怎样才能将其更改为多部分 替代 它应该类似于 mail gt 我的配置是 mail new PHPMailer mail gt
  • Hashicorp Vault 中的 SSL 证书配置

    我最近开始使用 Vault 来存储我的 api 密钥和机密 我正在尝试将其配置为使用 ssl 证书使用 HTTPS 并且我相信我已经完成了所有步骤 但是 当我尝试从浏览器启动该网址时 我会收到一个弹出窗口 要求选择证书 附图片 我不知道这里
  • If 语句中 Bool 计算错误

    只是为了好奇 我的代码有这个问题 e被评估为false 我知道通过查看数据库中的数据会得到错误 但 if 语句并不关心这一点 并假设这是真的 并试图抛出异常 有什么想法吗 edit 没有 在第 16 行末尾 价值false是正确的 我已经检
  • 使网格项跨越到隐式网格中的最后一行/列

    当我不知道行数时 是否可以使网格项跨度从第一行到最后一行 假设我有以下 HTML 内容 其中包含未知数量的框 我怎样才能做到第三个 box从第一条网格线到最后一条网格线 container display grid grid templat
  • 如何在React Material UI简单输入中启用文件上传?

    我正在创建一个简单的表单来使用带有 redux 表单和材料 ui 的 electro react boilerplate 来上传文件 问题是我不知道如何创建输入文件字段 因为材料用户界面不支持上传文件输入 关于如何实现这一目标有什么想法吗
  • 什么是内部类的合成反向引用

    我正在寻找应用程序中的内存泄漏 我正在使用的探查器告诉我寻找这些类型的引用 但我不知道我在寻找什么 有人可以解释一下吗 Thanks Elliott 您可以对 OUTER 类进行合成反向引用 但不能对内部类实例进行合成 e g class
  • Swift 3 中数组的 indexOf(_:) 方法的替换

    在我的项目 用 Swift 3 编写 中 我想使用从数组中检索元素的索引indexOf 方法 存在于 Swift 2 2 中 但我找不到任何替代方法 Swift 3 中是否有任何好的替代方法或类似的方法 Update 我忘记提及我想在自定义
  • 时间复杂度和运行时间有什么区别?

    时间复杂度和运行时间有什么区别 它们是一样的吗 运行时间是指程序运行所需的时间 时间复杂度是对输入大小趋于无穷大时运行时间渐进行为的描述 您可以说运行时间 是 O n 2 或其他什么 因为这是描述复杂性类和大 O 表示法的惯用方式 事实上
  • 如何在特定文件夹中运行 shell 命令

    我可以用这个out err exec Command git log Output 获取将在与可执行位置相同的路径中运行的命令的输出 如何指定要在哪个文件夹中运行命令 exec Command https golang org pkg os
  • 实现快速 Javascript 搜索?

    基本上 我有一个带有文本框的页面和 ul 列在其下面 这 ul 由用户的朋友列表填充 用户开始在文本框中输入朋友的名字 例如按 r 我想立即更新 ul 每次按键仅显示名字以 R 开头的朋友 例如 Richard Redmond Raheem
  • Powershell 添加的字符串类型的 ParameterizedProperty Chars 属性是什么?

    请注意 C gt Get Member MemberType eq ParameterizedProperty TypeName System String Name MemberType Definition Chars Paramete
  • Azure VM 自定义脚本扩展 SAS 令牌支持

    我正在尝试使用 ARM 模板将自定义脚本扩展部署到 Azure VM 并且希望让它使用 SAS 令牌从存储帐户下载文件 这是模板 简化 name CustomScriptExtension type Microsoft Compute vi
  • relativelayout导致动画不起作用?

    我有一个活动 其布局仅包含一个 VideoView 这是 XML
  • AllowAnonymous 与 OverrideAuthorizeAttribute

    AllowAnonymous 和 OverrideAuthorizeAttribute 的使用有什么区别 是一样的吗 http www asp net web api overview security authentication and
  • 确定向量中是否存在元素的最有效方法

    我有几种算法取决于确定元素是否存在于向量中的效率 在我看来 这 in 这相当于is element 应该是最有效的 因为它只返回一个布尔值 在测试了几种方法之后 令我惊讶的是 这些方法是迄今为止效率最低的 以下是我的分析 随着向量大小的增加
  • 编辑 Kafka Listener Spring 应用程序以更改阶段/目标

    我可以利用另一个运行 Kafka 应用程序 代码库的团队来使用相同的数据 将其加载到我们的新暂存表中 而不是他们的 他们在 Messages 文件夹中有许多不同的 kafka 侦听器适配器 java 文件 每个文件消耗不同类型的数据 每个