创建数据框时如何解决 scala.MatchError

2023-12-08

我有一个具有复杂结构行的文本文件。我正在使用客户转换器,它将给定的字符串(行)转换为 Pojo 类(countryInfo)。转换后,我正在构建 DF。 POJO 类有一个字段,它是自定义类型列表 (GlobalizedPlayTimeWindows)。我创建了一个与此 GlobalizedPlayTimeWindows 匹配的结构,并尝试将现有的自定义类型转换为该结构,但不断收到错误。

我创建的结构类型:

import org.apache.spark.sql.types._

  val PlayTimeWindow =
    StructType(
      StructField("startTime", DateType, true) ::
        StructField("endTime", DateType, true) :: Nil)


  val globalizedPlayTimeWindows =
    StructType(
                StructField( "countries", ArrayType(StringType, true), true )  ::
        StructField( "purchase", ArrayType(PlayTimeWindow, true), true )  ::
        StructField( "rental", ArrayType(PlayTimeWindow, true), true )  ::
        StructField( "free", ArrayType(PlayTimeWindow, true), true )  ::
        StructField( "download", ArrayType(PlayTimeWindow, true), true )  ::
        StructField( "advertisement", ArrayType(PlayTimeWindow, true), true )  ::
        StructField( "playTypeIds", ArrayType(PlayTimeWindow, true), true )  ::
        StructField( "benefitIds", MapType(StringType, ArrayType(PlayTimeWindow, true), true), true)  :: Nil)



  val schema =    StructType(
     StructField("id", StringType, true) ::
      StructField("jazzCount", IntegerType, true) ::
      StructField("rockCount", IntegerType, true) ::
      StructField("classicCount", IntegerType, true) ::
      StructField("nonclassicCount", IntegerType, true) ::
      StructField("musicType", StringType, true) ::
      StructField( "playType", ArrayType(globalizedPlayTimeWindows, true), true) :: Nil)

数据框创建:

val mappingFile = sc.textFile("s3://input.....")

val inputData = mappingFile.map(x=> {
    val countryInfo = MappingUtils.getCountryInfo(x)

    val id = countryInfo.getId

    val musicType = if(countryInfo.getmusicType != null && StringUtils.isNotBlank(countryInfo.getmusicType)) countryInfo.getmusicType else "UNKOWN_TYPE"


    val classicWestern = if (countryInfo.getClassic() != null && countryInfo.getClassic.size() > 0) true  else false

    var nonclassicCount : Int = 0
    var  classicCount : Int = 0

    if (classicWestern) {
      classicCount = 1
    } else {
      nonclassicCount = 1
    }


    val jazzrock = if (countryInfo.getmusicType() != null && countryInfo.getmusicType != "JAZZ") true  else false
    var jazzCount : Int = 0
    var  rockCount : Int = 0

    if (jazzrock) {
      jazzCount = 1
    } else {
      rockCount = 1
    }

    val playType = if(countryInfo.getPlayTimeWindows != null && countryInfo.getPlayTimeWindows.size > 0 ) { countryInfo.getPlayTimeWindows.asScala.toList } else null

  (id, jazzCount, rockCount, classicCount, nonclassicCount, musicType ,playType)
  }).map{case (id, jazzCount, rockCount, classicCount, nonclassicCount, musicType,playType) => Row(id, jazzCount, rockCount, classicCount, nonclassicCount, musicType,playType)
  }.persist(DISK_ONLY)

 val inputDataDF = sqlContext.createDataFrame(inputData, schema)

inputDataDF.printSchema :

root 
|-- id: string (nullable = true) 
|-- jazzCount: integer (nullable = true) 
|-- rockCount: integer (nullable = true) 
|-- classicCount: integer (nullable = true) 
|-- nonclassicCount: integer (nullable = true) 
|-- musicType: string (nullable = true) 
|-- playType: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- countries: array (nullable = true) 
| | | |-- element: string (containsNull = true) 
| | |-- purchase: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- startTime: date (nullable = true) 
| | | | |-- endTime: date (nullable = true) 
| | |-- rental: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- startTime: date (nullable = true) 
| | | | |-- endTime: date (nullable = true) 
| | |-- free: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- startTime: date (nullable = true) 
| | | | |-- endTime: date (nullable = true) 
| | |-- download: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- startTime: date (nullable = true) 
| | | | |-- endTime: date (nullable = true) 
| | |-- advertisement: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- startTime: date (nullable = true) 
| | | | |-- endTime: date (nullable = true) 
| | |-- playTypeIds: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- startTime: date (nullable = true) 
| | | | |-- endTime: date (nullable = true) 
| | |-- benefitIds: map (nullable = true) 
| | | |-- key: string 
| | | |-- value: array (valueContainsNull = true) 
| | | | |-- element: struct (containsNull = true) 
| | | | | |-- startTime: date (nullable = true) 
| | | | | |-- endTime: date (nullable = true) 

