无法刷新状态存储

2024-02-22

我正在尝试在 Kafka Streams 中创建一个 leftJoin,它对于大约 10 条记录工作正常,然后由于以下原因导致异常崩溃:NullPointerException用这样的代码:

private static KafkaStreams getKafkaStreams() {
    StreamsConfig streamsConfig = new StreamsConfig(getProperties());
    KStreamBuilder builder = new KStreamBuilder();

    KTable<String, Verkaeufer> umsatzTable = builder.table(Serdes.String(), EventstreamSerde.Verkaeufer(), CommonUtilsConstants.TOPIC_VERKAEUFER_STAMMDATEN);
    KStream<String, String> verkaeuferStream = builder.stream(CommonUtilsConstants.TOPIC_ANZAHL_UMSATZ_PER_VERKAEUFER);

    KStream<String, String> tuttiStream = verkaeuferStream.leftJoin(umsatzTable,
            (tutti, verkaeufer) -> ("Vorname=" + verkaeufer.getVorname().toString() +",Nachname=" +verkaeufer.getNachname().toString() +"," +tutti.toString()), Serdes.String(), Serdes.String());

    tuttiStream.to(Serdes.String(), Serdes.String(), CommonUtilsConstants.TOPIC_TUTTI);

    return new KafkaStreams(builder, streamsConfig);
}

StreamsConfig看起来像这样:

private static Properties getProperties() {
    Properties props = new Properties();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CommonUtilsConstants.BOOTSTRAP_SERVER_CONFIGURATION);
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, CommonUtilsConstants.GID_TUTTI);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "1000");

    return props;
}

完整的堆栈跟踪:

22:19:36.550 [gid-tutti-8fe6be58-d5c5-41ce-982d-88081b98004e-StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [gid-tutti-8fe6be58-d5c5-41ce-982d-88081b98004e-StreamThread-1] Failed to commit StreamTask 0_0 state: org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store KTABLE-SOURCE-STATE-STORE-0000000000
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:262)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:190)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:282)
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:264)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:253)
at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:815)
at org.apache.kafka.streams.processor.internals.StreamThread.access$2800(StreamThread.java:73)
at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:797)
at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:789)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:778)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:567)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) Caused by: java.lang.NullPointerException: null
at java.lang.String.<init>(String.java:143)
at ch.wesr.eventstream.commonutils.serde.GsonDeserializer.deserialize(GsonDeserializer.java:38)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:90)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:78)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:145)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:103)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:97)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:107)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:260)
... 14 common frames omitted

Update:

这是什么GsonDeserialize好像

public class GsonDeserializer<T> implements Deserializer<T>{

    public static final String CONFIG_VALUE_CLASS = "default.value.deserializer.class";
    public static final String CONFIG_KEY_CLASS = "default.key.deserializer.class";
    private Class<T> deserializedClass;
    private Gson gson = new GsonBuilder().create();

    public GsonDeserializer() {}

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
        String clsName = String.valueOf(config.get(configKey));
        try {
            if (deserializedClass == null) {
                deserializedClass = (Class<T>) Class.forName(clsName);
            }
        } catch (ClassNotFoundException e) {
            System.err.printf("Failed to configure GsonDeserializer. " +
                            "Did you forget to specify the '%s' property ?%n",
                    configKey);
            System.out.println(e.getMessage());
        }
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
        return gson.fromJson(new String(bytes), deserializedClass);
    }

    @Override
    public void close() {}
}

只要缓存没有被刷新,你的反序列化器就不会被调用。这就是为什么它一开始就不会失败,您可以通过缓存大小参数和提交间隔(我们在提交时刷新)来增加失败之前的时间。

查看您的代码GsonDeserializer, 看起来new String(bytes)因 NPE 失败 --String构造函数不能接受null作为参数——您的反序列化器代码必须防范bytes==null并且应该返回null直接针对这种情况。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

无法刷新状态存储 的相关文章

