无法使用 Scala 在 Apache Spark 中执行用户定义的函数

2024-01-22

我有以下数据框:

+---------------+-----------+-------------+--------+--------+--------+--------+------+-----+
|   time_stamp_0|sender_ip_1|receiver_ip_2|s_port_3|r_port_4|acknum_5|winnum_6| len_7|count|
+---------------+-----------+-------------+--------+--------+--------+--------+------+-----+
|06:36:16.293711|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58| 65161|  130|
|06:36:16.293729|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58| 65913|  130|
|06:36:16.293743|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|131073|  130|
|06:36:16.293765|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|196233|  130|
|06:36:16.293783|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|196985|  130|
|06:36:16.293798|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|262145|  130|
|06:36:16.293820|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|327305|  130|
|06:36:16.293837|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|328057|  130|
|06:36:16.293851|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|393217|  130|
|06:36:16.293873|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|458377|  130|
|06:36:16.293890|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|459129|  130|
|06:36:16.293904|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|524289|  130|
|06:36:16.293926|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|589449|  130|
|06:36:16.293942|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|590201|  130|
|06:36:16.293956|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|655361|  130|
|06:36:16.293977|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|720521|  130|
|06:36:16.293994|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|721273|  130|
|06:36:16.294007|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|786433|  130|
|06:36:16.294028|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|851593|  130|
|06:36:16.294045|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|852345|  130|
+---------------+-----------+-------------+--------+--------+--------+--------+------+-----+
only showing top 20 rows

我必须向我的产品添加功能和标签dataframe来预测计数值。但是,当我运行代码时,我会看到以下错误:

Failed to execute user defined function(anonfun$15: (int, int, string, string, int, int, int, int, int) => vector)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)

I also cast(IntegerType)我的所有功能,但再次出现错误。这是我的代码:

val Frist_Dataframe = sqlContext.createDataFrame(Row_Dstream_Train, customSchema)

       val toVec9 = udf[Vector, Int, Int, String, String, Int, Int, Int, Int, Int] { (a, b, c, d, e, f, g, h, i) =>
              val e3 = c match {
                case "10.0.0.1" => 1
                case "10.0.0.2" => 2
                case "10.0.0.3" => 3
              }

              val e4 = d match {
                case "10.0.0.1" => 1
                case "10.0.0.2" => 2
                case "10.0.0.3" => 3
              }
              Vectors.dense(a, b, e3, e4, e, f, g, h, i)
            }

            val final_df = Dataframe.withColumn(
              "features",
              toVec9(
                // casting into Timestamp to parse the string, and then into Int
                $"time_stamp_0".cast(TimestampType).cast(IntegerType),
                $"count".cast(IntegerType),
                $"sender_ip_1",
                $"receiver_ip_2",
                $"s_port_3".cast(IntegerType),
                $"r_port_4".cast(IntegerType),
                $"acknum_5".cast(IntegerType),
                $"winnum_6".cast(IntegerType),
                $"len_7".cast(IntegerType)
              )
            ).withColumn("label", (Dataframe("count"))).select("features", "label")

Final_df.show()

val trainingTest = final_df.randomSplit(Array(0.8, 0.2))
val TrainingDF = trainingTest(0).toDF()
val TestingDF=trainingTest(1).toDF()
TrainingDF.show()
TestingDF.show()

我的依赖项还有:

libraryDependencies ++= Seq(
  "co.theasi" %% "plotly" % "0.2.0",
  "org.apache.spark" %% "spark-core" % "2.1.1",
  "org.apache.spark" %% "spark-sql" % "2.1.1",
  "org.apache.spark" %% "spark-hive" % "2.1.1",
  "org.apache.spark" %% "spark-streaming" % "2.1.1",
  "org.apache.spark" %% "spark-mllib" % "2.1.1"
)

最有趣的一点是,如果我改变我所有的cast(IntegerType) to cast(TimestampType).cast(IntegerType)在我的代码的最后一部分,错误消失,输出将如下所示:

+--------+-----+
|features|label|
+--------+-----+
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
+--------+-----+

UPDATE:应用@Ramesh Maharjan 解决方案后,我的数据帧的结果运行良好,但是,每当我尝试将 Final_df 数据帧拆分为训练和测试时,结果如下所示,并且我仍然遇到相同的空行问题。