Struct 的等效 POJO :

@Data
public GlobalizedPlayTimeWindows(

    private final List<String> countries;

    private final List<PlayTimeWindow> purchase;

    private final List<PlayTimeWindow> rental;

    private final List<PlayTimeWindow> free;

    private final List<PlayTimeWindow> download;

    private final List<PlayTimeWindow> advertisement;

    private final List<PlayTimeWindow> preorderExclusive;

    private final Map<String, List<PlayTimeWindow>> playTypeIds;

}

@Data
public class PlayTimeWindow {

    private final Date startTime;

    private final Date endTime;
}

我收到的错误:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 393, ip-172-31-14-43.ec2.internal): scala.MatchError: GlobalizedPlayTimeWindows(countries=[US], purchase=null, rental=null, free=null, download=null, advertisement=null, preorderExclusive=null, playTypeIds=null) (of class com.model.global.GlobalizedPlayTimeWindows) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:255) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$2.apply(CatalystTypeConverters.scala:163) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:163) at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:153) at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401) at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492) at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212) at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165) at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174) at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538) at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544) at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414) at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413) at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138) at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413) at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495) at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171) at org.apache.spark.sql.DataFrame.show(DataFrame.scala:394) at org.apache.spark.sql.DataFrame.show(DataFrame.scala:355) at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:163) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:168) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:170) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:172) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:174) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:176) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:178) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:180) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:182) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:184) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:186) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:188) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:190) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:192) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:194) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:196) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:198) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:200) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:202) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:204) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:206) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:208) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:210) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:212) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:214) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:216) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:218) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:220) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:222) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:224) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:226) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:228) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:230) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:232) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:234) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:236) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:238) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:240) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:242) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:244) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:246) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:248) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:250) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:252) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:254) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:256) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:258) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:260) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:262) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:264) at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:266) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:268) at $iwC$$iwC$$iwC.<init>(<console>:270) at $iwC$$iwC.<init>(<console>:272) at $iwC.<init>(<console>:274) at <init>(<console>:276) at .<init>(<console>:280) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:664) at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:629) at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:622) at org.apache.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57) at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:276) at org.apache.zeppelin.scheduler.Job.run(Job.java:170) at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:118) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

还尝试从 inputData 执行隐式 toDF :

inputData.toDF.printSchema 但出现错误:

java.lang.UnsupportedOperationException: Schema for type com.model.global.GlobalizedPlayTimeWindows is not supported at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:718) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30) at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:667) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:693) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:691) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at 

好的 - 长话短说,这是一个可行的解决方案。基本上你在这里有两个不同的问题:

  1. 您期望 Spark 能够将任意 Java 类解析为 DataFrame - 事实并非如此,Spark 只能解析特定类型,通常是: Scala 集合;基元;java.sql.Date;和任何子类scala.Product- 例如,所有案例类和元组。因此 - 正如评论中所讨论的,要做的第一件事是将现有结构转换为此类类型。

  2. Your schema也不匹配您的 Java 类 - 有一些差异:

    • 模式的playType was an Array of GlobalizedPlayTimeWindows,而您的代码创建了一个single项目而不是数组
    • globalizedPlayTimeWindows包含模式benefitIdsJava 类中不存在
    • playTypeIds模式是一个Array,而 Java 类中同名的字段是Map

所以 - 我纠正了所有这些(更改了架构以匹配数据,您可以选择以不同的方式修复这些问题,只要它们match)并完成了Java类到case类的转换:

// corrected schemas:
val PlayTimeWindow =
  StructType(
    StructField("startTime", DateType, true) ::
      StructField("endTime", DateType, true) :: Nil)

val globalizedPlayTimeWindows =
  StructType(
    StructField( "countries", ArrayType(StringType, true), true )  ::
      StructField( "purchase", ArrayType(PlayTimeWindow, true), true )  ::
      StructField( "rental", ArrayType(PlayTimeWindow, true), true )  ::
      StructField( "free", ArrayType(PlayTimeWindow, true), true )  ::
      StructField( "download", ArrayType(PlayTimeWindow, true), true )  ::
      StructField( "advertisement", ArrayType(PlayTimeWindow, true), true )  ::
      StructField( "preorderExclusive", ArrayType(PlayTimeWindow, true), true )  ::
      StructField( "playTypeIds", MapType(StringType, ArrayType(PlayTimeWindow, true), true), true )  ::
      Nil)

