我正在使用 pyspark-sql 使用 JDBC 在远程 mysql 数据库中创建行。
我有两张桌子,parent_table(id, value)
and child_table(id, value, parent_id)
,所以每一行parent_id
可能有尽可能多的行child_id
根据需要与其关联。
现在我想创建一些新数据并将其插入数据库。我正在使用代码指南here https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases为了write
操作,但我希望能够执行以下操作:
parentDf = sc.parallelize([5, 6, 7]).toDF(('value',))
parentWithIdDf = parentDf.write.mode('append') \
.format("jdbc") \
.option("url", "jdbc:mysql://" + host_name + "/"
+ db_name).option("dbtable", table_name) \
.option("user", user_name).option("password", password_str) \
.save()
# The assignment at the previous line is wrong, as pyspark.sql.DataFrameWriter#save doesn't return anything.
我想要一种方法让上面的最后一行代码返回一个 DataFrame,其中每行都有新的行 id,这样我就可以这样做
childDf = parentWithIdDf.flatMap(lambda x: [[8, x[0]], [9, x[0]]])
childDf.write.mode('append')...
这意味着最后我会在我的远程数据库中
parent_table
____________
| id | value |
____________
| 1 | 5 |
| 2 | 6 |
| 3 | 7 |
____________
child_table
________________________
| id | value | parent_id |
________________________
| 1 | 8 | 1 |
| 2 | 9 | 1 |
| 3 | 8 | 2 |
| 4 | 9 | 2 |
| 5 | 8 | 3 |
| 6 | 9 | 3 |
________________________
正如我在上面的第一个代码片段中所写的,pyspark.sql.DataFrameWriter#save
不返回任何内容,正在查看它的文档 http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter,那么我怎样才能实现这一目标呢?
我做错了什么吗?看起来没有办法从 Spark 的操作中获取数据(save
是),而我想用这个行动作为一种转变,这让我觉得我可能以错误的方式思考这一切。