+--------------------+-----+
|            features|label|
+--------------------+-----+
|                null|  130|
|                null|  130|
|                null|  130|
|                null|  130|
|                null|  130|
|                null|  130|
|                null|  130|
|                null|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
+--------------------+-----+

你能帮助我吗?


我没看到count column在您的问题代码中生成。除了count专栏@Shankar的回答应该会给你你想要的结果。

以下错误是由于错误的定义造成的udf@Shankar 在他的回答中更正了该函数。

Failed to execute user defined function(anonfun$15: (int, int, string, string, int, int, int, int, int) => vector)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)

以下错误是由于version不匹配spark-mllib library with spark-core library and spark-sql library。它们都应该是相同的版本。

error: Caused by: org.apache.spark.SparkException: Failed to execute user defined function(anonfun$15: (int, int, string, string, int, int, int, int, int) => vector) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Gen‌​eratedIterator.proce‌​ssNext(Unknown Source) 

我希望解释清楚,并希望您的问题尽快得到解决。

Edited

你还没有改变udf按照@Shankar 的建议运行。添加.trim我也可以看到一些空间

val toVec9 = udf ((a: Int, b: Int, c: String, d: String, e: Int, f: Int, g: Int, h: Int, i: Int) =>
  {
  val e3 = c.trim match {
    case "10.0.0.1" => 1
    case "10.0.0.2" => 2
    case "10.0.0.3" => 3
  }
  val e4 = d.trim match {
    case "10.0.0.1" => 1
    case "10.0.0.2" => 2
    case "10.0.0.3" => 3
  }
  Vectors.dense(a, b, e3, e4, e, f, g, h, i)
})

看看你的依赖关系,你正在使用%%这告诉sbt下载dependencies包装有scala您系统中的版本。这应该没问题,但由于您仍然收到错误,我想更改dependencies as

libraryDependencies ++= Seq(
  "co.theasi" %% "plotly" % "0.2.0",
  "org.apache.spark" % "spark-core_2.11" % "2.1.1",
  "org.apache.spark" % "spark-sql_2.11" % "2.1.1",
  "org.apache.spark" %% "spark-hive" % "2.1.1",
  "org.apache.spark" % "spark-streaming_2.11" % "2.1.1",
  "org.apache.spark" % "spark-mllib_2.11" % "2.1.1"

)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

无法使用 Scala 在 Apache Spark 中执行用户定义的函数 的相关文章

