Spark 中的潜在狄利克雷分配 (LDA) - 复制模型

2024-01-31

我想从 pyspark ml-clustering 包中保存 LDA 模型,并在保存后将该模型应用于训练和测试数据集。然而,尽管设定了种子,结果还是存在差异。我的代码如下:

1)导入包

from pyspark.ml.clustering import LocalLDAModel, DistributedLDAModel
from pyspark.ml.feature import CountVectorizer , IDF

2)准备数据集

countVectors = CountVectorizer(inputCol="requester_instruction_words_filtered_complete", outputCol="raw_features", vocabSize=5000, minDF=10.0)
cv_model = countVectors.fit(tokenized_stopwords_sample_df)
result_tf = cv_model.transform(tokenized_stopwords_sample_df)
vocabArray = cv_model.vocabulary
idf = IDF(inputCol="raw_features", outputCol="features")
idfModel = idf.fit(result_tf)
result_tfidf = idfModel.transform(result_tf)
result_tfidf = result_tfidf.withColumn("id", monotonically_increasing_id())    
corpus = result_tfidf.select("id", "features")

3)训练LDA模型

lda = LDA(k=number_of_topics, maxIter=100, docConcentration = [alpha], topicConcentration = beta, seed = 123)
model = lda.fit(corpus)
model.save("LDA_model_saved")
topics = model.describeTopics(words_in_topic)  
topics_rdd = topics.rdd
modelled_corpus = model.transform(corpus)

4)复制模型

#Prepare the data set
countVectors = CountVectorizer(inputCol="requester_instruction_words_filtered_complete", outputCol="raw_features", vocabSize=5000, minDF=10.0)
cv_model = countVectors.fit(tokenized_stopwords_sample_df)
result_tf = cv_model.transform(tokenized_stopwords_sample_df)
vocabArray = cv_model.vocabulary
idf = IDF(inputCol="raw_features", outputCol="features")
idfModel = idf.fit(result_tf)
result_tfidf = idfModel.transform(result_tf)   
result_tfidf = result_tfidf.withColumn("id", monotonically_increasing_id())
corpus_new = result_tfidf.select("id", "features")

#Load the model to apply to new corpus
newModel = LocalLDAModel.load("LDA_model_saved")
topics_new = newModel.describeTopics(words_in_topic)  
topics_rdd_new = topics_new.rdd
modelled_corpus_new = newModel.transform(corpus_new)

尽管我假设是相同的,但以下结果是不同的:topics_rdd != topics_rdd_new and modelled_corpus != modelled_corpus_new(此外,在检查提取的主题时,它们以及数据集上的预测类别都是不同的)

所以我觉得很奇怪,即使我在模型生成中设置了种子,同一模型在同一数据集上预测不同的类(“主题”)。有复制 LDA 模型经验的人可以提供帮助吗?

谢谢 :)


我在 PYSPARK 中实现 LDA 时遇到了类似的问题。尽管我使用了种子,但每次我在具有相同参数的相同数据上重新运行代码时,结果都是不同的。

在尝试了多种方法后,我想出了以下解决方案:

  1. Saved cv_model运行一次并在下一次迭代中加载它之后,而不是重新安装它。

  2. 这和我的数据集比较相关。我使用的语料库中的一些文档的大小非常小(每个文档大约 3 个单词)。我过滤掉了这些文档并设置了限制,这样只有那些至少包含 15 个单词的文档才会包含在语料库中(您的语料库可能更高)。我不确定为什么这个有效,可能与强调模型复杂性的某些事情有关。