随机推荐

  • 从 Chrome 扩展获取唯一的 ClientID?

    我正在开发 chrome 扩展 我需要能够将每个客户识别为独特的客户 我无法将 guid 存储在 cookie 中 因为 cookie 可以被删除 我需要从系统本身读取一些独特的东西 现在 我知道 JS 无法访问客户端资源 本地资源 但是
  • NextJS - ReactDOMServer 尚不支持 Suspense

    我目前正在尝试将加载器组件合并到使用 NextJS 构建的网站中 我想使用 Suspense 显示加载屏幕 可能是在刷新页面或更改路线后 我的代码是这样的 import Head from next head import Loader f
  • 为什么具有终结器的对象即使没有根也不会被收集?

    我遇到了可终结对象的问题 该对象未被收集GC if Dispose 没有被明确调用 我知道我应该打电话Dispose 显式地如果一个对象实现IDisposable 但我一直认为依赖框架是安全的 当一个对象变得未被引用时 它可以被收集 但经过
  • Android OpenGL ES 2:如何在主活动中使用 OpenGL 活动作为片段

    我对 Android 和 OpenGL ES 还很陌生 我必须在 OpenGL 中创建一个 GUI 并且我想将其用作Fragment在主要活动中 为了学习如何做到这一点 我尝试了 2 个教程 这个片段教程 http www techotop
  • 按值对 HashMap 进行排序[重复]

    这个问题在这里已经有答案了 我需要整理我的HashMap根据其中存储的值 这HashMap包含手机中存储的联系人姓名 另外 我需要在对值进行排序后立即对键进行自动排序 或者您可以说键和值绑定在一起 因此值的任何更改都应该反映在键中 Hash
  • 在 python 多处理工作池中使用初始化

    我正在研究工作人员的 multiprocessing Pool 试图用某种状态初始化工作人员 该池可以接受可调用的初始化 但不会传递对已初始化工作线程的引用 我见过的几个例子利用它调用全局变量 这看起来真的很讨厌 有没有什么好方法使用 mu
  • elisp 中的复数/虚数?

    elisp 支持虚数吗 我正在尝试通过运行 lisp 交互模式缓冲区来学习在线数学课程 Emacs elisp 有 高等数学 模块 库吗 Emacs 包括calc 一个支持复数的综合计算器 The manual is here C hig
  • C 或 C++ 中的日历日期算术(给给定日期添加 N 天)

    我已经得到了一个日期 我将其作为输入 例如 日 月 年 12 03 87 现在我需要找出之后的日期n days 我已经为此编写了代码 但效率不高 您能告诉我任何运行速度更快且复杂性更小的好的逻辑吗 include
  • Typescript:为什么 Visual Studio Code 不报告与命令行 tsc 相同的错误?

    如果我故意在代码中输入错误 我会收到错误 这是正确的代码 declare const State TwineState 如果我删除最后一个字符 然后在命令行中键入 tsc 则会出现以下错误 tsc prod spec ts 7 22 err
  • 如何处理 React Native 应用程序在 tvOS 和 Android TV 之间的扩展问题?

    Apple TV 的原生分辨率似乎为 1920x1080 如预期 但 Android TV Fire TV 的原生分辨率似乎为 961 5022957581195x540 8450413639423 根据Dimensions get win
  • 如何在不修改java.security文件的情况下在Java 8中启用SSLv3?

    在 JDK 8 中 默认情况下禁用 SSLv3 并启用 TLSv1 2 当我谷歌时 我发现很多帖子都通过注释掉以下行来启用 SSLv3java securitylib 文件夹中的文件 我想通过设置系统属性来启用 SSLv3 而不需要修改 j
  • 如何获取枚举条目的名称?

    我想迭代 TypeScript 枚举对象并获取每个枚举符号名称 例如 枚举 myEnum 条目 1 条目 2 for var entry in myEnum use entry s name here e g entry1 尽管已经给出了答
  • 类型错误:无法读取未定义的属性“prepareStyles”

    My Component好像 import React PropTypes from react import TransactionListRow from TransactionListRow import Table TableBod
  • Laravel 用户有权访问某些页面吗?

    我创建了一个 slug 页面 如下所示 Create pages table for dynamic pages id slug title page template 0 about about us about blade 1 cont
  • 使用 jQuery 调整父元素的高度以匹配其可见子元素的高度

    我有一个在容器中运行的幻灯片 需要容器的高度与可见幻灯片的高度相匹配 不幸的是 这些图像是绝对定位的 我对此无能为力 为了解决这个问题 我使用了一些 jQuery 魔法来处理相同的功能 由于某种原因 我的代码无法正常工作 每当 contai
  • 有没有JavaScript静态分析工具? [关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 当我做了一些愚蠢的事情 例如变量名拼写错误 时 我习惯于让编译器抱怨 但 JavaScript 习惯于让
  • 为什么我不能直接将 document.getElementById 分配给不同的函数?

    所以我试图定义一个类似于 document getElementById 的函数 g 以下工作正常 var g function id return document getElementById id 但为什么这个更直接的代码不起作用呢
  • dplyr:mutate 内的整数采样

    我正在尝试在中生成一列tbl df这是一个 0 或 1 的随机整数 这是我正在使用的代码 library dplyr set seed 0 Dummy data frame to test df lt tbl df data frame x
  • 设置 Yii2 预览

    Yii2 预览版最近发布 可在github https github com yiisoft yii2 我想对其进行试驾 但到目前为止 文档 几乎立即就过时了 因为它仍在大量开发中 我曾尝试遵循本指南 http www yiiframewo
  • 无法刷新状态存储

    我正在尝试在 Kafka Streams 中创建一个 leftJoin 它对于大约 10 条记录工作正常 然后由于以下原因导致异常崩溃 NullPointerException用这样的代码 private static KafkaStrea