val schema =    StructType(
  StructField("id", StringType, true) ::
    StructField("jazzCount", IntegerType, true) ::
    StructField("rockCount", IntegerType, true) ::
    StructField("classicCount", IntegerType, true) ::
    StructField("nonclassicCount", IntegerType, true) ::
    StructField("musicType", StringType, true) ::
    StructField( "playType", globalizedPlayTimeWindows, true) :: Nil)

// note the use of java.sql.Date, java.util.Date not supported
case class PlayTimeWindowScala(startTime: java.sql.Date, endTime: java.sql.Date)

case class GlobalizedPlayTimeWindowsScala (countries: List[String],
                                           purchase: List[PlayTimeWindowScala],
                                           rental: List[PlayTimeWindowScala],
                                           free: List[PlayTimeWindowScala],
                                           download: List[PlayTimeWindowScala],
                                           advertisement: List[PlayTimeWindowScala],
                                           preorderExclusive: List[PlayTimeWindowScala],
                                           playTypeIds: Map[String, List[PlayTimeWindowScala]])

// some conversion methods:
def toSqlDate(jDate: java.util.Date): java.sql.Date = new java.sql.Date(jDate.getTime)

import scala.collection.JavaConverters._

def toScalaWindowList(l: java.util.List[PlayTimeWindow]): List[PlayTimeWindowScala] = {
  l.asScala.map(javaWindow => PlayTimeWindowScala(toSqlDate(javaWindow.startTime), toSqlDate(javaWindow.endTime))).toList
}

def toScalaGlobalizedWindows(javaObj: GlobalizedPlayTimeWindows): GlobalizedPlayTimeWindowsScala = {
  GlobalizedPlayTimeWindowsScala(
    javaObj.countries.asScala.toList,
    toScalaWindowList(javaObj.purchase),
    toScalaWindowList(javaObj.rental),
    toScalaWindowList(javaObj.free),
    toScalaWindowList(javaObj.download),
    toScalaWindowList(javaObj.advertisement),
    toScalaWindowList(javaObj.preorderExclusive),
    javaObj.playTypeIds.asScala.mapValues(toScalaWindowList).toMap
  )
}

val parsedJavaData: RDD[(String, Int, Int, Int, Int, String, GlobalizedPlayTimeWindows)] = mappingFile.map(x => {
   // your code producing the tuple
})

// convert to Scala objects and into a Row:
val inputData = parsedJavaData.map{
  case (id, jazzCount, rockCount, classicCount, nonclassicCount, musicType, javaPlayType) =>
    val scalaPlayType = toScalaGlobalizedWindows(javaPlayType)
    Row(id, jazzCount, rockCount, classicCount, nonclassicCount, musicType, scalaPlayType)
}

// now - this works
val inputDataDF = sqlContext.createDataFrame(inputData, schema)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

创建数据框时如何解决 scala.MatchError 的相关文章

