将转换后的 DataFrame 保存/导出回 JDBC / MySQL

2024-01-06

我正在尝试弄清楚如何使用新的DataFrameWriter将数据写回 JDBC 数据库。我似乎找不到任何相关文档,尽管查看源代码似乎应该是可能的。

我正在尝试的一个简单示例如下所示:

sqlContext.read.format("jdbc").options(Map(
  "url" -> "jdbc:mysql://localhost/foo", "dbtable" -> "foo.bar")
).select("some_column", "another_column")
.write.format("jdbc").options(Map(
  "url" -> "jdbc:mysql://localhost/foo", "dbtable" -> "foo.bar2")
).save("foo.bar2")

这不起作用——我最终遇到了这个错误:

java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select.
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:200)

我不确定我是否做错了什么(例如为什么它解析为 DefaultSource 而不是 JDBCRDD?)或者是否无法使用 Spark 的 DataFrames API 写入现有的 MySQL 数据库。


Update

当前的 Spark 版本(2.0 或更高版本)支持写入时创建表。

原来的答案

可以写入现有表,但目前 (Spark 1.5.0) 尚不支持使用 JDBC 数据源创建表*。你可以检查SPARK-7646 https://issues.apache.org/jira/browse/SPARK-7646以供参考。

如果表已经存在你可以简单地使用DataFrameWriter.jdbc method:

val prop: java.util.Properties = ???
df.write.jdbc("jdbc:mysql://localhost/foo", "foo.bar2", prop)

