KafkaStreams 不生成发送到目标主题的消息

2024-06-24

我一直在尝试使用 KafkaStreams 来计算传感器读取的温度的移动平均值。我有 Producer,它从 mqtt 代理获取消息并将它们推送到 kafka:

String topic = "TEMPERATURE";
Producer<String, Double> producer = new KafkaProducer<>(properties);

            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable throwable) {
                    System.out.println("Connection to broker lost!" + throwable.getMessage());

                }

                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    try{
                        String time = new Timestamp(System.currentTimeMillis()).toString();
                        String content = new String(mqttMessage.getPayload());

                        System.out.println("\nReceived a Message!" +
                                "\n\tTime:    " + time +
                                "\n\tTopic:   " + topic +
                                "\n\tMessage: " + content +
                                "\n\tQoS:     " + mqttMessage.getQos() + "\n");
                        double temp = Double.valueOf(content.substring(2));


                        producer.send(new ProducerRecord<>(topic, time, temp), (recordMetadata, e) -> {
                            if (e != null) {
                                e.printStackTrace();
                            } else {
                                System.out.printf("Produced record to topic %s partition [%d] @ offset %d%n",
                                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                            }
                        });
                        producer.flush();
                    } catch (Exception e){
                        System.err.println(e);
                    }

                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

                }
            });

            mqttClient.subscribe(topic, 0);

Kafka 接收这些消息,我可以使用 bash 检查这些消息。 接下来,我想要一个窗口化的 Kafka Stream 来计算移动平均值:

public class PersistenceService {


    public static void main(String[] args) {
        Logger logger = Logger.getLogger("PERSISTENCE SERVICE");
        String topic = "TEMPERATURE";
        String targetTopic = "MOVINGAVG";
        String kafkaURI = "localhost:9092";
        String clientID = "Persistence Service";
        Properties properties = new Properties();
        kafkaPropertiesSet(kafkaURI, properties, clientID);
        MongoClient mongoClient = new MongoClient("localhost", 27017);

        MongoDatabase database = mongoClient.getDatabase("Temperature");
        logger.info("Connected to the database");
        MongoCollection kafkaCollection = database.getCollection("TemperatureReadingsKafka");
        MongoCollection brokerCollection = database.getCollection("TemperatureReadingsMQTT");
        DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        Document document = new Document();
        document.put(dateFormat.format(new Date()), 18.34);
        kafkaCollection.insertOne(document);



        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream<String, Double> kafkaStreams =  streamsBuilder.stream(topic, Consumed.with(Serdes.String(), Serdes.Double()));
        Duration timeDifference = Duration.ofSeconds(30);
        Topology topology = streamsBuilder.build();
        KTable table = kafkaStreams.groupByKey()
                .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(timeDifference))
                .aggregate(
                        () -> new Tuple2(0.0, 0.0, 0.0), // initializer
                        (key, value, aggregate) -> tempAggregator(key, String.valueOf(value), aggregate))
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .mapValues(new ValueMapper<Tuple2, Object>() {
                    @Override
                    public Double apply(Tuple2 tuple2) {
                        return (Double) tuple2.avg;
                    }
                });
        table.toStream().to(targetTopic);
        KafkaStreams streams = new KafkaStreams(topology, properties);
        streams.start();
    }



    private static void kafkaPropertiesSet(String kafkaURI, Properties properties, String clientID) {
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaURI);
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, clientID);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    }

    public static Tuple2 tempAggregator(String key, String value, Tuple2<Double> aggregateTuple){
        aggregateTuple.count = aggregateTuple.count + 1;
        aggregateTuple.sum = aggregateTuple.sum + Double.valueOf(value);
        aggregateTuple.avg = aggregateTuple.sum/ aggregateTuple.count;
        return aggregateTuple;
    }
    static class Tuple2<Double> {
        public Double count;
        public Double sum;
        public Double avg;

        public Tuple2(Double count, Double sum, Double avg) {
            this.count = count;
            this.sum = sum;
            this.avg = avg;
        }
    }

但是,没有消息从流发送到 Kafka。 来自控制台的日志:

[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1-consumer, groupId=Persistence Service] Adding newly assigned partitions: TEMPERATURE-0
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] State transition from STARTING to PARTITIONS_ASSIGNED
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1-consumer, groupId=Persistence Service] Setting offset for partition TEMPERATURE-0 to the committed offset FetchPosition{offset=621, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1001 rack: null)], epoch=0}}
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamTask - stream-thread [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] task [0_0] Initialized
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamTask - stream-thread [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] task [0_0] Restored and ready to run
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] Restoration took 80 ms for all tasks [0_0]
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6] State transition from REBALANCING to RUNNING
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1-consumer, groupId=Persistence Service] Requesting the log end offset for TEMPERATURE-0 in order to compute lag
[Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Persistence Service-cc71ac12-a534-4ffc-9c3e-dc5479c633d6-StreamThread-1] Processed 346 total records, ran 0 punctuators, and committed 2 total tasks since the last update

