flink 中的 Kafka 消费者

2023-12-10

我正在使用 kafka 和 apache flink。我正在尝试使用 apache flink 中的 kafka 主题的记录(采用 avro 格式)。下面是我正在尝试使用的代码片段。

使用自定义反序列化器对主题中的 avro 记录进行反序列化。

我发送到主题“test-topic”的数据的 Avro 架构如下。

{
  "namespace": "com.example.flink.avro",
  "type": "record",
  "name": "UserInfo",
  "fields": [
    {"name": "name", "type": "string"}
  ]
}

我正在使用的自定义解串器如下。

public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 1L;

    private final Class<T> avroType;

    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }


    public T deserialize(byte[] message) {
        ensureInitialized();
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            T t = reader.read(null, decoder);
            return t;
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    private void ensureInitialized() {
        if (reader == null) {
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
            } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }


    public boolean isEndOfStream(T nextElement) {
        return false;
    }


    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }
}

这就是我的 flink 应用程序的编写方式。

public class FlinkKafkaApp {


    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers", "localhost:9092");
        kafkaProperties.put("group.id", "test");

        AvroDeserializationSchema<UserInfo> schema = new AvroDeserializationSchema<UserInfo>(UserInfo.class);

        FlinkKafkaConsumer011<UserInfo> consumer = new FlinkKafkaConsumer011<UserInfo>("test-topic", schema, kafkaProperties);

        DataStreamSource<UserInfo> userStream = env.addSource(consumer);

        userStream.map(new MapFunction<UserInfo, UserInfo>() {

            @Override
            public UserInfo map(UserInfo userInfo) {
                return userInfo;
            }
        }).print();

        env.execute("Test Kafka");

    }

我正在尝试打印发送到主题的记录,如下所示。{"name" :"sumit"}

Output:

我得到的输出是{"name":""}

任何人都可以帮助找出这里的问题是什么以及为什么我没有得到{"name" : "sumit"}作为输出。


Flink 文档说: Flink 的 Kafka Consumer 称为 FlinkKafkaConsumer08(对于 Kafka 0.9.0.x 版本为 09 等,对于 Kafka >= 1.0.0 版本则称为 FlinkKafkaConsumer)。它提供对一个或多个 Kafka 主题的访问。

我们不必编写自定义反序列化器来使用来自 Kafka 的 Avro 消息。

-读取特定记录:

DataStreamSource<UserInfo> stream = streamExecutionEnvironment.addSource(new FlinkKafkaConsumer<>("test_topic", AvroDeserializationSchema.forSpecific(UserInfo.class), properties).setStartFromEarliest());

要读取 GenericRecords :

Schema schema = Schema.parse("{"namespace": "com.example.flink.avro","type": "record","name": "UserInfo","fields": [{"name": "name", "type": "string"}]}");
DataStreamSource<GenericRecord> stream = streamExecutionEnvironment.addSource(new FlinkKafkaConsumer<>("test_topic", AvroDeserializationSchema.forGeneric(schema), properties).setStartFromEarliest());

更多细节 :https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumer

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

flink 中的 Kafka 消费者 的相关文章