* 有趣的是 PySpark 似乎支持使用以下方式创建表jdbc method.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将转换后的 DataFrame 保存/导出回 JDBC / MySQL 的相关文章

  • 如何在 Pyspark 中使用滑动窗口对时间序列数据进行数据转换

    我正在尝试根据时间序列数据的滑动窗口提取特征 在Scala中 似乎有一个sliding函数基于这个帖子 https stackoverflow com a 28863132 3089523 and 文档 http spark apache
  • 无法在 Windows 10 中启动 Spark Master

    我是 Spark 新手 我正在尝试手动启动 master 在 Windows 10 中使用 MINGW64 当我这样做时 Downloads spark 1 5 1 bin hadoop2 4 spark 1 5 1 bin hadoop2
  • PySpark DataFrame 上分组数据的 Pandas 式转换

    如果我们有一个由一列类别和一列值组成的 Pandas 数据框 我们可以通过执行以下操作来删除每个类别中的平均值 df DemeanedValues df groupby Category Values transform lambda g
  • 使用 pyspark awsglue 时显示 DataFrame

    如何使用 awsglue 的 job etl 显示 DataFrame 我尝试了下面的代码 但没有显示任何内容 df show code datasource0 glueContext create dynamic frame from c
  • Spark 数据帧分组、排序和选择一组列的顶部行

    我正在使用 Spark 1 5 0 我有一个包含以下列的 Spark 数据框 user id description fName weight 我想做的是为每个用户选择前 10 行和后 10 行 基于列权重的值 数据类型为 Double 如
  • Spark SQL/Hive 查询通过 Join 永远持续下去

    所以我正在做一些应该很简单的事情 但显然它不在 Spark SQL 中 如果我在 MySQL 中运行以下查询 查询将在不到一秒的时间内完成 SELECT ua address id FROM user u inner join user a
  • 为什么 Spark 退出并显示 exitCode: 16?

    我将 Spark 2 0 0 与 Hadoop 2 7 一起使用 并使用纱线集群模式 每次 我都会收到以下错误 17 01 04 11 18 04 INFO spark SparkContext Successfully stopped S
  • Spark Dataframe 中的分析

    在这个问题中 我们有两个经理 M1 和 M2 在经理 M1 的团队中有两个员工 e1 和 e2 在 M2 的团队中有两个员工 e4 和 e5 以下是经理和员工的层次结构 1 M1 a e1 b e2 2 M2 a e4 b e5 我们有以下
  • 如何从DataFrame中获取最后一行?

    我有一个DataFrame 该DataFrame有两列 value 和 timestamp timestmp 是有序的 我想获取DataFrame的最后一行 我该怎么办 这是我的输入 value timestamp 1 1 4 2 3 3
  • 如何从 Spark 数据框中删除重复项,同时保留最新数据?

    我正在使用 Spark 从 Amazon S3 加载 json 文件 我想根据保留最新数据帧的两列删除重复项 我有时间戳列 最好的方法是什么 请注意 重复项可能分布在多个分区中 我可以在不打乱的情况下删除保留最后一条记录的重复项吗 我正在处
  • 如何在 PySpark 中使用 foreach 或 foreachBatch 写入数据库?

    我想使用 Python PySpark 从 Kafka 源到 MariaDB 进行 Spark 结构化流处理 Spark 2 4 x 我想使用流式 Spark 数据帧 而不是静态数据帧或 Pandas 数据帧 看来必须要用foreach o
  • 在“GROUP BY”子句中重用选择表达式的结果?

    在 MySQL 中 我可以有这样的查询 select cast from unixtime t time Y m d H 00 as datetime as timeHour from some table t group by timeH
  • 如何在 pySpark 数据框中添加行 ID [重复]

    这个问题在这里已经有答案了 我有一个 csv 文件 我在 pyspark 中将其转换为 DataFrame df 经过一番改造后 我想在 df 中添加一列 这应该是简单的行 ID 从 0 或 1 开始到 N 我将 df 转换为 rdd 并使
  • 我们可以在 UDF 中使用关键字参数吗

    我的问题是我们可以像下面那样在 Pyspark 中使用关键字参数和 UDF 吗 conv 方法有一个关键字参数 conv type 默认情况下它被分配给特定类型的格式化程序 但是我想在某些地方指定不同的格式 由于关键字参数 这在 udf 中
  • Spark LDA 困境 - 预测和 OOM 问题

    我正在评估 Spark 1 6 0 来构建大型 数百万个文档 数百万个特征 数千个主题 LDA 模型并进行预测 这是我可以使用 Yahoo 轻松完成的任务 LDA 从小处开始 按照 Java 示例 我使用分布式模型 EM 优化器构建了 10
  • 一起调用distinct和map会在spark库中抛出NPE

    我不确定这是否是一个错误 所以如果你这样做 d spark RDD String d distinct map x gt d filter equals x 您将获得 Java NPE 但是如果你做了一个collect之后立马distinc
  • S3A:失败,而 S3:在 Spark EMR 中工作

    我将 EMR 5 5 0 与 Spark 结合使用 如果我使用一个简单的文件写入 s3s3 网址写得很好 但如果我使用s3a 地址 它失败了Service Amazon S3 Status Code 403 Error Code Acces
  • 在 Spark 中写入 JSON 时保留具有空值的键

    我正在尝试使用 Spark 编写 JSON 文件 有一些键有null作为价值 这些在中显示得很好DataSet 但是当我写入文件时 密钥会丢失 我如何确保它们被保留 写入文件的代码 ddp coalesce 20 write mode ov
  • 如何对 RDD 进行分区

    我有一个文本文件 其中包含大量由空格分隔的随机浮动值 我正在将此文件加载到 scala 中的 RDD 中 这个RDD是如何分区的 另外 是否有任何方法可以生成自定义分区 以便所有分区都具有相同数量的元素以及每个分区的索引 val dRDD
  • 在 Scala 中创建 Java 对象

    我有一个 Java 类 Listings 我在 Java MapReduce 作业中使用它 如下所示 public void map Object key Text value Context context throws IOExcept

