从 apache Spark 读取/写入 dynamo 数据库 [关闭]

2024-04-24

我想知道是否有任何 Java 库支持从 apache Spark(Mesos) 读取/写入 dynamo db (AWS),我知道根据本文有一些库支持 EMR Sparkhttps://aws.amazon.com/blogs/big-data/analyze-your-data-on-amazon-dynamodb-with-apache-spark/ https://aws.amazon.com/blogs/big-data/analyze-your-data-on-amazon-dynamodb-with-apache-spark/。请指教。

谢谢 普拉迪普


您可以使用以下命令从 DynamoDB 表中读取项目或将项目写入其中阿帕奇火花 and emr-dynamodb-连接器图书馆。要读取数据,您可以使用javaSparkContext.hadoopRDD(jobConf, DynamoDBInputFormat.class, Text.class, DynamoDBItemWritable.class);以及将数据写入 DynamoDB:javaPairRDD.saveAsHadoopDataset(jobConf);。以下是一个示例(适用于 EMR 和非 EMR 环境):

public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf()
            .setAppName("DynamoDBApplication")
            .setMaster("local[4]")
            .registerKryoClasses(new Class<?>[]{
                    Class.forName("org.apache.hadoop.io.Text"),
                    Class.forName("org.apache.hadoop.dynamodb.DynamoDBItemWritable")
            });

    JavaSparkContext sc = new JavaSparkContext(conf);

    JobConf jobConf = getDynamoDbJobConf(sc, "TableNameForRead", "TableNameForWrite");

    // read all items from DynamoDB table with name TableNameForRead
    JavaPairRDD<Text, DynamoDBItemWritable> javaPairRdd = sc.hadoopRDD(jobConf, DynamoDBInputFormat.class, Text.class, DynamoDBItemWritable.class);
    System.out.println("count: " + javaPairRdd.count());

    // process data in any way, below is just a simple example
    JavaRDD<Map<String, AttributeValue>> javaRDD = javaPairRdd.map(t -> {
        DynamoDBItemWritable item = t._2();
        Map<String, AttributeValue> attrs = item.getItem();
        String hashKey = attrs.get("key").getS();
        Long result = Long.valueOf(attrs.get("resultAttribute").getN());
        System.out.println(String.format("hashKey=%s, result=%d", hashKey, result));
        return attrs;
    });
    System.out.println("count: " + javaRDD.count());

    // update JavaPairRdd in order to store it to DynamoDB, below is just a simple example with updating hashKey
    JavaPairRDD<Text, DynamoDBItemWritable> updatedJavaPairRDD = javaPairRdd.mapToPair(t -> {
        DynamoDBItemWritable item = t._2();
        Map<String, AttributeValue> attrs = item.getItem();
        String hashKey = attrs.get("key").getS();
        String updatedHashKey = hashKey + "_new";
        attrs.get("key").setS(updatedHashKey);
        return new Tuple2<>(t._1(), item);
    });

    // write items to DynamoDB table with name TableNameForWrite
    updatedJavaPairRDD.saveAsHadoopDataset(jobConf);

    sc.stop();
}


private static JobConf getDynamoDbJobConf(JavaSparkContext sc, String tableNameForRead, String tableNameForWrite) {
    final JobConf jobConf = new JobConf(sc.hadoopConfiguration());
    jobConf.set("dynamodb.servicename", "dynamodb");

    jobConf.set("dynamodb.input.tableName", tableNameForRead);
    jobConf.set("dynamodb.output.tableName", tableNameForWrite);

    jobConf.set("dynamodb.awsAccessKeyId", "YOUR_AWS_ACCESS_KEY");
    jobConf.set("dynamodb.awsSecretAccessKey", "YOUR_AWS_SECRET_KEY");
    jobConf.set("dynamodb.endpoint", "dynamodb.us-west-1.amazonaws.com");
    jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat");
    jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat");

    return jobConf;
}

要运行此代码,您需要以下 Maven 依赖项:

<dependencies>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-scala_2.10</artifactId>
        <version>2.6.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-emr</artifactId>
        <version>1.11.113</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-dynamodb</artifactId>
        <version>1.11.113</version>
    </dependency>

    <!-- https://github.com/awslabs/emr-dynamodb-connector -->
    <dependency>
        <groupId>com.amazon.emr</groupId>
        <artifactId>emr-dynamodb-hadoop</artifactId>
        <version>4.2.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.8.0</version>
    </dependency>

</dependencies>
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从 apache Spark 读取/写入 dynamo 数据库 [关闭] 的相关文章

  • 如何过滤 pyspark 列表中值的列?

    我有一个数据框原始数据 我必须在 X 列上应用值 CB CI 和 CR 的过滤条件 所以我使用了下面的代码 df dfRawData filter col X between CB CI CR 但我收到以下错误 Between 恰好需要 3
  • 从 pandas udf 记录

    我正在尝试从 python 转换中调用的 pandas udf 进行日志记录 因为在执行器上调用的代码不会显示在驱动程序的日志中 我一直在寻找一些选项 但到目前为止最接近的选项是这个one https stackoverflow com q
  • 在 kubernetes 上安装 PySpark 软件包时出现 Spark-Submit:ivy-cache 文件未找到错误

    我一整天都在与它斗争 我能够安装并使用带有 Spark shell 或连接的 Jupiter 笔记本的包 graphframes 但我想使用 Spark Submit 将其移动到基于 kubernetes 的 Spark 环境 我的火花版
  • 如何将多行标签 xml 文件转换为 dataframe

    我有一个包含多个行标签的 xml 文件 我需要将此 xml 转换为正确的数据帧 我使用了spark xml 它只处理单行标签 xml数据如下
  • 使用 mlib 执行 Spark-Shell,错误:对象 jblas 不是包 org 的成员

    在spark shell中 当我执行import org jblas DoubleMatrix 它会在 RHEL 上抛出 错误 对象 jblas 不是包 org 的成员 实际上 我用谷歌搜索了 jblas 并安装了 gfortran htt
  • 列对象不可调用 Spark

    我尝试安装 Spark 并运行教程中给出的命令 但出现以下错误 https spark apache org docs latest quick start html https spark apache org docs latest q
  • 如何将模型从 ML Pipeline 保存到 S3 或 HDFS?

    我正在尝试保存 ML Pipeline 生成的数千个模型 正如答案中所示here https stackoverflow com questions 32121046 run 3000 random forest models by gro
  • 更新 DynamoDB 中的多条记录

    如何在单个查询中更新 DynamoDB 中的多条记录 我有一个 csv 文件作为基于 csv 文件的输入 我必须更新数据库中的多条记录 只有一个属性 有可用的 API 吗 或者这可以使用批处理 Spring batch 来完成 Dynamo
  • 如何将模型结果保存到文本文件?

    我正在尝试将从模型生成的频繁项集保存到文本文件中 该代码是 Spark ML 库中 FPGrowth 示例的示例 Using saveAsTextFile直接在模型上写入 RDD 位置而不是实际值 import org apache spa
  • Spark:如何使用crossJoin

    我有两个数据框 df1有 100000 行并且df2有 10000 行 我想创建一个df3这是两者的交叉连接 val df3 df1 crossJoin df2 这将产生 10 亿行 尝试在本地运行它 但似乎需要很长时间 您认为本地可以实现
  • 获取 int() 参数必须是字符串或数字,而不是“Column”- Apache Spark

    如果我使用以下代码 我会收到此异常 int argument must be a string or a number not Column df df withColumn FY F when df ID substr 5 2 isin
  • 如何将 Pyspark Dataframe 标题设置到另一行?

    我有一个如下所示的数据框 col1 col2 col3 id name val 1 a01 X 2 a02 Y 我需要从中创建一个新的数据框 使用 row 1 作为新的列标题并忽略或删除 col1 col2 等行 新表应如下所示 id na
  • 将 IndexToString 应用于 Spark 中的特征向量

    Context 我有一个数据框 其中所有分类值都已使用 StringIndexer 进行索引 val categoricalColumns df schema collect case StructField name StringType
  • Scala Spark 包含与不包含

    我可以使用 contains 过滤 RDD 中的元组 如下所示 但是使用 不包含 来过滤 RDD 又如何呢 val rdd2 rdd1 filter x gt x 1 contains 我找不到这个的语法 假设这是可能的并且我没有使用Dat
  • 在 Spark 结构化流 2.3.0 中连接两个流时,左外连接不发出空值

    两个流上的左外连接不发出空输出 它只是等待记录添加到另一个流中 使用套接字流来测试这一点 在我们的例子中 我们想要发出具有 null 值的记录 这些记录与 id 不匹配或 且不属于时间范围条件 水印和间隔的详细信息如下 val ds1Map
  • 使用 Python 计算 Spark 中成对 (K,V) RDD 中每个 KEY 的平均值

    我想与 Python 共享这个特定的 Apache Spark 解决方案 因为它的文档非常贫乏 我想通过 KEY 计算 K V 对 存储在 Pairwise RDD 中 的平均值 示例数据如下所示 gt gt gt rdd1 take 10
  • 在spark-kafka中使用schema将ConsumerRecord值转换为Dataframe

    我正在使用 Spark 2 0 2 和 Kafka 0 11 0 并且 我正在尝试在火花流中使用来自卡夫卡的消息 以下是代码 val topics notes val kafkaParams Map String Object bootst
  • 适用于 Hadoop 的 DynamoDB 输入格式

    我必须使用 Hadoop mapreduce 处理保留在 Amazon Dynamodb 中的一些数据 我在互联网上搜索 Dynamo DB 的 Hadoop InputFormat 但找不到它 我对 Dynamo DB 不熟悉 所以我猜测
  • 火花内存不足

    我有一个文件夹 里面有 150 G 的 txt 文件 大约 700 个文件 平均每个 200 MB 我使用 scala 来处理文件并最终计算一些汇总统计数据 我认为有两种可能的方法可以做到这一点 手动循环所有文件 对每个文件进行计算并最终合
  • pyspark 将 twitter json 流式传输到 DF

    我正在从事集成工作spark streaming with twitter using pythonAPI 我看到的大多数示例或代码片段和博客是他们从Twitter JSON文件进行最终处理 但根据我的用例 我需要所有字段twitter J

