在您的解决方案中,Spark 会在开始写入之前将整个表内容读入一个分区。可以避免这种情况的一种方法是对读取部分进行分区,但它需要源数据中的数字连续列:
Dataset<Row> ds = spark.read()
.format("jdbc")
.option("url", url)
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "bigdatatable")
.option("user", "root")
.option("password", "foobar")
.option("partitionColumn", "NUMERIC_COL")
.option("lowerBound", "1")
.option("upperBound", "10000")
.option("numPartitions", "64")
.load();
在上面的示例中,数据中必须存在“NUMERIC_COL”列,理想情况下,它应该在 1 到 10000 之间均匀变化。当然,这是很多要求,并且类似的列可能不存在,因此您应该可能会在数据库中创建一个具有类似列的视图,或者将其添加到查询中(请注意,我使用了通用 SQL 语法,您必须适应您的 DBMS):
String query = "(select mod(row_number(), 64) as NUMERIC_COL, * from bigdatatable) as foo"
Dataset<Row> ds = spark.read()
.format("jdbc")
.option("url", url)
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", query)
.option("user", "root")
.option("password", "foobar")
.option("partitionColumn", "NUMERIC_COL")
.option("lowerBound", "0")
.option("upperBound", "63")
.option("numPartitions", "64")
.load();