使用 Scala 转换 PySpark RDD

2023-11-26

TL;DR - 我在 PySpark 应用程序中有看起来像字符串 DStream 的东西。我想将其作为DStream[String]到 Scala 库。不过,Py4j 不会转换字符串。

我正在开发一个 PySpark 应用程序,该应用程序使用 Spark Streaming 从 Kafka 提取数据。我的消息是字符串,我想在 Scala 代码中调用一个方法,并将其传递给DStream[String]实例。但是,我无法在 Scala 代码中接收正确的 JVM 字符串。在我看来,Python 字符串没有转换为 Java 字符串,而是被序列化了。

我的问题是:如何从 Java 字符串中取出字符串DStream object?


这是我想出的最简单的 Python 代码:

from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))

from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])

ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)

ssc.start()

我在 PySpark 中运行此代码,并将其传递给我的 JAR 路径:

pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar

在 Scala 方面,我有:

package com.seigneurin

import org.apache.spark.streaming.api.java.JavaDStream

object MyPythonHelper {
  def doSomething(jdstream: JavaDStream[String]) = {
    val dstream = jdstream.dstream
    dstream.foreachRDD(rdd => {
      rdd.foreach(println)
    })
  }
}

现在,假设我将一些数据发送到 Kafka:

echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN

The printlnScala 代码中的语句打印如下内容:

[B@758aa4d9

我期望得到foo bar反而。

现在,如果我替换简单的printlnScala 代码中的语句如下:

rdd.foreach(v => println(v.getClass.getCanonicalName))

I get:

java.lang.ClassCastException: [B cannot be cast to java.lang.String

这表明字符串实际上是作为字节数组传递的。

如果我只是尝试将此字节数组转换为字符串(我知道我什至没有指定编码):

      def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
        val dstream = jdstream.dstream
        dstream.foreachRDD(rdd => {
          rdd.foreach(bytes => println(new String(bytes)))
        })
      }

我得到了一些东西looks像(特殊字符可能会被删除):

�]qXfoo barqa.

这表明 Python 字符串已序列化(腌制?)。我怎样才能检索正确的 Java 字符串呢?


长话短说,没有支持的方法来做这样的事情。不要在生产中尝试这个。你已被警告过。

一般来说,除了驱动程序上的一些基本 RPC 调用之外,Spark 不会将 Py4j 用于其他任何用途,并且不会在任何其他计算机上启动 Py4j 网关。当需要时(主要是 MLlib 和 SQL 的某些部分)Spark 使用Pyrolite序列化 JVM 和 Python 之间传递的对象。

这部分 API 要么是私有的 (Scala),要么是内部的 (Python),因此不适合一般用途。虽然理论上您可以按批次访问它:

package dummy

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.streaming.api.java.JavaDStream
import org.apache.spark.sql.DataFrame

object PythonRDDHelper {
  def go(rdd: JavaRDD[Any]) = {
    rdd.rdd.collect {
      case s: String => s
    }.take(5).foreach(println)
  }
}

完整流:

object PythonDStreamHelper {
  def go(stream: JavaDStream[Any]) = {
    stream.dstream.transform(_.collect {
      case s: String => s
    }).print
  }
}

或将各个批次暴露为DataFrames(可能是最不邪恶的选择):

object PythonDataFrameHelper {
  def go(df: DataFrame) = {
    df.show
  }
}

并按如下方式使用这些包装器:

from pyspark.streaming import StreamingContext
from pyspark.mllib.common import _to_java_object_rdd
from pyspark.rdd import RDD

ssc = StreamingContext(spark.sparkContext, 10)
spark.catalog.listTables()

q = ssc.queueStream([sc.parallelize(["foo", "bar"]) for _ in range(10)]) 

# Reserialize RDD as Java RDD<Object> and pass 
# to Scala sink (only for output)
q.foreachRDD(lambda rdd: ssc._jvm.dummy.PythonRDDHelper.go(
    _to_java_object_rdd(rdd)
))

# Reserialize and convert to JavaDStream<Object>
# This is the only option which allows further transformations
# on DStream
ssc._jvm.dummy.PythonDStreamHelper.go(
    q.transform(lambda rdd: RDD(  # Reserialize but keep as Python RDD
        _to_java_object_rdd(rdd), ssc.sparkContext
    ))._jdstream
)

# Convert to DataFrame and pass to Scala sink.
# Arguably there are relatively few moving parts here. 
q.foreachRDD(lambda rdd: 
    ssc._jvm.dummy.PythonDataFrameHelper.go(
        rdd.map(lambda x: (x, )).toDF()._jdf
    )
)

ssc.start()
ssc.awaitTerminationOrTimeout(30)
ssc.stop()

这不受支持,未经测试,因此除了 Spark API 实验之外,对其他任何事情都毫无用处。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Scala 转换 PySpark RDD 的相关文章