随机推荐

  • 每当应用程序运行或不运行时显示通知

    在我的程序中 无论应用程序是否运行 都必须激活通知 我应该将通知方法放在 onCreate 中吗 我的通知就像闹钟一样 请稍微检查一下 public String getCurrentTime Calendar c Calendar get
  • 如何在最新的azure webjob 3.03中指定AzureWebJobsStorage

    我将旧的 azure webjob 代码更新为打包到 3 03 然后它就不起作用了 我设法修复了所有编译时错误 但在本地运行时 它会抛出以下错误 Microsoft Azure WebJobs Host Indexers FunctionI
  • 带有自定义 json 数据的 JsTree

    我在 json 中有这个结构 无法根据请求进行修改 Object url http www google com id 1 name Redirection Rule Object frequency 1 trigger 1 Object
  • 如何使用 Express 在 NodeJS 中的 GET 请求中发出 GET 请求

    基本上 我试图在回调 GET 方法中从 Facebook 获取访问令牌 下面是我的代码 getAccessToken根本没有被调用 正确的实施方法是什么 app get fbcallback function req res var cod
  • React - setState 不更新值

    我正在尝试使用 DidMount 中的 localStorage 值更新状态 但它没有更新 type Props type State id evaluation string class Evaluation extends Compon
  • 在c#中查找编译类的源文件

    我正在寻找一组已编译的 net 程序集中特定类的关联源文件 e g MyAsm Namespace Foo gt C Source foo cs MyAsm Namespace Bar gt C Source Code MoreCode C
  • 如何使用 Google Apps 脚本限制文件的复制/下载/打印访问

    有没有人找到一种方法来限制使用谷歌应用程序脚本复制 下载 打印电子表格的访问权限 背景信息 我创建了一个使用 setShareableByEditors false 限制编辑者共享权限的脚本 唯一的问题是编辑者仍然可以轻松地复制电子表格 然
  • Rails 4 link_to 更大的静态图像

    我的文件存储在app assets images subdirectory image png and app assets images subdirectory image full png In my app views home h
  • lseek() 的复杂度是 O(1) 吗?

    我知道我的问题在这里有答案 QFile 寻道性能 https stackoverflow com questions 6171403 qfile seek performance 但我对这个答案并不完全满意 即使在查看了以下实现之后gene
  • 是否可以将jsp预编译到eclipse中?

    标题很简单 我想知道是否有可能直接在eclipse中看到编译好的jsp 生成的servlet 无需部署到任何服务器上 如果您使用 JSP 我建议购买我的Eclipse http www myeclipseide com 因为它可以编译 JS
  • 显示表格单元格不一致。

    嘿 我想知道为什么会发生这种情况 http jsfiddle net dSVGF http jsfiddle net dSVGF 按钮尚未填充容器 锚确实如此 有什么本质上的不同 两个标签之间的风格 div class table a hr
  • 如何在 SQL 中替换 PIVOT 中的 Null 值

    我有以下代码 我试图用零替换使用枢轴时出现的 Null 我执行了以下操作 但它说 ISNULL 附近的语法不正确 我不确定我做错了什么 有什么建议请 select from tempfinaltable pivot ISNULL sum T
  • 无法更新 android studio 3.1:配置冲突:同步项目期间“armeabi-v7a,x86”

    这是我的构建 gradle 应用程序 文件 apply plugin com android application apply plugin io fabric apply plugin checkstyle def versions a
  • Angular2 ngNoForm 还可以进行角度形式验证

    我有一个遗留后端服务器 它将表单数据作为请求参数进行处理 我们将 angular2 放在前端 我想提交 angular2 表单 以便所有字段都作为请求参数 这样就不必更改旧后端 为此 我有
  • 捆绑安装不起作用

    我正在 Windows 上开发 Ruby on Rails 我们的本地网络出现问题 无法访问https www rubygems org https www rubygems org 好像被屏蔽了什么的 但我可以通过访问它http www
  • Ruby on Rails 3:Devise::LdapAdapter.get_ldap_param 未定义方法错误

    我在跑步 红宝石 1 9 3p0 轨道 3 1 1 设计1 4 9 Devise ldap authenticatable 0 4 10 我正在使用 Devise 通过 LDAP 服务器验证我的 Rails 应用程序 我使用用户名而不是电子
  • Is Type 和 Is Type(object, object) 抛出 TypeException

    我试图断言方法调用返回的对象属于以下类型List
  • EC2 t2.medium 可爆发信用“储蓄”计算

    我正在使用 T2 medium 实例 一天的三分之一的时间我都在做密集的统计计算 并计算出剩下的 2 3 的时间我将以每小时 24 小时的速度 赚取 学分 但这并没有发生 这是我这两天的使用情况 这是我的信用账户 直到昨天下午 6 点我已经
  • 在 Ruby 中模拟 int64 溢出

    我是一名资深程序员 但对 Ruby 还很陌生 我正在尝试移植一种名为 CheckRevision 的算法 用于在登录 Battle net 的在线游戏服务之前检查游戏文件的完整性 该算法使用给定的公式对文件进行 哈希 没有无聊的细节 而是不
  • 从 apache Spark 读取/写入 dynamo 数据库 [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我想知道是否有任何 Java 库支持从 apache Spark Mesos 读取 写入 dynamo