TL;DR - 我在 PySpark 应用程序中有看起来像字符串 DStream 的东西。我想将其作为DStream[String]
到 Scala 库。不过,Py4j 不会转换字符串。
我正在开发一个 PySpark 应用程序,该应用程序使用 Spark Streaming 从 Kafka 提取数据。我的消息是字符串,我想在 Scala 代码中调用一个方法,并将其传递给DStream[String]
实例。但是,我无法在 Scala 代码中接收正确的 JVM 字符串。在我看来,Python 字符串没有转换为 Java 字符串,而是被序列化了。
我的问题是:如何从 Java 字符串中取出字符串DStream
object?
这是我想出的最简单的 Python 代码:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))
from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])
ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)
ssc.start()
我在 PySpark 中运行此代码,并将其传递给我的 JAR 路径:
pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar
在 Scala 方面,我有:
package com.seigneurin
import org.apache.spark.streaming.api.java.JavaDStream
object MyPythonHelper {
def doSomething(jdstream: JavaDStream[String]) = {
val dstream = jdstream.dstream
dstream.foreachRDD(rdd => {
rdd.foreach(println)
})
}
}
现在,假设我将一些数据发送到 Kafka:
echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN
The println
Scala 代码中的语句打印如下内容:
[B@758aa4d9
我期望得到foo bar
反而。
现在,如果我替换简单的println
Scala 代码中的语句如下:
rdd.foreach(v => println(v.getClass.getCanonicalName))
I get:
java.lang.ClassCastException: [B cannot be cast to java.lang.String
这表明字符串实际上是作为字节数组传递的。
如果我只是尝试将此字节数组转换为字符串(我知道我什至没有指定编码):
def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
val dstream = jdstream.dstream
dstream.foreachRDD(rdd => {
rdd.foreach(bytes => println(new String(bytes)))
})
}
我得到了一些东西looks像(特殊字符可能会被删除):
�]qXfoo barqa.
这表明 Python 字符串已序列化(腌制?)。我怎样才能检索正确的 Java 字符串呢?