随机推荐

  • 查询远程服务器上磁盘空间的最佳方法

    我试图通过查询所有驱动器然后循环直到找到我正在寻找的驱动器来确定远程服务器上的可用空间 有一个更好的方法吗 Dim oConn As New ConnectionOptions Dim sNameSpace As String mnb co
  • 在请求 URL 中公开表名和字段名

    我的任务是创建这个 Joomla 组件 是的 joomla 但它无关 一位教授告诉我 我应该使我的代码尽可能动态 需要较少维护的代码 并避免硬编码 我们最初想到的方法是获取 url 参数 将它们转换为对象 然后将它们传递给查询 假设我们要读
  • Android SDK 管理器不显示要安装的包

    我已经安装了 Android SDK 和软件包 由于我在Eclipse上打开新项目时出现错误 我决定卸载SDK并重新安装 现在的问题是我没有看到要安装的软件包 我只能看到 Android SDK 工具 如何重新安装并查看所有软件包 对于所有
  • 从构造函数初始值设定项列表传递 this

    在以下代码中将 this 传递给初始值设定项列表中的另一个对象时是否存在任何问题 class Callback public virtual void DoCallback 0 class B Callback cb public B Ca
  • Javascript图像上传和显示

    我的基本任务是选择图像并显示它 而不将其保存在数据库中 为了这 1 我在html中做了一个select标签 通过它我可以上传图像 2 我制作了一个空白图像标签 其中没有图像源 备用是上传图像 3 select标签有onchange java
  • 通过 Groovy 在 Java 中解析 XML

    我正在尝试使用 Groovy 和 Java 的 ScriptEngine API 解析 XML 下面的代码正是这样做的 但我想知道是否有更好的方法可以做到这一点 另外是否有任何与此相关的性能影响 import java util Array
  • 向量化矩阵中不同对角线的和

    我想对以下 MATLAB 代码进行向量化 我认为它一定很简单 但我发现它仍然令人困惑 r some constant less than m or n m n size C S zeros m r n r for i 1 m r 1 for
  • Tensorflow 移动应用程序:不是有效的 TensorFlow Graph 序列化:NodeDef 提到 attr“扩张”不在 Op 中

    我尝试替换示例中的 graph pb 文件https codelabs developers google com codelabs tensorflow for poets 2但它无法在 Android 中启动 并出现错误 不是有效的 T
  • 如果两个不同类的实例具有相同的属性,如何自动映射两个不同类的实例之间的值?

    我有两个类 它们具有完全相同的成员 属性和字段 和相同的数据类型 我想以自动方式将成员从一个映射到另一个 我知道有更实用的开发方式可以处理 一种简单的解决方案是手动将每个成员从一个实例映射到另一个实例 但是 我想将其自动化作为一些通用解决方
  • iOS 14.2 Beta - AVPlayer 无法播放 [关闭]

    Closed 这个问题是基于意见的 目前不接受答案 Apple 在 iOS 14 2 beta 中对 iOS 14 0 的 AVFoundation 进行了重大更改 正在播放控制中心 UI 更改等 问题是 在 iOS 14 0 中完美运行的
  • 存储 SEPA(IBAN 和 BIC)数据 - 需要 PCI 合规性吗?

    我们希望使用银行 API 从我们的银行帐户到用户的银行帐户进行 SEPA 转账 为此 用户需要在表格中输入他的 IBAN 和 BIC 我们获取这些数据 受 SSL 保护 并使用银行 REST API 转账 如果我们收到成功响应 我们会向用户
  • 如何安装 Rails MySQL 适配器?

    我的问题仅此而已 gem install mysql不起作用 我还没有通过谷歌搜索找到任何东西 当我尝试时gem install mysql2 这就是我得到的 我现在不知道该怎么办 jason buster projects mcif ra
  • 使用全宽,不包括溢出滚动条和“位置:绝对”

    我希望在固定顶部位置有一个全宽的小红色 div 位于另一个具有overflow scroll 我希望 jsFiddle 说清楚 http jsfiddle net mCYLm 2 问题是红色 div 与滚动条重叠 我猜right 0意味着右
  • 如何在 Angularjs 中观察按键组合? [复制]

    这个问题在这里已经有答案了 我正在尝试让我的控制器监视按键组合 为了便于论证 我们可以说 上 下 下 左 右 左 右 b a 无论用户当前位于页面的哪个位置 我怎样才能有角度地寻找这些内容 看起来你可以使用ng keydown去做这个 这里
  • 在流星中处理多个“页面”的正确方法

    在流星中处理多个 页面 的 正式 方式是什么 我说 页面 我见过人们用几种不同的方式来做这件事 我见过人们创建实际的完整页面 index html about html contact html 然后当单击链接时 您将编写一个路由来呈现这些
  • Facebook Account Kit 与 Google play services gradle 的冲突

    配置account kit sdk后为 compile com facebook android account kit sdk 4 和 gradle 同步它与冲突com google android gms gradle as 混合版本可
  • JavaScript 变量引用/别名

    javascript 中是否可以以某种方式为本地变量分配别名 引用 我的意思是类似 C 的东西 function foo var x 1 var y x y alert x prints 2 EDIT 是否可以在这段代码中为argument
  • 根据字典替换 NumPy 数组中的值,并避免新值和键之间的重叠

    我想根据 python 中的以下字典替换 2D numpy 数组中的值 code region 334 0 4 22 8 31 12 16 16 17 24 27 28 18 32 21 36 1 我想找到以下单元格numpy匹配的二维数组
  • 最佳实践:PHP 魔术方法 __set 和 __get [重复]

    这个问题在这里已经有答案了 可能的重复 魔术方法是 PHP 中的最佳实践吗 这些都是简单的示例 但想象一下您的类中拥有的属性多于两个 最佳实践是什么 a 使用 get 和 set class MyClass private firstFie
  • 使用 Scala 转换 PySpark RDD

    TL DR 我在 PySpark 应用程序中有看起来像字符串 DStream 的东西 我想将其作为DStream String 到 Scala 库 不过 Py4j 不会转换字符串 我正在开发一个 PySpark 应用程序 该应用程序使用 S