我正在尝试对主题数据进行一些丰富。因此,使用 Spark 结构化流从 Kafka 接收器读回 Kafka。
val ds = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("group.id", groupId)
.option("subscribe", "topicname")
.load()
val enriched = ds.select("key", "value", "topic").as[(String, String, String)].map(record => enrich(record._1,
record._2, record._3)
val query = enriched.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("group.id", groupId)
.option("topic", "desttopic")
.start()
但我遇到了一个例外:
Exception in thread "main" java.lang.UnsupportedOperationException: Data source kafka does not support streamed writing
at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:287)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:266)
at kafka_bridge.KafkaBridge$.main(KafkaBridge.scala:319)
at kafka_bridge.KafkaBridge.main(KafkaBridge.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
有什么解决方法吗?
As T·加文达 https://stackoverflow.com/a/42996602/1305344如上所述,没有 kafka 格式可以将流数据集写入 Kafka(即 Kafka 接收器)。
目前Spark 2.1中推荐的解决方案是使用foreach操作符 http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach.
foreach 操作允许对输出数据进行任意操作。从 Spark 2.1 开始,这仅适用于 Scala 和 Java。要使用它,您必须实现 ForeachWriter 接口(Scala/Java 文档),该接口具有每当触发器后生成作为输出的行序列时就会调用的方法。请注意以下要点。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)