我有以下数据框:
+---------------+-----------+-------------+--------+--------+--------+--------+------+-----+
| time_stamp_0|sender_ip_1|receiver_ip_2|s_port_3|r_port_4|acknum_5|winnum_6| len_7|count|
+---------------+-----------+-------------+--------+--------+--------+--------+------+-----+
|06:36:16.293711| 10.0.0.1| 10.0.0.2| 55518| 5001| 0| 58| 65161| 130|
|06:36:16.293729| 10.0.0.1| 10.0.0.2| 55518| 5001| 0| 58| 65913| 130|
|06:36:16.293743| 10.0.0.1| 10.0.0.2| 55518| 5001| 0| 58|131073| 130|
|06:36:16.293765| 10.0.0.1| 10.0.0.2| 55518| 5001| 0| 58|196233| 130|
|06:36:16.293783| 10.0.0.1| 10.0.0.2| 55518| 5001| 0| 58|196985| 130|
|06:36:16.293798| 10.0.0.1| 10.0.0.2| 55518| 5001| 0| 58|262145| 130|
|06:36:16.293820| 10.0.0.1| 10.0.0.2| 55518| 5001| 0| 58|327305| 130|
|06:36:16.293837| 10.0.0.1| 10.0.0.2| 55518| 5001| 0| 58|328057| 130|
|06:36:16.293851| 10.0.0.1| 10.0.0.2| 55518| 5001| 0| 58|393217| 130|
|06:36:16.293873| 10.0.0.1| 10.0.0.2| 55518| 5001| 0| 58|458377| 130|
|06:36:16.293890| 10.0.0.1| 10.0.0.2| 55518| 5001| 0| 58|459129| 130|
|06:36:16.293904| 10.0.0.1| 10.0.0.2| 55518| 5001| 0| 58|524289| 130|
|06:36:16.293926| 10.0.0.1| 10.0.0.2| 55518| 5001| 0| 58|589449| 130|
|06:36:16.293942| 10.0.0.1| 10.0.0.2| 55518| 5001| 0| 58|590201| 130|
|06:36:16.293956| 10.0.0.1| 10.0.0.2| 55518| 5001| 0| 58|655361| 130|
|06:36:16.293977| 10.0.0.1| 10.0.0.2| 55518| 5001| 0| 58|720521| 130|
|06:36:16.293994| 10.0.0.1| 10.0.0.2| 55518| 5001| 0| 58|721273| 130|
|06:36:16.294007| 10.0.0.1| 10.0.0.2| 55518| 5001| 0| 58|786433| 130|
|06:36:16.294028| 10.0.0.1| 10.0.0.2| 55518| 5001| 0| 58|851593| 130|
|06:36:16.294045| 10.0.0.1| 10.0.0.2| 55518| 5001| 0| 58|852345| 130|
+---------------+-----------+-------------+--------+--------+--------+--------+------+-----+
only showing top 20 rows
我必须向我的产品添加功能和标签dataframe
来预测计数值。但是,当我运行代码时,我会看到以下错误:
Failed to execute user defined function(anonfun$15: (int, int, string, string, int, int, int, int, int) => vector)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
I also cast(IntegerType)
我的所有功能,但再次出现错误。这是我的代码:
val Frist_Dataframe = sqlContext.createDataFrame(Row_Dstream_Train, customSchema)
val toVec9 = udf[Vector, Int, Int, String, String, Int, Int, Int, Int, Int] { (a, b, c, d, e, f, g, h, i) =>
val e3 = c match {
case "10.0.0.1" => 1
case "10.0.0.2" => 2
case "10.0.0.3" => 3
}
val e4 = d match {
case "10.0.0.1" => 1
case "10.0.0.2" => 2
case "10.0.0.3" => 3
}
Vectors.dense(a, b, e3, e4, e, f, g, h, i)
}
val final_df = Dataframe.withColumn(
"features",
toVec9(
// casting into Timestamp to parse the string, and then into Int
$"time_stamp_0".cast(TimestampType).cast(IntegerType),
$"count".cast(IntegerType),
$"sender_ip_1",
$"receiver_ip_2",
$"s_port_3".cast(IntegerType),
$"r_port_4".cast(IntegerType),
$"acknum_5".cast(IntegerType),
$"winnum_6".cast(IntegerType),
$"len_7".cast(IntegerType)
)
).withColumn("label", (Dataframe("count"))).select("features", "label")
Final_df.show()
val trainingTest = final_df.randomSplit(Array(0.8, 0.2))
val TrainingDF = trainingTest(0).toDF()
val TestingDF=trainingTest(1).toDF()
TrainingDF.show()
TestingDF.show()
我的依赖项还有:
libraryDependencies ++= Seq(
"co.theasi" %% "plotly" % "0.2.0",
"org.apache.spark" %% "spark-core" % "2.1.1",
"org.apache.spark" %% "spark-sql" % "2.1.1",
"org.apache.spark" %% "spark-hive" % "2.1.1",
"org.apache.spark" %% "spark-streaming" % "2.1.1",
"org.apache.spark" %% "spark-mllib" % "2.1.1"
)
最有趣的一点是,如果我改变我所有的cast(IntegerType)
to cast(TimestampType).cast(IntegerType)
在我的代码的最后一部分,错误消失,输出将如下所示:
+--------+-----+
|features|label|
+--------+-----+
| null| 130|
| null| 130|
| null| 130|
| null| 130|
| null| 130|
| null| 130|
| null| 130|
| null| 130|
| null| 130|
| null| 130|
| null| 130|
| null| 130|
| null| 130|
| null| 130|
| null| 130|
| null| 130|
| null| 130|
| null| 130|
| null| 130|
| null| 130|
+--------+-----+
UPDATE:应用@Ramesh Maharjan 解决方案后,我的数据帧的结果运行良好,但是,每当我尝试将 Final_df 数据帧拆分为训练和测试时,结果如下所示,并且我仍然遇到相同的空行问题。
+--------------------+-----+
| features|label|
+--------------------+-----+
| null| 130|
| null| 130|
| null| 130|
| null| 130|
| null| 130|
| null| 130|
| null| 130|
| null| 130|
|[1.497587776E9,13...| 130|
|[1.497587776E9,13...| 130|
|[1.497587776E9,13...| 130|
|[1.497587776E9,13...| 130|
|[1.497587776E9,13...| 130|
|[1.497587776E9,13...| 130|
|[1.497587776E9,13...| 130|
|[1.497587776E9,13...| 130|
|[1.497587776E9,13...| 130|
|[1.497587776E9,13...| 130|
|[1.497587776E9,13...| 130|
|[1.497587776E9,13...| 130|
+--------------------+-----+
你能帮助我吗?