我正在使用 pySpark 2.3.0,并创建了一个非常简单的 Spark 数据框来测试 VectorAssembler 的功能。这是较大数据框的子集,其中我只选择了一些数字(双精度数据类型)列:
>>>cols = ['index','host_listings_count','neighbourhood_group_cleansed',\
'bathrooms','bedrooms','beds','square_feet', 'guests_included',\
'review_scores_rating']
>>>test = df[cols]
>>>test.take(3)
[行(索引=0,host_listings_count=1,
neighbourhood_group_cleansed=无,浴室=1.5,卧室=2.0,
床位=3.0,square_feet=无,guests_included=1,
review_scores_ rating = 100.0),行(索引= 1,host_listings_count = 1,
neighbourhood_group_cleansed=无,浴室=1.5,卧室=2.0,
床位=3.0,square_feet=无,guests_included=1,
review_scores_ rating = 100.0),行(索引= 2,host_listings_count = 1,
neighbourhood_group_cleansed=无,浴室=1.5,卧室=2.0,
床位=3.0,square_feet=无,guests_included=1,
review_scores_ rating=100.0)]
从上面我看来,这个 Spark 数据框没有任何问题。因此,我创建了如下所示的汇编器并得到了显示的错误。可能出了什么问题?
>>>from pyspark.ml.feature import VectorAssembler
>>>assembler = VectorAssembler(inputCols=cols, outputCol="features")
>>>output = assembler.transform(test)
>>>output.take(3)
Py4JJavaError:调用 o279.collectToPython 时发生错误。 :
org.apache.spark.SparkException:作业由于阶段失败而中止:
阶段 5.0 中的任务 0 失败 1 次,最近一次失败:丢失任务 0.0
在阶段 5.0(TID 10,本地主机,执行器驱动程序):
org.apache.spark.SparkException:无法执行用户定义
函数($anonfun$3:
(结构)
=>向量)在org.apache.spark.sql.catalyst.expressions.GenerateClass$GenerateIterator.processNext(未知
来源)位于
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
在
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
在
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
在
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
在
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
在
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
在
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
在 org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 处
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 在
org.apache.spark.scheduler.Task.run(Task.scala:99) 在
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
在
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
在
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
在 java.lang.Thread.run(Thread.java:748) 引起的:
org.apache.spark.SparkException:要组装的值不能为空。
在
org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:160)
在
org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:143)
在
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
在
scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
在
org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:143)
在
org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:99)
在
org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98)
... 16 更多
驱动程序堆栈跟踪:位于
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
在
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
在
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
在
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
在 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
在
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
在
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
在
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
在 scala.Option.foreach(Option.scala:257) 处
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
在
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
在
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
在
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
在 org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
在
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1925) 处
org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)在
org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)在
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
在
org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
在
org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2768)
在
org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2765)
在
org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2765)
在
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
在
org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788)
在 org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2765)
在 sun.reflect.NativeMethodAccessorImpl.invoke0(本机方法) 处
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
在
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
在 java.lang.reflect.Method.invoke(Method.java:498) 处
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 在
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 在
py4j.Gateway.invoke(Gateway.java:280) 在
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
在 py4j.commands.CallCommand.execute(CallCommand.java:79) 处
py4j.GatewayConnection.run(GatewayConnection.java:214) 在
java.lang.Thread.run(Thread.java:748) 原因:
org.apache.spark.SparkException:无法执行用户定义
函数($anonfun$3:
(结构)
=>向量)在org.apache.spark.sql.catalyst.expressions.GenerateClass$GenerateIterator.processNext(未知
来源)位于
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
在
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
在
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
在
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
在
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
在
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
在
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
在 org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 处
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 在
org.apache.spark.scheduler.Task.run(Task.scala:99) 在
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
在
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
在
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 更多 引起者:org.apache.spark.SparkException:值
汇编不能为空。在
org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:160)
在
org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:143)
在
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
在
scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
在
org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:143)
在
org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:99)
在
org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98)
... 16 更多