从 pySpark SQL 写入远程 mysql 数据库 (JDBC) 获取新行 id

2024-01-23

我正在使用 pyspark-sql 使用 JDBC 在远程 mysql 数据库中创建行。

我有两张桌子,parent_table(id, value) and child_table(id, value, parent_id),所以每一行parent_id可能有尽可能多的行child_id根据需要与其关联。

现在我想创建一些新数据并将其插入数据库。我正在使用代码指南here https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases为了write操作,但我希望能够执行以下操作:

parentDf = sc.parallelize([5, 6, 7]).toDF(('value',))
parentWithIdDf = parentDf.write.mode('append') \
                         .format("jdbc") \
                         .option("url", "jdbc:mysql://" + host_name + "/"
                            + db_name).option("dbtable", table_name) \
                         .option("user", user_name).option("password", password_str) \
                         .save()
# The assignment at the previous line is wrong, as pyspark.sql.DataFrameWriter#save doesn't return anything.

我想要一种方法让上面的最后一行代码返回一个 DataFrame,其中每行都有新的行 id,这样我就可以这样做

childDf = parentWithIdDf.flatMap(lambda x: [[8, x[0]], [9, x[0]]])
childDf.write.mode('append')...

这意味着最后我会在我的远程数据库中

parent_table
 ____________
| id | value |
 ____________
| 1  |   5   |
| 2  |   6   |
| 3  |   7   |
 ____________

child_table
 ________________________
| id | value | parent_id |
 ________________________
| 1  |   8   |    1      |
| 2  |   9   |    1      |
| 3  |   8   |    2      |
| 4  |   9   |    2      |
| 5  |   8   |    3      |
| 6  |   9   |    3      |
 ________________________ 

正如我在上面的第一个代码片段中所写的,pyspark.sql.DataFrameWriter#save不返回任何内容,正在查看它的文档 http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter,那么我怎样才能实现这一目标呢?

我做错了什么吗?看起来没有办法从 Spark 的操作中获取数据(save是),而我想用这个行动作为一种转变,这让我觉得我可能以错误的方式思考这一切。


一个简单的答案是使用时间戳+自增数来创建唯一的ID。仅当某个时间只有一台服务器正在运行时,此方法才有效。 :)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从 pySpark SQL 写入远程 mysql 数据库 (JDBC) 获取新行 id 的相关文章