随机推荐

  • Domino Designer 中的 javascript 版本?

    在处理基于 Web 的 Domino 表单时 onChange字段 JavaScript 的事件 我无法使用某些语法 因为它会抛出错误并且代码将无法保存 例子有 代替var 我想用const and let 但它不会接受它 此外 当尝试使用
  • 我应该将哪个 .NET Azure 服务总线库用于队列?

    Microsoft 在 NuGet 上有两个 Azure 服务总线包 WindowsAzure ServiceBus https www nuget org packages WindowsAzure ServiceBus Use this
  • addField类型图像和缩略图路径

    我有一个 Magento 网上商店 刚刚创建了一个带有扩展名 Modulecreator 的自定义模块 该模块带有一个标准的管理界面 可以处理文件上传 我发现 如果您想显示缩略图 可以使用 图像 字段类型 addField myfield
  • 如何在 Objective-C 中记录每个被调用的类方法的名称? [复制]

    这个问题在这里已经有答案了 当我想查看对象方法调用的顺序时 我必须像这样记录我实现的每个方法 void updateTime float time NSLog s PRETTY FUNCTION 因此我必须把这段代码放在类的每个方法中 每次
  • 如果我想建立一个自定义数据库,我该怎么做? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我构建了一个以前的程序 它获取客户信息并将其存储在 txt 文件的文件夹中 不太切实际 但现在我想升级该程序以提高效率 并将信息放入某
  • 如何更改 Joomla 管理 URL

    默认情况下 Joomla 管理 URL 是您的站点名称 管理员 我如何为 Joomla 1 5 更改此设置 以便它不会影响我的模块和组件或后端的任何其他内容 我认为这不是一个好主意 因为模块 组件中指向 administrator 的一些链
  • 无法从 Gstreamer 找到 get-launch-1.0

    我在 OSX Mac 上运行 我已经在此处找到的 GStreamer 1 5 1 中安装了各种软件包 http gstreamer freedesktop org data pkg osx http gstreamer freedeskto
  • 为什么这个 rust HashMap 宏不再起作用?

    我以前用过 macro export macro rules map T ident key expr gt value expr gt let mut m T new m insert key value m 要创建对象 如下所示 let
  • 将 EoS 发送到文件接收器,同时从 tee 中删除分支

    我写了一个v4l2src同时显示和记录的代码 我的管道看起来像 queue videosink v4l2src tee queue filesink 目前我可以一起显示 记录 还可以随意动态启动和停止记录分支 使用 ctrl c sigin
  • Linux 内核:设置通过 create_device() 创建的 /dev 文件的权限

    我正在制作一个小型 Linux 模块 它是 char 设备的驱动程序 在我的代码中 我创建了设备类 而不是设备本身 因此 dev 文件是 在我的系统中创建 问题是 dev文件只有root权限 而用户 对该文件既没有读 写也没有执行权限 我想
  • 在 Vagrant 配置期间更新 .bashrc 和环境变量

    我正在使用 Vagrant 设置一个包含 python pip virtualenv virtualenvwrapper 和一些要求的盒子 配置 shell 脚本添加了virtualenvwrapper 所需的行 http virtuale
  • 将 string[][] 与 npgsql 一起使用

    不支持吗 尝试插入命令参数设置为的数据时出现异常 var parameter IDbDataParameter cmd Parameters index var list string value parameter Value list
  • 如何在 Android API 17 之前的 Android 中检测外部显示器

    我正在使用具有受版权保护的视频的应用程序 该视频只能在 Android 设备中运行 而不能在通过 HDMI 或无线连接的外部显示器中运行 我发现了很棒的 AP I推介会 http developer android com referenc
  • 为什么变量“i”和“j”用于计数器?

    Locked 这个问题及其答案是locked help locked posts因为这个问题是题外话 但却具有历史意义 目前不接受新的答案或互动 我知道这似乎是一个绝对愚蠢的问题 但我很好奇 不能不问 为什么 i 和 j 成为大多数控制结构
  • 如何在Android中没有SQL数据库的情况下显示在线数据以供离线使用?

    我想要节目3fragments in my Activity并从中加载数据json in any fragments 我将网站数据显示到Recyclerview with OkHTTP v3图书馆 我想离线显示此数据 我的意思是如果用户关闭
  • 上下文切换线程等待

    我已经寻找这个问题的答案一天了 但找不到直接的答案 我正在阅读上下文切换等待队列之类的内容 确实很好地掌握了所有内容 在阅读一篇文章时 写到当发生车队情况时 将会有大量的上下文切换 那么让我直接说一下 假设一个线程处于等待互斥体解锁的队列中
  • 类型错误:引导日期选择器中未定义日期

    我正在使用引导日期选择器http www eyecon ro bootstrap datepicker http www eyecon ro bootstrap datepicker 但 firebug 显示 bootstrap datep
  • 在 SQLPLUS 中正确格式化表

    在这被投票为重复问题之前 我在 S O 上花了相当多的时间 试图解决这个问题 正如您在下面的屏幕截图中看到的 我的表格看起来很乱 对于第一个和第二个表 您可以看到每个表有 2 组列标题 那么 例如 我怎样才能将所有 5 条客户记录包含在一张
  • 如何将任何 mp3 文件转换为 .wav 16khz 单声道 16 位

    请帮助选择将任何 mp3 文件转换为特殊 wav 的解决方案 我是 Linux 命令行工具的新手 所以现在对我来说很难 我需要从任何 mp3 文件中获取具有 16khz 单声道 16 位声音属性的 wav 我正在尝试 ffmpeg i 11
  • 将转换后的 DataFrame 保存/导出回 JDBC / MySQL

    我正在尝试弄清楚如何使用新的DataFrameWriter将数据写回 JDBC 数据库 我似乎找不到任何相关文档 尽管查看源代码似乎应该是可能的 我正在尝试的一个简单示例如下所示 sqlContext read format jdbc op