Checking on kafka bash gives me this: enter image description here

我究竟做错了什么?任何建议非常感谢

EDIT正如建议的@OneCricketeer https://stackoverflow.com/users/2308683/onecricketeer,删除拓扑并使用KafkaStreams streams = new KafkaStreams(streamBuilder.build(), properties);推动计划向前发展。我需要实现序列化和反序列化类,以及 Serdes。现在将 KafkaStreams 代码更改为如下所示:

 StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream<String, Double> kafkaStreams = streamsBuilder.stream(topic, Consumed.with(Serdes.String(), Serdes.Double()));
        Duration timeDifference = Duration.ofSeconds(5);

        KTable table = kafkaStreams.groupByKey(Grouped.with(Serdes.String(),Serdes.Double()))
                .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(timeDifference))
                .aggregate(
                        () -> generateTuple(logger), // initializer
                        (key, value, aggregate) -> tempAggregator(key, String.valueOf(value), aggregate, logger))
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .mapValues((ValueMapper<AggregationClass, Object>) tuple2 -> (Double) tuple2.getAverage());
        table.toStream().to(targetTopic);
        KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);
        streams.cleanUp();
        streams.start();

    }

    private static AggregationClass generateTuple(Logger logger) {
        logger.info("Tuple init");
        return new AggregationClass(0.0, 0.0);
    }
private static void kafkaPropertiesSet(String kafkaURI, Properties properties, String clientID) {
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaURI);
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, clientID);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AggregationSerde.class.getName());
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    }

    public static AggregationClass tempAggregator(String key, String value, AggregationClass aggregateTuple, Logger logger) {
        aggregateTuple.count = aggregateTuple.count + 1;
        logger.warning("COUNT" + aggregateTuple.count);
        aggregateTuple.sum = aggregateTuple.sum + Double.valueOf(value);
        logger.warning("SUM: " + aggregateTuple.sum);
        return aggregateTuple;
    }

AggregationClass 的实现如下:

public class AggregationClass<Double> {
    public double count;
    public double sum;


    public AggregationClass(double count, double sum) {
        this.count = count;
        this.sum = sum;

    }

    public double getAverage() {
        return this.sum/this.count;
    }
}

现在发生的情况是从该主题读取消息,但最终我得到:

为主题 MOVINGAVG 生成数据时出现 ClassCastException。序列化程序(键:org.apache.kafka.streams.kstream.TimeWindowedSerializer /值:AggregateSerializer)与实际键或值类型(键类型:org.apache.kafka.streams.kstream.Windowed /值类型: java.lang.Double)。更改 StreamConfig 中的默认 Serdes 或通过方法参数提供正确的 Serdes(例如,如果使用 DSL,#to(String topic, Produced<K, V> produced) with Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))).


既然你有这个

properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AggregationSerde.class.getName());

然后所有需要生成中间主题的数据,例如之后groupByKey, or to()默认情况下,需要是 AggregationClass 实例。

但之后mapValues,你有 Doubles,所以你需要为该类型定义一个新的序列化器

.to(targetTopic, Produced.valueSerde(Serdes.Double()));
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

KafkaStreams 不生成发送到目标主题的消息 的相关文章