随机推荐

  • 将日期时间转换为日期格式 dd/mm/yyyy

    我有一个 DateTime 对象2 19 2011 12 00 00 AM 我想将此对象转换为字符串19 2 2011 请帮我将日期时间转换为字符串格式 DateTime dt DateTime ParseExact yourObject
  • 为什么验证不适用于 DTO 类型的对象,而仅适用于实体

    我在类型对象上设置了注释dto 与 over 类型的对象相同Entity 该注释适用于entities 但它不适用于类型的对象dto 我在 工作春季启动 应用程序属性 validate packageid size The field PA
  • 如何解决 nginx 上的 404 错误?

    我有一个 Angular 4 SPA 应用程序 我正在使用 docker 进行生产 到目前为止看起来还不错 通过终端我去 dist文件夹 然后我让 docker 指向该文件夹的内容dist使用以下命令 docker run d p 9090
  • 应用程序上下文变量的值因应用程序错误而丢失

    我注意到 当我的应用程序遇到错误时 应用程序上下文变量的值也会重新初始化为其原始值 而不是更新后的值 根据我的理解 发生这种情况是因为应用程序被重新创建了 当应用程序发生错误时 如何保存和恢复应用程序上下文变量的值 如果您能更详细地解释我的
  • D3:如何通过更改数据文件源来动态刷新图表?

    如何通过更改文件 d3 访问来按需更新数据 例如 只需单击一下 它就会从新的数据文件中读取数据 并像 AJAX 一样向图表添加更多节点 我使用 d3 tsv 读取 data tsv 这是许多相同格式的文件之一 我做了一个简单的图表来说明我的
  • Matlab中无放回加权采样

    我有人口p向量中的索引和相应权重w 我想要得到k该人群的样本无需更换其中选择与随机权重成比例 我知道randsample可以用于通过替换进行选择 J randsample p k true w 但是当我用参数调用它时false代替true
  • 优化 JavaFX 中的内存泄漏

    我写了一段代码 让字母在我写的时候出现并飞翔 问题是它消耗大量内存 我已经对其进行了一些优化 分享path对象并在侦听器中更新其参数 每次打印新字母时调用 gc 但它仍然使用大量内存 那么关于如何减少其内存利用率有什么想法吗 提前致谢 pa
  • Web浏览器控制文件下载对话框绕过

    实际上我想使用 Webbrowser Control 从网站下载文件 但由于文件下载对话框 我无法自动执行下载过程 对于绕过它 我想使用 Web 客户端 并且需要将所有会话和 Cookie 从 Web 浏览器控件传输到 Web 客户端 或者
  • 获取“队列对象只能通过继承在进程之间共享”,但我没有使用队列

    我正在尝试使用 ProcessPoolExecutor 但收到错误 队列对象只能通过继承在进程之间共享 但我没有使用队列 至少没有明确使用 我找不到任何东西可以解释我做错了什么 这是一些演示该问题的代码 不是我的实际代码 from conc
  • 无法使用 Maven 执行 Junit5 测试 [重复]

    这个问题在这里已经有答案了 Maven执行 mvn clean test 我正在尝试使用junit5对于我的一个 Maven 项目 但无法在test阶段使用
  • 不规则间隔的热图

    我想根据下面的矩阵和边界创建一个热图 然后绘制数据 x1 x2 以查看属于各个类别的点 我能做的最好的事情就是使用seaborn 但是由于两个轴上的类间隔不均匀 因此很难读取两个轴上对应于点的值 有没有办法 使用seaborn或任何其他库
  • 从两个或多个现有表创建新表 (MySQL)

    问题 是否可以在 mySQL 中从两个或多个现有表创建一个表 Details 我可以创建一个像这样的表吗 CREATE TABLE IF NOT EXISTS USERNAME AGE INT NOT NULL CREATE TABLE I
  • 在 C# 中编译邮件的 HTML 正文

    我使用以下代码供用户向我的电子邮件发送报价 using System Net Mail MailMessage mail new MailMessage mail From new MailAddress Email Text mail T
  • 在无头模式下使用 Firefox 68.9.0esr 以及 GeckoDriver Selenium 和 Python 时出现 504 网关超时错误

    我用硒制作了一个容器图像 我用了一个ubi image来自 RedHat 作为基础镜像 我安装了以下版本的 geckodriver 和 Mozilla Firefox 壁虎驱动程序 0 26 0 e9783a644016 2019 10 1
  • 在 Windows 8.1 中安装 virtualenvwrapper-powershell,导入模块失败

    我正在尝试遵循指南here 我被困在Import Module virtualenvwrapper PowerShell 不断给我错误 import module 指定的模块 virtualenvwrapper 不是 已加载 因为在任何模块
  • 未找到 AWS ec2 winreg

    我正在尝试从亚马逊 EC2 大型实例运行 python 应用程序 然而 它在 scipy 中抱怨 因为它找不到名为 winreg 的东西 我不知道如何重新配置 它 所以它不再是问题 python2 app py Running on htt
  • Chrome 扩展如何在页面底部添加浮动栏?

    我正在创建一个需要注入浮动元素的 chrome 扩展 即position fixed 在页面底部 我的要求是 我需要从内容脚本访问其中的元素 这是因为我将事件附加到按钮 以便用户可以从浮动栏在当前选项卡上执行操作 我希望它的样式保持独立于当
  • 显示斯坦福 NER 置信度分数

    我使用斯坦福 NER CRFClassifier 从新闻文章中提取命名实体 为了实现主动学习 我想知道每个标记实体的类的置信度分数是多少 显示示例 地点 0 20 人员 0 10 组织 0 60 其他 0 10 这是我从文本中提取命名实体的
  • 启动 ASP.NET 表单身份验证

    我开始学习 ASP NET 表单身份验证 并且正在寻找一篇好文章来帮助我入门 我之前听说 ASP NET 表单身份验证使用大量数据库表 前面带有aspnet 但是我发现的任何例子都没有显示这一点 例如我认为有一个aspnet users t
  • 创建数据框时如何解决 scala.MatchError

    我有一个具有复杂结构行的文本文件 我正在使用客户转换器 它将给定的字符串 行 转换为 Pojo 类 countryInfo 转换后 我正在构建 DF POJO 类有一个字段 它是自定义类型列表 GlobalizedPlayTimeWindo