总而言之,即使经过几次迭代,我的结果也是相同的。希望这可以帮助。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 中的潜在狄利克雷分配 (LDA) - 复制模型 的相关文章

  • 如何将 DataFrame 作为输入传递给 Spark UDF?

    我有一个数据框 我想对每一行应用一个函数 该函数依赖于其他数据帧 简化的例子 我有如下三个数据框 df sc parallelize a b 1 c d 3 toDF feat1 feat2 value df other 1 sc para
  • Spark中如何获取map任务的ID?

    Spark中有没有办法获取map任务的ID 例如 如果每个映射任务都调用用户定义的函数 我可以从该用户定义的函数中获取该映射任务的 ID 吗 我不确定您所说的地图任务 ID 是什么意思 但您可以使用以下方式访问任务信息TaskContext
  • 带有 pySpark 的 GraphFrames

    我想将 GraphFrames 与 PySpark 一起使用 目前在 Google Dataproc 上使用 Spark v2 3 3 安装 GraphFrames 后 pip install graphframes 我尝试运行以下代码 f
  • 如何并行运行多个Spark作业?

    一个 Spark 有一个 Oracle 查询 所以我必须并行运行多个作业 以便所有查询同时触发 如何并行运行多个作业 引用官方文档作业调度 http spark apache org docs latest job scheduling h
  • 如何传递架构以从现有数据帧创建新数据帧?

    要将 schema 传递到 json 文件 我们这样做 from pyspark sql types import StructField StringType StructType IntegerType data schema Stru
  • 使用空/空字段值创建新的数据框

    我正在从现有数据帧创建一个新数据帧 但需要在这个新 DF 中添加新列 下面代码中的 field1 我该怎么做 工作示例代码示例将不胜感激 val edwDf omniDataFrame withColumn field1 callUDF v
  • pyspark中的函数input()

    我的问题是当我输入 p 的值时 没有任何反应 它不执行 请问有办法修复它吗 import sys from pyspark import SparkContext sc SparkContext local simple App p inp
  • Pyspark 数据框逐行空列列表

    我有一个 Spark 数据框 我想创建一个新列 其中包含每行中具有 null 的列名称 例如 原始数据框是 col 1 col 2 col 3 62 45 null 62 49 56 45 null null null null null
  • Scala:如何获取数据框中的行范围

    我有一个DataFrame通过运行创建sqlContext readParquet 文件的一个 The DataFrame由 300 M 行组成 我需要使用这些行作为另一个函数的输入 但我想以较小的批次进行操作 以防止 OOM 错误 目前
  • K均值||用于 Spark 上的情感分析

    我正在尝试编写基于Spark的情感分析程序 为此 我使用了 word2vec 和 KMeans 聚类 从 word2Vec 我在 100 维空间中得到了 20k 个单词 向量集合 现在我正在尝试对这个向量空间进行聚类 当我使用默认并行实现运
  • Python / Pyspark - 计数 NULL、空和 NaN

    我想计算列中的 NULL 空和 NaN 值 我尝试过这样的 df filter df ID df ID isNull df ID isnan count 但我总是收到此错误消息 TypeError Column object is not
  • Spark Driver 内存和 Application Master 内存

    我是否正确理解客户端模式的文档 客户端模式与驱动程序在应用程序主机中运行的集群模式相反 在客户端模式下 驱动程序和应用程序主机是单独的进程 因此spark driver memory spark yarn am memory一定小于机器内存
  • 用于在 pyspark 中处理大数的数据类型

    我将 Spark 与 python 一起使用 上传 csv 文件后 我需要解析 csv 文件中的一列 其中包含 22 位数字长的数字 为了解析我使用的列长类型 我使用 map 函数来定义列 以下是我在 pyspark 中的命令 gt gt
  • 为什么 Apache Spark 会读取嵌套结构中不必要的 Parquet 列?

    我的团队正在构建一个 ETL 流程 以使用 Spark 将原始分隔文本文件加载到基于 Parquet 的 数据湖 中 Parquet 列存储的承诺之一是查询将仅读取必要的 列条带 但我们看到意外的列被读取以获取嵌套模式结构 为了进行演示 下
  • Scala Spark:将数据框中的双列转换为日期时间列

    我正在尝试编写代码来将日期时间列 date 和 last updated date 转换为 mm dd yyyy 格式以进行显示 它们实际上是 unix 时间转换为双精度数 我该怎么做呢 import org joda time impor
  • Spark UDF 错误 - 不支持 Any 类型的架构

    我正在尝试创建一个 udf 它将列中的负值替换为 0 我的数据框名为 df 包含一列名为 avg x 这是我创建 udf 的代码 val noNegative udf avg acc x Double gt if avg acc x lt
  • 如果 Spark 中的数据帧是不可变的,为什么我们能够使用 withColumn() 等操作来修改它?

    这可能是一个愚蠢的问题 源于我的无知 我已经在 PySpark 上工作了几个星期 并没有太多的编程经验 我的理解是 在 Spark 中 RDD 数据帧和数据集都是不可变的 我再次理解 这意味着您无法更改数据 如果是这样 为什么我们能够使用编
  • Spark中DataFrame、Dataset、RDD的区别

    我只是想知道有什么区别RDD and DataFrame Spark 2 0 0 DataFrame 只是一个类型别名Dataset Row 在阿帕奇火花 你能将其中一种转换为另一种吗 首先是DataFrame是从SchemaRDD 是的
  • Zeppelin:如何在 zeppelin 中重新启动 SparkContext

    我正在使用 zeppelins Spark 解释器的隔离模式 在这种模式下 它将为 Spark 集群中的每个笔记本启动一项新工作 我想在笔记本执行完成后通过 zeppelin 终止该作业 为此我做了sc stop这停止了 sparkCont
  • 使用 Scala 在 Apache Spark 中拆分字符串

    我有一个数据集 其中包含以下格式的行 制表符分隔 Title lt t gt Text 现在对于每个单词Text 我想创建一个 Word Title 一对 例如 ABC Hello World gives me Hello ABC Worl

随机推荐