随机推荐

  • Checkstyle:尾随空格正则表达式问题

    我正在将 Checkstyle 添加到我的项目中 但检测空格的规则不够好 RegexpSingleline lt S s 它检测尾随空格并仅忽略带有空格的行 它应该允许缩进的空白行 它在大多数情况下工作正常 但它抱怨使用空行的 javado
  • 适用于多种设备的演示技术

    我们的应用程序应该为多种设备提供服务 从简单的智能手机 iPhone 触摸屏到普通浏览器 应用程序是分层的 因此我们可以重用业务层和持久层 然而 我们也想对单个表示层进行编程 例如 我知道 ASP NET 根据浏览器类型生成不同的 html
  • 在 Mac (OS High Sierra) 上安装 Flask-mysqldb (python 3) 时出错

    在按照在线教程创建 Flask Web 应用程序时 我尝试使用以下命令安装 Flask mysqldbsudo pip3 install flask mysqldb 这会导致安装错误 该错误似乎源于依赖性问题 错误信息如下Command u
  • 如何在Python中根据椭圆的一般方程绘制椭圆

    我知道matplotlib可以根据椭圆的中心 半长轴长度 半短轴长度以及x轴和长轴之间的角度来绘制椭圆 但是有没有简单的方法可以像Matlab一样根据椭圆的一般方程绘制椭圆 ezplot 3 x 2 2 x y 4 y 2 5 我找到了一种
  • 如何测试文件列表是否存在?

    我有一个列出文件名的文件 每个文件名都在自己的行上 我想测试每个文件名是否存在于特定目录中 例如 文件的一些示例行可能是 mshta dll foobar dll somethingelse dll 我感兴趣的目录是X Windows Sy
  • 在 Azure 角色中使用 SmtpClient 时出现“不支持请求的功能”异常

    在 Azure Web 或辅助角色中使用 SmtpClient 时出现异常 我创建了一个控制台应用程序 通过 RDP 在角色虚拟机上手动运行以进行重现 using System using System Net using System N
  • 使用节点和角度应用程序刷新页面时获取 404 页面

    我是新来的Angular 我尝试使用创建 CRUD 操作Nodejs and Angular 我在用Nodejs and Express对于支持和Angular对于前端 当我使用 routerLink 在页面上导航时 它工作正常 但是当我在
  • 具有递归可变参数函数的字符串流?

    我希望能够使用 ostringstream 将多个不同的参数组合成一个字符串 这样我就可以记录生成的单个字符串 而不会出现任何随机问题 我到目前为止 template
  • OAuth 2.0 - 客户端秘密是否必须是“秘密”?

    我只是好奇 我需要保留客户端秘密来自 Google FaceBook 其他 OAuth 2 0 提供商的 秘密 地方 据我所知 一旦我指定了非常严格的回调网址 就可以使用客户端秘密参数完成很少的事情 例如 将 秘密 密钥提交到 github
  • 从 jck 密钥存储导出密钥

    我们有一个包含秘密密钥的 jck 密钥库 jceks 格式 它是使用 keytool 命令生成的 keytool genseckey alias mykey keyalg AES keysize 256 storetype jceks ke
  • django - HttpRequest 对象没有属性“会话”

    我似乎无法让会话正常工作 Django 抱怨 HttpRequest 对象没有名为 session 的属性 在文档中明确指出 如果您启用了中间件 并且在安装的应用程序中启用了 django contrib sessions 那么您就可以开始
  • RPG对话引擎/结构[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我一直对 RPG 角色扮演游戏 中涉及的数据结构感兴趣 我特别对对话和基于事件的行动感到好奇 例如 如果我在某个时刻接近 NPCx在游
  • Shiny DT::renderDataTable 的“全选”复选框

    我想要一个复选框 用于选择 Shiny 中标准 DT renderDataTable 中显示的所有行 显示是关键 因为您应用的过滤器和整个数据表之间存在差异 是否有任何 DT 扩展可以做到这一点 我的编码技能很基础 因此我无法编写等效的 J
  • libgdx 中的多色文本

    我发现夜间构建中的 LibGDX 中有一个新组件 TextArea这是scene2d ui包裹 拥有这样的组件真是太好了 非常易于使用 但我缺少的是对多色文本的一些支持 我想用不同的颜色突出显示文本中的一些关键字 但我不知道如何使用当前的
  • @Autowired 和 @Service 从控制器工作,但不从不同的包工作

    我需要帮助理解背后的概念 Autowired and Service 我有一个 DAO 定义为 Service和控制器 Autowired一切看起来都很好 但是 我使用相同的 Autowired在不同的班级 那么它不起作用 Example
  • 从 Google 表格中的另一张表格左侧进行 VLOOKUP

    我有一个电子表格 该电子表格是在 Google 表格中我自己的电子表格外部管理的 我正在尝试将数据从外部工作表提取到我的工作表中 以便我有一个集中位置来显示与我相关的任务项 这是我认为可行的公式 VLOOKUP My Name IMPORT
  • 通过索引访问字符串枚举

    我在 C 中有一个枚举 索引需要用字符串表示 String 类型的 Swift 枚举如何通过整数索引使用 我想将枚举复制到 Swift 将类型设置为字符串并定义所有原始值以显示文本 然后使用 C 枚举值提取 Swift String 枚举的
  • 了解 constexpr 变量初始化的完整表达式

    下面的程序编译成功 https godbolt org z 3c1xsh6oz与所有主要编译器 struct S constexpr S const S constexpr S default int main void S s1 cons
  • Scipy的solve_ivp函数的文档中字母k是什么意思?

    Solve ivp 是 Scipy 中的初始值问题求解器函数 简单来说 scipy integrate solve ivp 乐趣 t span y0 方法 RK45 t eval 无 dense output False 事件 无 向量化
  • 无法使用 Scala 在 Apache Spark 中执行用户定义的函数

    我有以下数据框 time stamp 0 sender ip 1 receiver ip 2 s port 3 r port 4 acknum 5 winnum 6 len 7 count 06 36 16 293711 10 0 0 1