在 Spark 中将带有 MapType 列的 DataFrame 写入数据库

2024-03-02

我正在尝试使用 clickhouse-native-jdbc 驱动程序将带有 MapType 列的数据帧保存到 Clickhouse(架构中也包含地图类型列),并遇到以下错误:

Caused by: java.lang.IllegalArgumentException: Can't translate non-null value for field 74
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeSetter$16(JdbcUtils.scala:593)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeSetter$16$adapted(JdbcUtils.scala:591)

我在spark源代码中找到了这个地方,它包含以下内容:

private def makeSetter(
      conn: Connection,
      dialect: JdbcDialect,
      dataType: DataType): JDBCValueSetter = dataType match {
    case IntegerType =>
      (stmt: PreparedStatement, row: Row, pos: Int) =>
        stmt.setInt(pos + 1, row.getInt(pos))

    case LongType =>
      (stmt: PreparedStatement, row: Row, pos: Int) =>
        stmt.setLong(pos + 1, row.getLong(pos))

...
    case _ =>
      (_: PreparedStatement, _: Row, pos: Int) =>
        throw new IllegalArgumentException(
          s"Can't translate non-null value for field $pos")

该函数匹配列类型,如果没有合适的类型,则会抛出此错误。正如我所看到的,spark 根本无法处理 MapType 列。

我尝试复制和修改org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils文件以使其能够与 MapType 列一起使用,如下所示:

case MapType(_, _, _) =>
    (stmt: PreparedStatement, row: Row, pos: Int) =>
        val map = row.getMap[AnyRef, AnyRef](pos)
        stmt.setObject(pos + 1, mapAsJavaMap(map))

在本地计算机中,它按预期工作,但在集群模式执行器中使用库存版本,而不是我自己的版本。

有谁知道如何让 Spark 以另一种方式使用 MapType 列,或者使用修改后的源代码来创建执行器?


感谢 Danilo Rodrigues 的启发,最后我这样解决了我的问题: 我没有按原样编写 Map 值,而是将其转换为 json 字符串,Clickhouse 中的表架构现在如下所示:

CREATE TABLE t1 (
    param_str String,
    param MATERIALIZED cast((arrayMap(x->x.1, JSONExtractKeysAndValues(param_str, 'String')), arrayMap(x->x.2, JSONExtractKeysAndValues(param_str, 'String'))), 'Map(String, String)')
) Engine ...

是的,它看起来有点难看,我更愿意选择改变 Spark 源代码的方法,但当前的方法效果很好

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Spark 中将带有 MapType 列的 DataFrame 写入数据库 的相关文章

随机推荐