使用 Kafka Streams 在输出中设置时间戳无法进行转换

2024-04-25

假设我们有一个变压器(用 Scala 编写)

new Transformer[String, V, (String, V)]() {
  var context: ProcessorContext = _

  override def init(context: ProcessorContext): Unit = {
    this.context = context
  }

  override def transform(key: String, value: V): (String, V) = {
    val timestamp = toTimestamp(value)
    context.forward(key, value, To.all().withTimestamp(timestamp))
    key -> value
  }

  override def close(): Unit = ()
}

where toTimestamp只是一个返回从记录值获取的时间戳的函数。一旦执行,就会出现一个NPE:

Exception in thread "...-6f3693b9-4e8d-4e65-9af6-928884320351-StreamThread-5" java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:110)
    at CustomTransformer.transform()
    at CustomTransformer.transform()
    at org.apache.kafka.streams.scala.kstream.KStream$$anon$1$$anon$2.transform(KStream.scala:302)
    at org.apache.kafka.streams.scala.kstream.KStream$$anon$1$$anon$2.transform(KStream.scala:300)
    at 

本质上发生的是ProcessorContextImpl失败于:

public <K, V> void forward(final K key, final V value, final To to) {
    toInternal.update(to);
    if (toInternal.hasTimestamp()) {
        recordContext.setTimestamp(toInternal.timestamp());
    }
    final ProcessorNode previousNode = currentNode();

因为recordContext未初始化(只能由 KafkaStreams 在内部完成)。

这是一个后续问题使用 Kafka Streams 1 设置输出中的时间戳 https://stackoverflow.com/questions/52438161/set-timestamp-in-output-with-kafka-streams


如果您与transformer,你需要确保一个新的Transformer对象创建时TransformerSupplier#get()叫做。 (参见https://docs.confluence.io/current/streams/faq.html#why-do-i-get-an-illegalstateexception-when-accessing-record-metadata https://docs.confluent.io/current/streams/faq.html#why-do-i-get-an-illegalstateexception-when-accessing-record-metadata)

在最初的问题中,我认为这是关于你的context导致 NPE 的变量,但现在我意识到它与 Kafka Streams 内部结构有关。

Scala API 在 2.0.0 中存在一个错误,可能会导致相同的情况Transformer实例被重用(https://issues.apache.org/jira/browse/KAFKA-7250 https://issues.apache.org/jira/browse/KAFKA-7250). I think你遇到了这个错误。稍微重写一下代码应该可以解决问题。请注意,Kafka 2.0.1 和 Kafka 2.1.0 包含修复程序。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Kafka Streams 在输出中设置时间戳无法进行转换 的相关文章

  • Spark:有没有办法打印出spark-shell和spark的类路径?

    我可以在 Spark shell 中成功运行 Spark 作业 但是当它打包并通过 Spark submit 运行时 我收到 NoSuchMethodError 这向我表明类路径存在某种不匹配 有没有办法可以比较两个类路径 某种日志记录语句
  • JVM 是否会内联对象的实例变量和方法?

    假设我有一个非常紧密的内部循环 每次迭代都会访问和改变一个簿记对象 该对象存储有关算法的一些简单数据 并具有用于操作它的简单逻辑 簿记对象是私有的和最终的 并且它的所有方法都是私有的 最终的和 inline 下面是一个示例 Scala 语法
  • 将案例类传递给函数参数

    抱歉问了一个简单的问题 我想将案例类传递给函数参数 并且想在函数内部进一步使用它 到目前为止我已经尝试过这个TypeTag and ClassTag但由于某种原因 我无法正确使用它 或者可能是我没有看到正确的位置 用例与此类似 case c
  • 使用 vs code,如何让 scala 格式工作并格式化我的代码?

    我的多项目 sbt 存储库中有 scala 格式插件 addSbtPlugin org scalameta sbt scalafmt 2 3 2 所以在 sbt 控制台中如果我运行 scalafmt 它工作正常 我的 build sbt 有
  • 如何在 Spark 中创建空数据帧

    我有一组基于 Avro 的配置单元表 我需要从中读取数据 由于Spark SQL使用hive serdes从HDFS读取数据 因此比直接读取HDFS慢很多 因此 我使用数据块 Spark Avro jar 从底层 HDFS 目录读取 Avr
  • Scala 中的多个类型下限

    我注意到tuple productIterator总是返回一个Iterator Any 想知道是否无法设置多个下限 因此它可能是最低公共超类型的迭代器 我尝试并搜索了一下 但只发现this https stackoverflow com q
  • 计算行的排名

    我想根据一个字段对用户 ID 进行排名 对于相同的字段值 排名应该相同 该数据位于 Hive 表中 e g user value a 5 b 10 c 5 d 6 Rank a 1 c 1 d 3 b 4 我怎样才能做到这一点 可以使用ra
  • 对于值类型,asInstanceOf[X] 和 toX 之间有什么区别吗?

    我使用 IntelliJ 将 Java 代码转换为 Scala 代码的功能 通常效果很好 看来 IntelliJ 用调用替换了所有强制转换asInstanceOf 是否有任何有效的用法asInstanceOf Int asInstanceO
  • 实施策略模式的函数式方法

    我正在尝试解决一个处理从一种温度单位到另一种温度单位 摄氏度 开尔文 华氏度 转换的问题 在Java中 我需要创建一个接口并提供多个实现来封装输入类型并将结果作为输出类型的单元返回 例如开尔文到摄氏度或摄氏度到华氏度等 我已经在 scala
  • Kafka 适合运行公共 API 吗?

    我有一个想要发布的事件流 它被划分为主题 不断更新 需要水平扩展 并且没有 SPOF 很好 并且可能需要在某些情况下重播旧事件 所有的功能似乎都与 Kafka 的功能相匹配 我想通过任何人都可以连接并获取事件的公共 API 将其发布到全世界
  • Scala 对大数的阶乘有时会崩溃,有时不会

    以下程序经过编译和测试 有时返回结果 有时充满屏幕 java lang StackOverflowError at scala BigInt apply BigInt scala 47 at scala BigInt equals BigI
  • 压缩 HList 的函数的推断类型

    谢谢https github com milessabin shapeless wiki Feature overview shapeless 2 0 0 https github com milessabin shapeless wiki
  • 使用 slick 3.0.0-RC1 无法在 TableQuery 上找到方法结果

    我正在尝试 Slick3 0 0 RC1我遇到了一个奇怪的问题 这是我的代码 import slick driver SQLiteDriver api import scala concurrent ExecutionContext Imp
  • 如何使用 Scala 在 Spark 中漂亮地打印 JSON 数据帧?

    我有一个数据帧 我想将其作为有效的 json 写入 json 文件 我当前的代码如下所示 val df DataFrame myFun df toJSON saveAsTextFile myFile json 输出的格式为 如何将文件内容组
  • 用于真实 Web 项目的 Scala-JS [已关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 有人用过吗Scala JS在真实的网络项目中 但不仅仅适用于普通的JavaScript在隔离环境中替换 我想尽可能多地使用 Scala 我希望可
  • Spark Scala:按小时或分钟计算两列的 DateDiff

    我在数据框中有两个时间戳列 我想获取它们的分钟差异 或者小时差异 目前我可以通过四舍五入获得日差 val df2 df1 withColumn time datediff df1 ts1 df1 ts2 但是 当我查看文档页面时https
  • .java 和 .scala 类之间是否可能存在循环依赖?

    假设我在 java 文件中定义了类 A 在 scala 文件中定义了类 B A 类使用 B 类 B 类使用 A 类 如果我使用 java 编译器 则会出现编译错误 因为 B 类尚未编译 如果我使用scala编译器A类将找不到 有没有可以同时
  • Spark:替换嵌套列中的空值

    我想更换所有n a以下数据框中的值unknown 它可以是scalar or complex nested column 如果它是一个StructField column我可以循环遍历列并替换n a using WithColumn 但我希
  • Scala repl 抛出错误

    当我打字时scala在终端上启动 repl 它会抛出此错误 scala gt init error error while loading AnnotatedElement class file usr lib jvm java 8 ora
  • 什么样的函数被认为是“可组合的”?

    维基百科文章函数组合 计算机科学 https en wikipedia org wiki Function composition computer science says 就像数学中通常的函数组合一样 每个函数的结果作为下一个函数的参数

随机推荐

  • 检查lua中是否存在目录?

    如何检查 lua 中是否存在目录 如果可能的话最好不使用 LuaFileSystem 模块 尝试做类似以下 python 行的事情 os path isdir path 这是一种在 Unix 和 Windows 上都适用的方式 无需任何外部
  • 从字符串中删除转义序列 '\' 以将其转换为 XmlDocument

    我有一个返回 struct 对象的 Web 服务 因此我得到以下 XML 字符串形式的响应 现在我需要将其加载到 XmlDocument 对象中 但如何删除字符串中的转义序列 每个 的 都会导致错误
  • AVPlayer - UILabel 在视频上不可见

    NSString urlPath NSURL videoUrl urlPath NSBundle mainBundle pathForResource fogLoop ofType mp4 videoUrl NSURL fileURLWit
  • Firebase 聚合属性值,无需获取所有相关文档

    我有以下 firebase 结构 company1 name Company One invoices invoice1 amount 300 currency EUR timestamp 1572608088 invoice2 amoun
  • 子类化 UICollectionViewCell 导致永远不会被选择

    我尝试对 UICollectionViewCell 进行子类化并从 nib 文件加载 id initWithFrame CGRect frame self super initWithFrame frame if self NSArray
  • 将 WebGL 应用程序部署为本机 iOS 或 Android 应用程序?

    有谁知道如何将 WebGL 应用程序部署为本机 iOS 或 Android 应用程序 商业中间件是可以接受的 但开放项目会更好 谢谢 作为 Joris 答案的延伸 这似乎是基于内森 德弗里斯的作品 http atnan com blog 2
  • 使用破折号显示 URL slug 时出现问题

    我为我的故事 URL 创建了一个带有破折号的 slug 例如 使用 slug 而不是 ID 来获取记录 https stackoverflow com questions 482636 fetching records with slug
  • 视频文件中的感兴趣区域

    这是我第一次在这里发帖 希望能得到积极的结果 因为我的研究已接近尾声 我想在我的代码中添加一个函数 该函数将仅处理视频文件的定义的感兴趣区域 我无法发布图片 因为我还没有声誉 但这里发布了同样的问题 gt http answers open
  • OpenCV的calcOpticalFlowPyrLK抛出异常

    一段时间以来 我一直在尝试使用 OpenCV 构建一个小型光流示例 除了函数调用 calcOpticalFlowPyrLK 之外 一切正常 该函数在控制台窗口中打印以下失败的断言 OpenCV错误 断言失败 mytype typ0 CV M
  • 将文本单词换行

    我使用下面的代码来包装长文本 由用户在文本区域中输入以进行评论 function addNewlines comments var result while trim comments length gt 0 result comments
  • 拖动并 connectToSortable 到 iframe 内的可排序 DIV 中

    我试图将一个元素从主页拖动到其中的 iframe 并在框架内使用可排序的 div 我能够使 div 可排序 并将可拖动的内容连接到可排序的内容 但元素放置的位置计算错误并在错误的位置排序 可能是因为iframe内的鼠标坐标与主页的位置不同
  • 数据列表验证中 Obj.length>0 错误

    我有一个脚本 我通过 tempmankey 在网站上运行它 该脚本基本上是在输入字段中插入数据列表 并根据数据列表选择的值更改其他输入值 code function bdi contains Truck No closest div nex
  • 如何检测Vista UAC是否启用?

    我需要我的应用程序根据是否启用 Vista UAC 来表现不同 我的应用程序如何检测用户计算机上的 UAC 状态 该注册表项应该告诉您 HKLM SOFTWARE Microsoft Windows CurrentVersion Polic
  • 定制 iPhone 键盘

    我需要 即客户要求 提供自定义键盘 供用户在文本字段和区域中输入文本 我已经有一些可以执行键盘操作并将测试附加到文本字段的东西 但是我想让它更通用并让它像标准的 iphone 键盘一样工作 即当用户选择可编辑文本控件时出现 目前我的控制器知
  • 使用 BrokeredMessage 从 Azure 服务总线队列 (v1) 反序列化强类型对象

    无论出于何种原因 我似乎无法弄清楚如何将我的对象从队列中取出并将其反序列化回它放入其中的内容 An账户事件DTO Azure函数成功将对象放入队列 FunctionName AccountCreatedHook public static
  • 从groupby中的列获取模式[重复]

    这个问题在这里已经有答案了 我试图获取 groupby 对象中列的模式 但出现此错误 incompatible index of inserted column with frame index 这是我遇到的问题 我不知道如何解决它 任何帮
  • 哪个类应该存储查找表? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 世界上有不同地点的代理 但任何地点都只有一个代理 每个特工都知道他在哪里 但我还需要快速检查给定位置是否有特工 因此 我还维护了一张从位置到代理
  • 对角度数据表中括号内的数字进行排序

    我在用角度数据表 http l lin github io angular datatables 根据网络服务响应填充我的表 我的网络服务返回一个如下所示的 json id 1 name abc count 20 id 2 name abc
  • 计算和合并行

    我有一个表 通过将国家 地区代码添加到每行的 国家 地区代码 列来记录 Web 应用程序中的用户位置 每行代表对特定区域的访问 所以我有一些数据 比如 COL1 COL2 COL3 countrycode asd asd asd NZ as
  • 使用 Kafka Streams 在输出中设置时间戳无法进行转换

    假设我们有一个变压器 用 Scala 编写 new Transformer String V String V var context ProcessorContext override def init context Processor