我正在尝试使用 clickhouse-native-jdbc 驱动程序将带有 MapType 列的数据帧保存到 Clickhouse(架构中也包含地图类型列),并遇到以下错误:
Caused by: java.lang.IllegalArgumentException: Can't translate non-null value for field 74
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeSetter$16(JdbcUtils.scala:593)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeSetter$16$adapted(JdbcUtils.scala:591)
我在spark源代码中找到了这个地方,它包含以下内容:
private def makeSetter(
conn: Connection,
dialect: JdbcDialect,
dataType: DataType): JDBCValueSetter = dataType match {
case IntegerType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
stmt.setInt(pos + 1, row.getInt(pos))
case LongType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
stmt.setLong(pos + 1, row.getLong(pos))
...
case _ =>
(_: PreparedStatement, _: Row, pos: Int) =>
throw new IllegalArgumentException(
s"Can't translate non-null value for field $pos")
该函数匹配列类型,如果没有合适的类型,则会抛出此错误。正如我所看到的,spark 根本无法处理 MapType 列。
我尝试复制和修改org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
文件以使其能够与 MapType 列一起使用,如下所示:
case MapType(_, _, _) =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
val map = row.getMap[AnyRef, AnyRef](pos)
stmt.setObject(pos + 1, mapAsJavaMap(map))
在本地计算机中,它按预期工作,但在集群模式执行器中使用库存版本,而不是我自己的版本。
有谁知道如何让 Spark 以另一种方式使用 MapType 列,或者使用修改后的源代码来创建执行器?