toDF 的值不是 org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] 的成员

2024-01-09

在 SPARK 2.0 中使用 SCALA 将 pre-LDA 转换转换为数据帧时遇到编译错误。抛出错误的具体代码如下:

val documents = PreLDAmodel.transform(mp_listing_lda_df)
  .select("docId","features")
  .rdd
  .map{ case Row(row_num: Long, features: MLVector) => (row_num, features) }
  .toDF()

完整的编译错误是:

Error:(132, 8) value toDF is not a member of org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)]
possible cause: maybe a semicolon is missing before `value toDF'?
      .toDF()

这是完整的代码:

import java.io.FileInputStream
import java.sql.{DriverManager, ResultSet}
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.LDA
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel, RegexTokenizer, StopWordsRemover}
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.clustering.{LDA => oldLDA}
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object MPClassificationLDA {
  /*Start: Configuration variable initialization*/
  val props = new Properties
  val fileStream = new FileInputStream("U:\\JIRA\\MP_Classification\\target\\classes\\mpclassification.properties")
  props.load(fileStream)
  val mpExtract = props.getProperty("mpExtract").toString
  val shard6_db_server_name = props.getProperty("shard6_db_server_name").toString
  val shard6_db_user_id = props.getProperty("shard6_db_user_id").toString
  val shard6_db_user_pwd = props.getProperty("shard6_db_user_pwd").toString
  val mp_output_file = props.getProperty("mp_output_file").toString
  val spark_warehouse_path = props.getProperty("spark_warehouse_path").toString
  val rf_model_file_path = props.getProperty("rf_model_file_path").toString
  val windows_hadoop_home = props.getProperty("windows_hadoop_home").toString
  val lda_vocabulary_size = props.getProperty("lda_vocabulary_size").toInt
  val pre_lda_model_file_path = props.getProperty("pre_lda_model_file_path").toString
  val lda_model_file_path = props.getProperty("lda_model_file_path").toString
  fileStream.close()
  /*End: Configuration variable initialization*/

  val conf = new SparkConf().set("spark.sql.warehouse.dir", spark_warehouse_path)

  def main(arg: Array[String]): Unit = {
    //SQL Query definition and parameter values as parameter upon executing the Object
    val cont_id = "14211599"
    val top = "100000"
    val start_date = "2016-05-01"
    val end_date = "2016-06-01"

    val mp_spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("MPClassificationLoadLDA")
      .config(conf)
      .getOrCreate()
    MPClassificationLDACalculation(mp_spark, cont_id, top, start_date, end_date)
    mp_spark.stop()
  }

  private def MPClassificationLDACalculation
  (mp_spark: SparkSession
   ,cont_id: String
   ,top: String
   ,start_date: String
   ,end_date: String
  ): Unit = {

    //DB connection definition
    def createConnection() = {
      Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver").newInstance();
      DriverManager.getConnection("jdbc:sqlserver://" + shard6_db_server_name + ";user=" + shard6_db_user_id + ";password=" + shard6_db_user_pwd);
    }

    //DB Field Names definition
    def extractvalues(r: ResultSet) = {
      Row(r.getString(1),r.getString(2))
    }

    //Prepare SQL Statement with parameter value replacement
    val query = """SELECT docId = audt_id, text = auction_title FROM brands6.dbo.uf_ds_marketplace_classification_listing(@cont_id, @top, '@start_date', '@end_date') WHERE ? < ? OPTION(RECOMPILE);"""
      .replaceAll("@cont_id", cont_id)
      .replaceAll("@top", top)
      .replaceAll("@start_date", start_date)
      .replaceAll("@end_date", end_date)
      .stripMargin

    //Connect to Source DB and execute the Prepared SQL Steatement
    val mpDataRDD = new JdbcRDD(mp_spark.sparkContext
      ,createConnection
      ,query
      ,lowerBound = 0
      ,upperBound = 10000000
      ,numPartitions = 1
      ,mapRow = extractvalues)

    val schema_string = "docId,text"
    val fields = StructType(schema_string.split(",")
      .map(fieldname => StructField(fieldname, StringType, true)))

    //Create Data Frame using format identified through schema_string
    val mpDF = mp_spark.createDataFrame(mpDataRDD, fields)
    mpDF.collect()

    val mp_listing_tmp = mpDF.selectExpr("cast(docId as long) docId", "text")
    mp_listing_tmp.printSchema()
    println(mp_listing_tmp.first)

    val mp_listing_lda_df = mp_listing_tmp.withColumn("docId", mp_listing_tmp("docId"))
    mp_listing_lda_df.printSchema()

    val tokenizer = new RegexTokenizer()
      .setInputCol("text")
      .setOutputCol("rawTokens")
      .setMinTokenLength(2)

    val stopWordsRemover = new StopWordsRemover()
      .setInputCol("rawTokens")
      .setOutputCol("tokens")

    val vocabSize = 4000

    val countVectorizer = new CountVectorizer()
      .setVocabSize(vocabSize)
      .setInputCol("tokens")
      .setOutputCol("features")

    val PreLDApipeline = new Pipeline()
      .setStages(Array(tokenizer, stopWordsRemover, countVectorizer))

    val PreLDAmodel = PreLDApipeline.fit(mp_listing_lda_df)
    //comment out after saving it the first time
    PreLDAmodel.write.overwrite().save(pre_lda_model_file_path)

    val documents = PreLDAmodel.transform(mp_listing_lda_df)
      .select("docId","features")
      .rdd
      .map{ case Row(row_num: Long, features: MLVector) => (row_num, features) }
      .toDF()

    //documents.printSchema()
    val numTopics: Int = 20
    val maxIterations: Int = 100

    //note the FeaturesCol need to be set
    val lda = new LDA()
      .setOptimizer("em")
      .setK(numTopics)
      .setMaxIter(maxIterations)
      .setFeaturesCol(("_2"))

    val vocabArray = PreLDAmodel.stages(2).asInstanceOf[CountVectorizerModel].vocabulary
  }
}

我认为这与代码导入部分的冲突有关。感谢任何帮助。


需要做两件事:

导入隐式:请注意,这应该仅在实例之后完成org.apache.spark.sql.SQLContext被建造。应该写成:

val sqlContext= new org.apache.spark.sql.SQLContext(sc)

import sqlContext.implicits._

将案例类移到方法之外:案例类(您可以使用它来定义 DataFrame 的架构)应该在需要它的方法之外定义。你可以在这里读更多关于它的内容:https://issues.scala-lang.org/browse/SI-6649 https://issues.scala-lang.org/browse/SI-6649

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

toDF 的值不是 org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] 的成员 的相关文章

  • Spark 上的 Hive 2.1.1 - 我应该使用哪个版本的 Spark

    我在跑蜂巢2 1 1 Ubuntu 16 04 上的 hadoop 2 7 3 根据Hive on Spark 入门 https cwiki apache org confluence display Hive Hive on Spark
  • 如何在 Spark 中创建空数据帧

    我有一组基于 Avro 的配置单元表 我需要从中读取数据 由于Spark SQL使用hive serdes从HDFS读取数据 因此比直接读取HDFS慢很多 因此 我使用数据块 Spark Avro jar 从底层 HDFS 目录读取 Avr
  • 计算行的排名

    我想根据一个字段对用户 ID 进行排名 对于相同的字段值 排名应该相同 该数据位于 Hive 表中 e g user value a 5 b 10 c 5 d 6 Rank a 1 c 1 d 3 b 4 我怎样才能做到这一点 可以使用ra
  • 将 Spark 数据框中的时间戳转换为日期

    我见过 这里 如何将DataFrame中的时间戳转换为日期格式 https stackoverflow com questions 40656001 how to convert timestamp to date format in da
  • 获取 emr-ddb-hadoop.jar 将 DynamoDB 与 EMR Spark 连接

    我有一个 DynamoDB 表 需要将其连接到 EMR Spark SQL 才能对该表运行查询 我获得了带有发行标签 emr 4 6 0 和 Spark 1 6 1 的 EMR Spark Cluster 我指的是文档 使用 Spark 分
  • Spark 执行器 STDOUT 到 Kubernetes STDOUT

    我在 Spark Worker 中运行的 Spark 应用程序将执行程序日志输出到特定文件路径 worker home directory app xxxxxxxx 0 stdout I used log4j properties将日志从
  • 对于“迭代算法”,转换为 RDD 然后再转换回 Dataframe 有什么优势

    我在读高性能火花作者提出以下主张 虽然 Catalyst 优化器非常强大 但它目前遇到挑战的情况之一是非常大的查询计划 这些查询计划往往是迭代算法的结果 例如图算法或机器学习算法 一个简单的解决方法是将数据转换为 RDD 并在每次迭代结束时
  • 无法在 SBT 中运行 Apache Spark 相关单元测试 - NoClassDefFoundError

    我有一个简单的单元测试 使用SparkContext 我可以在 IntelliJ Idea 中运行单元测试 没有任何问题 但是 当尝试从 SBT shell 运行相同的测试时 我收到以下错误 java lang NoClassDefFoun
  • ';'预期但发现“导入” - Scala 和 Spark

    我正在尝试使用 Spark 和 Scala 来编译一个独立的应用程序 我不知道为什么会收到此错误 topicModel scala 2 expected but import found error import org apache sp
  • 如何从字符串列中提取数字?

    我的要求是从列中的评论列中检索订单号comment并且总是开始于R 订单号应作为新列添加到表中 输入数据 code id mode location status comment AS SD 101 Airways hyderabad D
  • 如何根据 Pyspark 中另一列的表达式评估有条件地替换列中的值?

    import numpy as np df spark createDataFrame 1 1 None 1 2 float 5 1 3 np nan 1 4 None 0 5 float 10 1 6 float nan 0 6 floa
  • Spark日期格式问题

    我在火花日期格式中观察到奇怪的行为 实际上我需要转换日期yy to yyyy 日期转换后 日期应为 20yy 我尝试过如下 2040年后失败 import org apache spark sql functions val df Seq
  • 更改 Spark SQL 中的 Null 顺序

    我需要能够按升序和降序对列进行排序 并且还允许空值位于第一个或空值位于最后一个 使用 RDD 我可以将 sortByKey 方法与自定义比较器结合使用 我想知道是否有使用 Dataset API 的相应方法 我了解如何将 desc asc
  • HashPartitioner 是如何工作的?

    我阅读了文档HashPartitioner http spark apache org docs 1 3 1 api java index html org apache spark HashPartitioner html 不幸的是 除了
  • 带有安全 Kafka 抛出的 Spark 结构化流:无权访问组异常

    为了在我的项目中使用结构化流 我正在 hortonworks 2 6 3 环境上测试 Spark 2 2 0 和 Kafka 0 10 1 与 Kerberos 的集成 我正在运行下面的示例代码来检查集成 我能够在 Spark 本地模式下的
  • 如何使用 Spark 2 屏蔽列?

    我有一些表 我需要屏蔽其中的一些列 要屏蔽的列因表而异 我正在读取这些列application conf file 例如 对于员工表如下所示 id name age address 1 abcd 21 India 2 qazx 42 Ger
  • Spark:如何使用crossJoin

    我有两个数据框 df1有 100000 行并且df2有 10000 行 我想创建一个df3这是两者的交叉连接 val df3 df1 crossJoin df2 这将产生 10 亿行 尝试在本地运行它 但似乎需要很长时间 您认为本地可以实现
  • 如何设置SPARK_HOME变量?

    按照链接中的气泡水步骤进行操作http h2o release s3 amazonaws com sparkling water rel 2 2 0 index html http h2o release s3 amazonaws com
  • Scala Spark 包含与不包含

    我可以使用 contains 过滤 RDD 中的元组 如下所示 但是使用 不包含 来过滤 RDD 又如何呢 val rdd2 rdd1 filter x gt x 1 contains 我找不到这个的语法 假设这是可能的并且我没有使用Dat
  • 根据 pyspark 中的条件从数据框中删除行

    我有一个包含两列的数据框 col1 col2 22 12 2 1 2 1 5 52 1 2 62 9 77 33 3 我想创建一个新的数据框 它只需要行 col1 的值 gt col2 的值 就像注释一样col1 很长类型和col2 有双

随机推荐