Spark 中简单的 RDD 写入 DynamoDB

2024-02-16

刚刚在尝试将基本 RDD 数据集导入 DynamoDB 时陷入困境。这是代码:

import org.apache.hadoop.mapred.JobConf

var rdd = sc.parallelize(Array(("", Map("col1" -> Map("s" -> "abc"), "col2" -> Map("n" -> "123")))))

var jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("dynamodb.output.tableName", "table_x")
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")

rdd.saveAsHadoopDataset(jobConf)

这是我得到的错误:

16/02/28 15:40:38 WARN TaskSetManager: Lost task 7.0 in stage 1.0 (TID 18, ip-172-31-9-224.eu-west-1.compute.internal): java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.hadoop.io.Text
at org.apache.hadoop.dynamodb.write.DefaultDynamoDBRecordWriter.convertValueToDynamoDBItem(DefaultDynamoDBRecordWriter.java:10)
at org.apache.hadoop.dynamodb.write.AbstractDynamoDBRecordWriter.write(AbstractDynamoDBRecordWriter.java:90)
at org.apache.spark.SparkHadoopWriter.write(SparkHadoopWriter.scala:96)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1199)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1197)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1197)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1205)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1185)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

我可以做什么来解决这个问题?


您需要将对象转换为文本对象。

我建议你看看这里:

https://aws.amazon.com/blogs/big-data/using-spark-sql-for-etl/ https://aws.amazon.com/blogs/big-data/using-spark-sql-for-etl/

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 中简单的 RDD 写入 DynamoDB 的相关文章

  • Scala Sparkcollect_list() 与 array()

    有什么区别collect list and array 在 Spark 中使用 scala 我看到到处都有使用情况 但我不清楚用例来确定差异 尽管两者array https spark apache org docs latest api
  • 在 Spark 中将流式 XML 转换为 JSON

    我是 Spark 新手 正在开发一个简单的应用程序 将从 Kafka 接收的 XML 流转换为 JSON 格式 Using 火花2 4 5 斯卡拉 2 11 12 在我的用例中 kafka 流采用 xml 格式 以下是我尝试过的代码 val
  • 如何检查SparkContext是否已停止?

    如何检测是否SparkContext http spark apache org docs latest programming guide html已经stopped https spark apache org docs latest
  • 是否可以使用 Java 读写 Parquet,而不依赖 Hadoop 和 HDFS?

    我一直在寻找这个问题的解决方案 在我看来 如果不引入对 HDFS 和 Hadoop 的依赖 就无法在 Java 程序中嵌入读写 Parquet 格式 它是否正确 我想在 Hadoop 集群之外的客户端计算机上进行读写 我开始对 Apache
  • Apache Spark 中的高效字符串匹配

    我使用 OCR 工具从屏幕截图中提取文本 每个大约 1 5 句话 然而 当手动验证提取的文本时 我注意到时不时会出现一些错误 鉴于文本 你好 我真的很喜欢 Spark 我注意到 1 像 I 和 l 这样的字母被 替换 2 表情符号未被正确提
  • 在 Databricks / Spark 中的 SQL 中为变量分配动态值

    我觉得我一定在这里遗漏了一些明显的东西 但我似乎无法在 Spark SQL 中动态设置变量值 假设我有两张桌子 tableSrc and tableBuilder 我正在创建tableDest 我一直在尝试变体 SET myVar FLOA
  • 公平调度器和容量调度器有什么区别?

    我是 Hadoop 世界的新手 想了解公平调度程序和容量调度程序之间的区别 另外我们什么时候应该使用每一个 请简单地回答一下 因为我在网上读了很多东西 但从中得到的不多 公平调度是一种为作业分配资源的方法 使得所有作业随着时间的推移平均获得
  • 读取不同文件夹深度的多个 csv 文件

    我想递归地将给定文件夹中的所有 csv 文件读入 Spark SQLDataFrame如果可能的话 使用单一路径 我的文件夹结构如下所示 我想包含具有一个路径的所有文件 resources first csv resources subfo
  • Python 包安装:pip 与 yum,还是两者一起安装?

    我刚刚开始管理 Hadoop 集群 我们使用 Bright Cluster Manager 直至操作系统级别 CentOS 7 1 然后使用 Ambari 以及适用于 Hadoop 的 Hortonworks HDP 2 3 我不断收到安装
  • 来自 Janino 和 Commons-Compiler 的 Spark java.lang.NoSuchMethodError

    我正在构建一个使用 Spark 进行基于随机森林分类的 应用程序 当尝试运行该程序时 我从该行收到异常 StringIndexerModel labelIndexer new StringIndexer setInputCol label
  • 覆盖hadoop中的log4j.properties

    如何覆盖hadoop中的默认log4j properties 如果我设置 hadoop root logger WARN console 它不会在控制台上打印日志 而我想要的是它不应该在日志文件中打印 INFO 我在 jar 中添加了一个
  • 与文件名中的冒号“:”作斗争

    我有以下代码 用于加载大量 csv gz 并将它们转储到其他文件夹中 并将源文件名作为一列 object DailyMerger extends App def allFiles path File List File val parts
  • pyspark。数据框中的 zip 数组

    我有以下 PySpark DataFrame id data 1 10 11 12 2 20 21 22 3 30 31 32 最后 我想要以下 DataFrame id data
  • Spark Python:标准缩放器错误“不支持... SparseVector”

    我又撞到了堵墙 我是一个新手 所以我不得不再次依赖你强大的知识 我从一个数据集开始 如下所示 user account id user lifetime user no outgoing activity in days user acco
  • Spark Dataframe/Parquet 中的枚举等效项

    我有一个包含数亿行的表 我想将其存储在 Spark 的数据帧中并作为 parquet 文件持久保存到磁盘 我的 Parquet 文件的大小现在超过 2TB 我想确保我已经对此进行了优化 这些列中很大一部分是字符串值 它们可能很长 但值通常也
  • 如何通过数据框中数组列的索引计算平均值

    我正在使用 Spark 2 2 我有一个关于合作的基本问题ArrayType 我没有找到可以使用的内置聚合函数 Given a DataFrame有一个柱子id和一列values of ArrayType 我们想按 id 分组 然后按索引计
  • 在 pyspark 中包装 java 函数

    我正在尝试创建一个用户定义的聚合函数 我可以从 python 调用它 我试图遵循答案this https stackoverflow com questions 33233737 spark how to map python with s
  • 如何避免连续“重置偏移量”和“寻找最新偏移量”?

    我正在尝试遵循本指南 https spark apache org docs latest structed streaming kafka integration html https spark apache org docs late
  • Hive NVL 不适用于列的日期类型 - NullpointerException

    我正在使用 HDFS 上的 MapR Hive 发行版并面临以下问题 如果表的列类型是 日期 类型 则NVL https cwiki apache org confluence display Hive LanguageManual UDF
  • 非键属性查询

    看起来 dynamodb 的query方法必须包含分区键作为过滤器的一部分 如果不知道分区键如何进行查询 例如 您有一个具有属性的用户表userid设置为分区键 现在我们想通过电话号码查找用户 是否可以在没有分区键的情况下执行查询 使用sc

随机推荐