随机推荐

  • DuplicateHandle(),在第一个或第二个进程中使用?

    Windows API DuplicateHandle http msdn microsoft com en us library ms724251 VS 85 aspx需要复制对象句柄以及原始进程和要在其中使用复制句柄的其他进程的句柄 我
  • 如何使用flask创建进度条? [复制]

    这个问题在这里已经有答案了 只是想在我的 html 页面中插入一个进度条 它应该从我的 app py 中的 for 加载 这就是我到目前为止所做的 app py from flask import Flask render template
  • 创建具有不同样式的大量文本 - JavaFX FXML

    在我的 JavaFx 应用程序的 fxml 类中 我想使用最少的组件添加大量文本 而不是每行添加多个标签 我还想在同一组件中创建不同样式的文本 我应该使用什么组件 例如 TextArea 以及如何在其中创建多种样式 使用 css Use a
  • 如何使用标准输入在 Swift 3.0 中运行进程

    我在使用 Swift Process 运行 MySQL 恢复转储文件时遇到问题 let command usr local bin mysql h theHost P 3306 u root pTheInlinePassword examp
  • 使用对象映射器解析嵌套的字典数组

    我正在解析一个 Web api 响应 它是一个字典数组 每个字典又都有一个嵌套的字典数组 我该如何解析它 请提供一些代码示例 我的 API 响应是 FilingStatusId 0 FormName MISC OrderId 0 Recip
  • 如何将 HTTPS 与 Microsoft.AspNet.Server.WebListener 结合使用

    在本文的最后http www asp net vnext overview aspnet vnext create a web api with mvc 6它描述了如何使用 Microsoft AspNet Server WebListen
  • 如何实现在更改时自动更新的可变 PickleTypes

    SQLAlchemy 提供PickleType和优惠突变追踪对于任何可变的类型 如字典 SQLAlchemy 文档提到这是实现可变的方法PickleType但它没有具体说明如何进行 Note 我想在中存储一个字典PickleType 你如何
  • 何时使用 HtmlControls 与 WebControls

    我喜欢 HtmlControls 因为没有 HTML 魔法 asp 源代码看起来与客户端看到的类似 我无法否认 GridView Repeater CheckBoxLists 等的实用性 因此当我需要这些功能时我会使用它们 另外 混合和匹配
  • 使用 intptr_t 而不是 void*?

    使用是一个好主意吗intptr t作为通用存储 保存指针和整数值 而不是void 如下所示 http www crystalspace3d org docs online manual Api1 005f0 64 002dBit Porta
  • 如何使用 pyinstaller 将多个 python 文件编译为单个 .exe 文件

    我已经在 python 中创建了一个 GUI 使用 Tkinter 并且使用 os system python file py 从 GUI 单击按钮即可运行 python 文件 我想使用 pyinstaller 将所有这些 python 文
  • 在 KUbuntu 22.04 上的 Visual Studio Code 中点击快速修复键盘快捷键会生成“e”

    在我的 KUbuntu 22 04 中 当我按下键盘快捷键进行快速修复时 即ctrl 在应用程序中 它产生一个小 e 而不是做任何它期望做的事情 我在网上搜索了这个问题 只找到了这个link 但是 它没有给出解决此问题的任何指导 有人遇到过
  • 安全性:tcl 中的会话标识符未更新

    我正在开发开源应用程序 项目 开放 在扫描过程中我发现了以下漏洞 Medium Session Identifier Not Updated Issue 13800882 Severity Medium URL https
  • 如何在 mysql 查询的“IN”子句中使用 PHP 中的值数组?

    get all id s of ur friend that has installed your application friend pics facebook gt api array method gt fql query quer
  • Next.js getServerSideProps 始终未定义

    我已经开始使用新的 Next 应用程序 并尽可能使用功能组件而不是基于类的组件 继文档 我设置了以下内容但没有运气 import React from react import GetServerSideProps InferGetServ
  • ui grid 将更新的单元格数据保存到数据库

    我正在研究 ui 网格编辑单元格功能 我需要使用 REST API 将编辑后的单元格值更新到数据库 另外 我如何获取控制器中选择的行列表 我的工作代码 var app angular module app ngTouch ui grid u
  • 使用JNA加载多个依赖库

    JNA中有没有办法用Java加载多个依赖库 我通常使用Native loadLibrary 加载一个 DLL 但我猜它不会以这种方式工作 因为我将此函数调用分配给实例成员 假设我有图书馆foo和图书馆bar bar依赖于foo 它也依赖于b
  • 多数独人工智能方法

    我正在概念化一个求解器的变体sudoku called 多重数独 其中多个板重叠 如下所示 如果我正确理解游戏 那么您必须以这样的方式解决每个网格 即任何两个或多个网格之间的重叠都是每个网格解决方案的一部分 我不确定我应该如何思考这个问题
  • 为什么 IntelliJ 的 Java 编辑器中添加灰色的 var:colon

    我安装了IntelliJ 2016 3 2 构建 IC 163 10154 41 建于2016年12月21日 灰色的 var colon 会自动添加到 Java 编辑器中调用方方法的参数前面 如下所示 添加灰色的 a b 为什么会发生这种情
  • 通过 Cordova config.xml 将条目添加到 iOS .plist 文件

    我是 Cordova CLI 的新手 我需要通过 Cordova 以编程方式执行以下步骤 在项目 plist中添加一个新行 在新行中输入以下值 Key GD库模式Type 字符串 默认 Value GD企业模拟 我想我需要在项目根目录下的
  • flink 中的 Kafka 消费者

    我正在使用 kafka 和 apache flink 我正在尝试使用 apache flink 中的 kafka 主题的记录 采用 avro 格式 下面是我正在尝试使用的代码片段 使用自定义反序列化器对主题中的 avro 记录进行反序列化