随机推荐

  • 如何在 PostgreSQL ORDER BY 子句中使用 ALIAS?

    我有以下查询 SELECT title stock one stock two AS global stock FROM product ORDER BY global stock 0 title 在 PostgreSQL 8 1 23 中
  • 根据字符串匹配过滤字符串向量

    我有以下向量 X lt c mama log papa log mimo png mentor log 如何检索另一个仅包含以 m 开头并以 log 结尾的元素的向量 您可以使用grepl用正则表达式 X grepl m log X
  • 将 AudioBufferList 转换为 CMSampleBuffer 会产生意外结果

    我正在尝试转换AudioBufferList我从音频单元得到的CMSampleBuffer我可以传递到AVAssetWriter保存麦克风中的音频 这种转换有效 因为我为执行转换而进行的调用不会失败 但记录最终会失败 并且我在日志中看到一些
  • 为什么我的 MVC 应用程序中有两个 web.config 文件

    Views 文件夹中有一个 应用程序的根目录中还有另一个 我想注册一个自定义处理程序 但我不明白代码应该放在哪里 我在集成模式下运行 IIS7 所以我必须添加
  • C++11 中的线程池

    相关问题 关于 C 11 C 11 std 线程池 https stackoverflow com questions 12993451 c11 stdthread pooled C 11 中的 async launch async 是否会
  • NSScrollView 具有未剪辑的内容视图?

    有没有办法设置我的滚动视图不剪辑其内容 这是一个NSTextView 我有 NSScrollView 的子类并想要它的内容not被剪裁到其边界 我尝试过重写 BOOL wantsDefaultClipping return NO in My
  • 如何让 Flask-SQLAlchemy 在出现异常时自动回滚会话?

    我想设置一个使用以下构建的应用程序Flask SQLAlchemy如果视图引发在视图代码外部冒泡的异常 即未在内部捕获 则回滚对数据库所做的所有更改 我希望它能够工作 即使某些对象在子事务中自动或直接通过session commit 类似的
  • 在keras中构建多输出模型时出错

    我正在尝试在 Keras 中创建一个多输出模型 该模型从卷积开始 旨在叠加两个独立密集层的结果 我为回归任务创建了一些随机数据 其中x1是输入和df是标签 这df包含三列 定义训练和测试拆分并形成模型后 我在拟合模型时收到错误 谁能帮我纠正
  • 如何在 Android 中的搜索输入字段旁边添加下拉菜单?

    在我的 HTC Desire Froyo 上的系统范围搜索中 我看到搜索输入字段左侧有一个小下拉菜单 允许选择我要搜索的位置 全部 网络 应用程序 我怎样才能实现这个在我的一个应用程序中 Google 开发者网站上的搜索教程没有解决这个问题
  • 配置 MapReduce 作业时使用多个 InputFormat 类

    我想编写一个可以处理文本和 zip 文件的 MapReduce 应用程序 为此 我想使用不同的输入格式 一种用于文本 另一种用于 zip 可以这样做吗 从 ChrisWhite的答案延伸一点 你需要的是使用自定义InputFormat an
  • 如何修复 Xcode“DTAssetProviderService 无法启动..”错误?

    我已经升级了 macOS Sierra Developer Preview 但是我的Xcode 7 3 1尝试在模拟器上运行我的项目时出现以下错误 此外 Generic to archieve 还给出了另一个错误 例如 此外 对于存档 发生
  • 如何将此 SQL 查询转换为 LINQ 或 Lambda 表达式?

    我有以下 SQL 查询 SELECT C ID C Name FROM Category C JOIN Layout L ON C ID L CategoryID JOIN Position P ON L PositionID LIKE C
  • 无法使用 Desktop Docker 设置通过节点端口访问 Kubernetes 服务

    我在 Windows 10 上使用 Docker Desktop 我生成 kubernetes NodePort 服务以从客户端 Web 浏览器访问 http 10 110 201 24 30008 hello praveen http 1
  • 停留在基本的 Linq to XML 查询上

    我正在尝试从 namecheap 沙箱 api 中提取信息 但无法弄清楚为什么我的 linq 查询不起作用 这是一个示例响应 XML
  • php 中的文件锁定

    我有一个新人 隔壁的少年 编写了一些 php 代码来跟踪我网站上的一些使用情况 我不熟悉 php 所以我想问一些关于并发文件访问的问题 我的本机应用程序 在 Windows 上 偶尔会通过点击包含我的 php 脚本的 URL 来将一些数据记
  • 从地图外部将对象拖放到 Google 地图中:标记未放置在正确的纬度/经度处

    我想从地图外部将一个对象拖到我的 Google 地图 API V3 中 经过一番研究 我发现这个非常有帮助的帖子 https stackoverflow com a 5921814 1866810我尝试将其适应我的项目 主要思想是在地图上拖
  • MVC路由问题

    我想按如下方式设置路由 Profile Edit gt 编辑操作的路由 Profile Add gt 添加操作的路由 Profile username gt 使用参数 username 路由到 Index 操作 因为操作用户名不存在 所以我
  • EventBus 和 RxJava 有什么区别? [复制]

    这个问题在这里已经有答案了 我对 android 中的 EventBus 和 RxJava 之间的区别感到困惑 我需要实现其中之一来解决我的问题 即在完成某些更改后通知某些组件 以便它们可以更新其状态 另外 我读到 EventsBus 已因
  • Java 禁用 dpi 感知不起作用

    我正在尝试运行 Java 应用程序 Dsun java2d dpiaware false争论但什么也没发生 我希望有一个模糊的用户界面 但对于正常大小的图标和字体 这个标志似乎不起作用 我在 Windows 8 1 上使用 JDK 1 8
  • 从 pySpark SQL 写入远程 mysql 数据库 (JDBC) 获取新行 id

    我正在使用 pyspark sql 使用 JDBC 在远程 mysql 数据库中创建行 我有两张桌子 parent table id value and child table id value parent id 所以每一行parent