下沉 kafka 流时看不到消息,并且在 flink 1.2 中看不到打印消息

2024-01-09

我的目标是使用kafka读取json格式的字符串,对字符串进行过滤,然后将消息接收出来(仍然是json字符串格式)。

出于测试目的,我的输入字符串消息如下所示:

{"a":1,"b":2}

我的实现代码是:

def main(args: Array[String]): Unit = {

// parse input arguments
val params = ParameterTool.fromArgs(args)

if (params.getNumberOfParameters < 4) {
  println("Missing parameters!\n"
    + "Usage: Kafka --input-topic <topic> --output-topic <topic> "
    + "--bootstrap.servers <kafka brokers> "
    + "--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]")
  return
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.disableSysoutLogging
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
// create a checkpoint every 5 seconds
env.enableCheckpointing(5000)
// make parameters available in the web interface
env.getConfig.setGlobalJobParameters(params)

// create a Kafka streaming source consumer for Kafka 0.10.x
val kafkaConsumer = new FlinkKafkaConsumer010(
  params.getRequired("input-topic"),
  new JSONKeyValueDeserializationSchema(false),
  params.getProperties)

val messageStream = env.addSource(kafkaConsumer)

val filteredStream: DataStream[ObjectNode] = messageStream.filter(node => node.get("a").asText.equals("1")
                      && node.get("b").asText.equals("2"))

messageStream.print()
// Refer to: https://stackoverflow.com/documentation/apache-flink/9004/how-to-define-a-custom-deserialization-schema#t=201708080802319255857
filteredStream.addSink(new FlinkKafkaProducer010[ObjectNode](
  params.getRequired("output-topic"),
  new SerializationSchema[ObjectNode] {
    override def serialize(element: ObjectNode): Array[Byte] = element.toString.getBytes()
  }, params.getProperties
))

env.execute("Kafka 0.10 Example")
}

可以看出,我想将消息流打印到控制台并将过滤后的消息接收到kafka。然而,我看不到他们两个。

有趣的是,如果我将 KafkaConsumer 的模式从 JSONKeyValueDeserializationSchema 修改为 SimpleStringSchema,我可以看到 messageStream 打印到控制台。代码如下图:

 val kafkaConsumer = new FlinkKafkaConsumer010(
  params.getRequired("input-topic"),
  new SimpleStringSchema,
  params.getProperties)

val messageStream = env.addSource(kafkaConsumer)
messageStream.print()

这让我想到如果我使用 JSONKeyValueDeserializationSchema,我的输入消息实际上不会被 Kafka 接受。但这看起来很奇怪,与在线文档有很大不同(https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html)

希望有人可以帮助我!


The JSONKeyValue反序列化Schema()每个 kafka 消息都需要消息密钥,并且我假设在生成 JSON 消息并通过 kafka 主题发送时没有提供密钥。

因此,要解决该问题,请尝试使用JSONDeserializationSchema()它只需要消息并根据收到的消息创建一个对象节点。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

下沉 kafka 流时看不到消息,并且在 flink 1.2 中看不到打印消息 的相关文章

  • Apache Flink AWS S3 Sink 是否需要 Hadoop 进行本地测试?

    我对 Apache Flink 比较陌生 我正在尝试创建一个简单的项目 将文件生成到 AWS S3 存储桶 根据文档 我似乎需要安装 Hadoop 才能执行此操作 如何设置本地环境来测试此功能 我在本地安装了 Apache Flink 和
  • Flink CEP:对于不同类型的事件,使用哪种方法加入数据流?

    假设我有两种不同类型的数据流 一种提供天气数据 另一种提供车辆数据 我想使用 Flink 对数据进行复杂的事件处理 Flink 1 3 x 中哪种方法是正确的使用方法 我看到了不同的方法 如 Union Connect Window Joi
  • 如何在流式传输之前知道音频歌曲的持续时间?

    我正在制作一个流音频歌曲的应用程序 在自定义媒体播放器中 我必须显示该音频文件的总持续时间 如果一首音频歌曲是 SDCard 我可以使用以下方法知道它的持续时间 MediaPlayer player public double durati
  • 结构化 Spark 流指标检索

    我有一个具有结构化 Spark 流的应用程序 我想获取一些指标 例如调度延迟 延迟等 通常 此类指标可以在 Spark UI Streaming 选项卡中找到 但是 结构化流不存在此类功能我知道 那么如何获取这些指标值呢 目前 我尝试使用查
  • 使用 commons-exec 流式输出?

    谁能给我一个例子来说明如何流式传输外部程序的输出DefaultExecutor 我没有找到任何描述如何执行此操作的文档 我的外部进程将运行几个小时 因此仅获取所有输出数据是不可行的 它必须被流式传输 注意 此解决方案是同步的 因此它不会流式
  • Apache Flink - 作业内部无法识别自定义 java 选项

    我已将以下行添加到 flink conf yaml 中 env java opts Ddy props path PATH TO PROPS FILE 启动 jobmanager jobmanager sh start cluster 时
  • HTML 5 视频流 .ism 文件?

    我有一个带有媒体服务 4 0 的 IIS 7 0 服务器设置 我创建了一个非常简单的 html 5 页面 其中包含video以其source指向一个 ism文件 是否可以使用 html 5 中的 ism 文件的清单来播放视频 就像在 sil
  • 如何使用 VLC 以 ​​http 方式将视频流式传输到其他计算机 [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我想使用以下方式将视频从我的计算机流式传输到另一台计算机http in vlc 我已从此处阅读了如何进行视频流传输的步骤https wik
  • Spark Streaming以Parquet格式附加到S3,小分区太多

    我正在构建一个使用 Spark Streaming 从 AWS EMR 上的 Kinesis 流接收数据的应用程序 目标之一是将数据持久保存到 S3 EMRFS 中 为此我使用 2 分钟的非重叠窗口 我的做法 Kinesis Stream
  • 我应该将 FLV 文件放在哪里才能在本地 Red5 服务器上进行流式传输?

    我安装了最新的 Red5 服务器 但我不确定将 flv 文件放在哪里来进行流式传输 没有像我在网上找到的一些教程那样的 streams 或 ofla 目录 我应该将 flv 文件放在哪里来进行流式传输 Red5 附带了一些演示 但默认情况下
  • 使用流式 JSON 输出构建简单的 Nodejs API

    我正在尝试构建一个简单的基于 Node js 的流 API 我想做的就是当我点击服务器 URL 时 输出应该流式传输一组测试数据 JSON 如 Twitter 流 API var app require express var server
  • 播放 video.js ustream m3u8 文件流

    我尝试在网页中播放带有 video js 的 m3u8 文件流 但我无法做到这一点 我不知道错误在哪里
  • 使用 Servlet 启动 VLC HTTP Stream 时出现问题

    我正在为自己开发一个 VLC 项目 我的目标是创建一个 HTML 前端来启动流 我通过使用 Java Servlet 来完成此操作 概述 乌班图13 04 Java 7 21 冰茶 2 3 9 Eclipse JAVAEE IDE 雄猫7
  • MySQL使用BLOB的二进制存储VS OS文件系统:大文件、大数量、大问题

    我正在运行的版本 基本上 最新的一切 PHP 5 3 1MySQL 5 1 41阿帕奇 2 2 14操作系统 CentOS 最新 情况是这样的 我有数千个非常重要的文档 从客户合同到语音签名 客户对合同的授权录音 文件类型包括但不限于jpg
  • Storm动态拓扑

    Storm 支持动态拓扑吗 我想要的功能是在 Storm 拓扑运行时根据用户要求动态更改拓扑 例如 当用户想知道流的前 10 个单词时 我使用前 10 个 Bolt 来处理它 当用户想知道其他内容时 我使用另一个 Bolt 来处理流并 拔掉
  • 如何使用 Angular/Ionic/JS 显示 Motion JPEG 二进制数据流?

    我正在为设备编写应用程序 此类设备将收到 POST 请求 并发回multipart x mixed replace二进制数据流 我必须在我的应用程序主页的一部分上显示此类流 我查了一下 这种情况的资源非常有限 到目前为止 我发现如果 Mot
  • 音频流的最佳实践

    我正在编写一个应用程序来播放远程服务器的音频 我尝试了多种方法来实现流音频 但它们对我来说都不够好 这就是我尝试过的 幼稚地使用 MediaPlayer 就像是 MediaPlayer player new MediaPlayer play
  • 黑莓上的视频流

    有没有办法从服务器流式传输和播放视频文件 黑莓是否提供可以播放流视频的内置视频播放器 是的你可以 在 bb 设备上串流视频有两种方法 使用 jsr 135 中的 javax microedition media Player 使用标准媒体应
  • C#:将音频文件从服务器流式传输到客户端

    我目前正在编写一个应用程序 该应用程序将允许用户安装某种形式的应用程序 可能是 Windows 服务 该应用程序将在其 PC 上打开一个端口 并在硬盘上指定一个特定的目的地 然后能够流式传输 mp3 文件 然后 我将有另一个应用程序 该应用
  • Flink 窗口:聚合并输出到接收器

    我们有一个数据流 其中每个元素都是这种类型 id String type Type amount Integer 我们想要聚合这个流并输出总和amount每周一次 目前的解决方案 Flink 管道示例如下所示 stream keyBy ty

随机推荐

  • 是否可以编译 linq-to-objects 的查询

    我有一个递归循环中的 linq to 对象查询 担心当对象接近 1000 个以上并且网站上的用户超过 100 个时 我的网站就会崩溃 那么是否可以编译 linq to object 查询 linq 查询只是查找节点的直接子节点 看看为什么这
  • 在另一个 Case 语句中使用 Case 语句的结果

    我有相当长的SELECT查询 但我已将相关部分粘贴到此处 我需要使用我的结果CASE在另一个语句中使用CASE陈述 我正在 SQL Server 中执行此操作 将非常感谢您的帮助 SELECT CompanyContact Name AS
  • Logstash可以直接读取远程日志吗?

    我是 Logstash 的新手 几天来我一直在阅读有关它的内容 和大多数人一样 我试图拥有一个集中式日志系统并将数据存储在 elasticsearch 中 然后使用 kibana 来可视化数据 我的应用程序部署在许多服务器中 因此我需要从所
  • 如何在 then 语句中返回一系列承诺

    因此 在过去的几个小时里 我一直在研究异步内容并使用 Promise 我正在使用测试框架量角器 并且有一些异步问题我遇到了麻烦 在此保存函数中 我异步调用 cm org1 all 然后使用 then 获取响应 我循环响应 并且需要对响应中的
  • hibernate hbm 文件中的 @Convert 相当于什么?

    我写了一个属性转换器 我想将其应用到一个实体中 到目前为止 我正在遵循纯粹的 XML 方法 我找不到相当于 Convert in hbm符号 举个例子将不胜感激 当我搜索这个时 可以理解的是 Google 返回了很多有关 自动将 hbm 文
  • VS2015中c#类的每个方法添加断点

    我有一个类 我希望调试器在调用其任何方法时停止 我尝试使用 New function breakpoint 但我找不到可用的通配符 Function Name 例如 我尝试了 MyNamespace MyClass 有 c 的示例 但它们似
  • 如何检测硬件按键点击?

    我现在需要检测当 flutter 应用程序位于前台或打开时是否按下了任何硬件按钮 例如 当有人按下音量或另一个按钮 即使是关闭电源的按钮 时 我想在我的应用程序中执行某些操作 我知道 当打开一个 flutter 应用程序并且我正在查看应用程
  • 无法在 groovy 中传递闭包

    我正在尝试运行 Geb 库的基本示例 http www gebish org manual current intro html introduction 这是代码 import geb Browser Browser drive go h
  • 环形包裹地图上一组点之间的“质心”,可最小化到所有点的平均距离

    edit 正如有人指出的那样 我正在寻找的实际上是最小化所有其他点之间的总测地距离的点 我的地图在地形上与 吃豆人 和 小行星 中的地图相似 越过顶部将使您扭曲到底部 越过左侧将使您扭曲到右侧 假设我在地图上有两个点 质量相同 我想找到它们
  • 开发面板中的本地化错误

    我购买了一个应用程序 尝试更新开发面板上的信息 当我尝试保存时收到以下错误 您的 1 个本地化内容有错误 它显示了错误位置 但我不知道问题是什么 如果您在选中媒体管理器中的复选框以使用新的 较大的屏幕尺寸屏幕截图之前没有删除较小 较旧尺寸的
  • Web.config保存问题

    我想通过 Web 应用程序的前端向用户公开一些 web config 设置 我可以毫无问题地检索设置 但是当我保存时 我要么收到错误 要么更改不会保留到 web config 文件中 我是在VS中调试的 如果我运行这个 private vo
  • 根据颜色图绘制条形图中的 y 值

    我已经在论坛上搜索过 发现this https stackoverflow com questions 42656585 barplot colored according a colormap 但我的问题有点不同 正如您从代码和下面的图像
  • 自定义 DataGridView 重复列

    我通过从 DataGridView 子类化创建了一个自定义 Winforms 控件 自定义 datagridview 定义自己的列和映射 但是 每当我将其从工具箱拖到窗体的设计图面上时 窗体都会为自定义控件中的每一列重新创建一个列控件 Da
  • Keras ML 库:梯度更新后如何进行权重裁剪? TensorFlow 后端

    我正在尝试使用 Keras 来实现需要权重裁剪的算法的一部分 即限制梯度更新后的权重值 到目前为止 我还没有通过网络搜索找到任何解决方案 作为背景 这与 WGAN 算法有关 https arxiv org pdf 1701 07875 pd
  • Spring Security OAuth 与 JWK 示例

    有人有一个带有 JWT 和非对称密钥的 Spring Security OAuth 2 资源服务器 SP 示例 该示例使用带有 JWKS 端点的 JWK 吗 多谢 散文 Spring Security OAuth 2 资源服务器可以配置为使
  • 什么会导致无法计算 UDP 数据报的 IP 标头校验和?

    我试图将 UDP 数据报从 Windows XP 上的 UdpClient 发送到设备 但它没有响应 当我在 Wireshark 中查看该流量时 我发现出站数据包很糟糕 因为它们的所有 IP 标头校验和都是 0x0000 该机器有两个网卡
  • 执行 kubeadm Reset 后 Kubernetes 无法为 pod 设置网络

    我用以下命令初始化了 Kuberneteskubeadm init 并且在我使用之后kubeadm reset重置它我发现 pod network cidr错了 更正后我尝试使用kubeadm像这样再次初始化 Kubernetes kube
  • Logstash 的 Django 日志记录格式

    我正在尝试将 django 应用程序配置为以 Logstash 易于使用的格式写入日志 受到 Node 的 Winston 日志记录包的启发 Logstash 需要一个 JSON 对象 其中包含键 message 和时间戳 timestam
  • 未找到“Mage_Googlecheckout_Helper_Data”类

    我们刚刚从 Magento 版本 1 8 0 0 升级到 1 8 1 0 现在当我们转到站点的配置部分时 我们会收到以下消息 Fatal error Class Mage Googlecheckout Helper Data not fou
  • 下沉 kafka 流时看不到消息,并且在 flink 1.2 中看不到打印消息

    我的目标是使用kafka读取json格式的字符串 对字符串进行过滤 然后将消息接收出来 仍然是json字符串格式 出于测试目的 我的输入字符串消息如下所示 a 1 b 2 我的实现代码是 def main args Array String