随机推荐

  • 我如何列出我的要点?

    我可以获得我的要点清单吗 这样的列表将列出所有要点 而不仅仅是四个要点 并且在我单击要点之前不会显示要点的内容 有一个简单的方法 https gist github com anders https gist github com ande
  • Conda 环境名称在提示中显示整个目录

    当我跑步时 源激活 anaconda2 envs myEnv 它在我的提示符 在 iterm2 上 中显示了此 conda 环境的整个目录 如下所示 Users billy anaconda2 envs myEnv billy mbp pr
  • 错误:架构必须包含唯一命名的类型,但包含多个名为“DateTime”的类型

    我正在尝试生成 GraphQL 架构 我有以下解析器 其中3个位于不同的文件中 其中 2 个用于收集数据 而其中一个只是触发 API 让它们从数据源收集更多数据 Resolver export class RefreshDataSchema
  • 从 root Android 应用程序运行二进制文件

    我想运行一个二进制文件 dev local名为native 我通过adb推送它 具有root权限 为了实现这一点 我编写了以下代码 try root Runtime getRuntime exec su DataOutputStream o
  • 如何在猫鼬中使用.slice

    我在我的应用程序中得到了这个 Score find match in ids sort score sort descending slice skip limit exec function err scores if err score
  • 如何格式化 Microsoft JSON 日期?

    我正在第一次尝试Ajax http en wikipedia org wiki Ajax 28programming 29使用 jQuery 我正在将数据传输到页面上 但在处理为日期数据类型返回的 JSON 数据时遇到了一些问题 基本上 我
  • 两个项目之间的跨项目引用

    是否可以在两个 TypeScript 项目之间进行引用 假设我们有以下项目结构 Module1 ts包含 module TestModule export interface Interface1 Module2 ts包含 module T
  • 从 SciPY 导入某些模块时出现 ImportError

    我使用 Scipy 一段时间了 这是我第一次使用它进行信号处理 但是当我导入模块时 from scipy import signal from scipy import special 我收到错误 ImportError DLL load
  • 我应该在 Javascript 类型相等中使用 typeof 吗?

    什么是更好的 if obj undefined vs if typeof obj undefined 如果你无法克制自己对全球的影响undefined 或者无法避免尝试引用未声明的变量 然后使用 typeof x undefined 如果您
  • Kendo ui - 如何将验证与 mvc 模型属性联系起来

    通过阅读本文中的帖子thread http www kendoui com forums framework validation using kendo validator with mvc model properties aspx 由
  • Spacy tokenizer,添加 tokenizer 异常

    嘿 我正在尝试使用 spacy 2 02 添加对某些标记进行标记的例外 我知道存在这种情况 tokenizer add special case 我在某些情况下使用它 但例如像 100 美元这样的代币 spacy 分成两个代币 美元 SYM
  • 函数内的多线程变量访问

    我有一个函数可以启动几个线程 它是一个测试函数 其中一个线程会改变变量的状态 由于局部变量不能标记为易失性 因此我假设该方法中的多个线程将始终具有变量的更新状 态 它是否正确 这是示例代码 public void someMethod Mu
  • 找不到 tbb.dll

    我在 opencv 2 3 中使用 cvCanny 函数 它编译得很好 但在执行时出现错误 提示 tbb dll 未找到 这个dll有什么用处 在哪里可以找到这个 thanks 它是英特尔的一部分线程构建块 http threadingbu
  • Xcode 一直显示:-1: SWIFT_VERSION '5.0' 不受支持,支持的版本是:3.0、4.0、4.2。有什么解决方案吗?

    当我将 facebook SDK pod 安装到我的 ios 项目中时 它显示 1 SWIFT VERSION 5 0 不受支持 支持的版本为 3 0 4 0 4 2 在目标 FacebookCore 中 我用的是迅捷4 2 有什么解决办法
  • Git merge --squash 可以保留提交注释吗?

    有没有一种方法可以自动添加压缩后的所有提交注释mybranch执行时提交 git merge squash mybranch 这样单个提交包含所有提交注释的串联mybranch 我认为这就是 git merge squash 自动执行的操作
  • Spring云流从kinesis收到的消息中的特殊字符

    当我使用来自运动流的消息时 我收到一些带有标题等的垃圾字符 StreamListener Processor INPUT public void receive String message System out println Messa
  • iOS 在运行时更改应用程序图标

    我想在运行时更改我的应用程序图标 我在 SO 上阅读了其他类似的帖子 他们说使用 Apple 认可的 API 是不可能的 其中一份回复提到可以使用受限 API 但没有详细说明 我了解 如果我使用受限制的 API 我的应用程序范围将仅限于越狱
  • 为什么Python没有多行注释?

    好的 我知道三引号字符串可以用作多行注释 例如 Hello I am a multiline comment and Hello I am a multiline comment 但从技术上讲 这些都是字符串 对吗 我用谷歌搜索并阅读了 P
  • Visual Studio:变量更改时中断?

    我在 Visual Studio 2008 Professional 中调试的 C 类中有一个成员变量结构 我想在结构发生变化时中断 是否可以做到这一点 或者我必须寻找代码中可能更改的每个可能的点 并在那里设置断点 如果可以确定成员的地址
  • KafkaStreams 不生成发送到目标主题的消息

    我一直在尝试使用 KafkaStreams 来计算传感器读取的温度的移动平均值 我有 Producer 它从 mqtt 代理获取消息并将它们推送到 kafka String topic TEMPERATURE Producer