将 Spark Dataframe 保存到 Elasticsearch - 无法处理类型异常

2024-04-17

我设计了一个简单的作业,使用 Spark 从 MySQL 读取数据并将其保存在 Elasticsearch 中。

这是代码:

JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("MySQLtoEs")
                .set("es.index.auto.create", "true")
                .set("es.nodes", "127.0.0.1:9200")
                .set("es.mapping.id", "id")
                .set("spark.serializer", KryoSerializer.class.getName()));

SQLContext sqlContext = new SQLContext(sc);

// Data source options
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "OFFERS");
options.put("partitionColumn", "id");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");

// Load MySQL query result as DataFrame
LOGGER.info("Loading DataFrame");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
DataFrame df = jdbcDF.select("id", "title", "description",
        "merchantId", "price", "keywords", "brandId", "categoryId");
df.show();
LOGGER.info("df.count : " + df.count());
EsSparkSQL.saveToEs(df, "offers/product");

可以看到代码非常简单。它将数据读入 DataFrame,选择一些列,然后执行count作为数据框上的基本操作。到目前为止一切正常。

然后它尝试将数据保存到 Elasticsearch 中,但失败了,因为它无法处理某些类型。可以看到错误日志here https://gist.github.com/eliasah/76a58145cefc4fec1111.

我不确定为什么它不能处理这种类型。有谁知道为什么会发生这种情况?

我正在使用 Apache Spark 1.5.0、Elasticsearch 1.4.4 和 ematicsearch-hadoop 2.1.1

EDIT:

  • 我已经使用示例数据集和源代码更新了要点链接。
  • 我也尝试过使用elasticsearch-hadoop开发构建 https://www.elastic.co/guide/en/elasticsearch/hadoop/current/install.html#download-dev正如@costin 在邮件列表中提到的。

这个问题的答案很棘手,但感谢samklr https://twitter.com/samklr,我已经设法弄清楚问题出在哪里。

然而,解决方案并不简单,并且可能会考虑一些“不必要的”转换。

首先我们来谈谈序列化.

Spark 中数据序列化和函数序列化需要考虑两个方面的序列化。在本例中,涉及数据序列化和反序列化。

从 Spark 的角度来看,唯一需要的就是设置序列化 - Spark 默认依赖于 Java 序列化,这很方便,但效率相当低。这就是Hadoop本身引入自己的序列化机制和自己的类型的原因——即Writables。像这样,InputFormat and OutputFormats需要返回WritablesSpark 开箱即用时无法理解。

使用elasticsearch-spark连接器,必须启用一种不同的序列化(Kryo),它可以自动处理转换并且非常高效。

conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

即使 Kryo 不要求类实现要序列化的特定接口,这意味着 POJO 可以在 RDD 中使用,除了启用 Kryo 序列化之外无需任何进一步的工作。

也就是说,@samklr 向我指出 Kryo 需要在使用类之前注册它们。

这是因为 Kryo 写入了对正在序列化的对象的类的引用(为每个写入的对象写入一个引用),如果该类已注册,则该引用只是一个整数标识符,否则为完整的类名。 Spark 代表您注册 Scala 类和许多其他框架类(例如 Avro Generic 或 Thrift 类)。

使用 Kryo 注册课程非常简单。创建 KryoRegistrator 的子类,并重写registerClasses() method:

public class MyKryoRegistrator implements KryoRegistrator, Serializable {
    @Override
    public void registerClasses(Kryo kryo) {
        // Product POJO associated to a product Row from the DataFrame            
        kryo.register(Product.class); 
    }
}

最后,在您的驱动程序中,将spark.kryo.registrator属性设置为KryoRegistrator实现的完全限定类名:

conf.set("spark.kryo.registrator", "MyKryoRegistrator")

其次,即使设置了 Kryo 序列化器并注册了类,并对 Spark 1.5 进行了更改,但由于某种原因 Elasticsearch 无法反序列化Dataframe 因为它无法推断SchemaType将数据框插入连接器。

所以我必须将 Dataframe 转换为 JavaRDD

JavaRDD<Product> products = df.javaRDD().map(new Function<Row, Product>() {
    public Product call(Row row) throws Exception {
        long id = row.getLong(0);
        String title = row.getString(1);
        String description = row.getString(2);
        int merchantId = row.getInt(3);
        double price = row.getDecimal(4).doubleValue();
        String keywords = row.getString(5);
        long brandId = row.getLong(6);
        int categoryId = row.getInt(7);
        return new Product(id, title, description, merchantId, price, keywords, brandId, categoryId);
    }
});

现在数据已准备好写入 elasticsearch :

JavaEsSpark.saveToEs(products, "test/test");

参考:

  • Elasticsearch 的 Apache Spark 支持文档 https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html.
  • Hadoop 权威指南,第 19 章。Spark,编辑。 4——汤姆·怀特。
  • User samklr https://twitter.com/samklr.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将 Spark Dataframe 保存到 Elasticsearch - 无法处理类型异常 的相关文章

随机推荐

  • 如何在 Google 电子表格中插入列?

    我想将新数据添加到工作表的开头 开头 所以我必须在工作表中添加一个新的 A1 列 但我找不到任何 PHP 的 API 示例 现在我用这个附加数据 body new Google Service Sheets ValueRange value
  • 如果未使用 scp 命令指定目标路径,则文件位置

    要将文件夹从本地计算机复制到我使用的服务器 scp r local folder user server path 现在我忘了第一次指定目标路径 scp r local folder user server 现在有人知道该文件夹是否已被复制
  • 如何在视频标签中播放AVI文件?

    我想知道是否可以让浏览器在一个文件中播放 AVI 文件video tag 我在网上找到的所有谈论它的内容都集中在 MP4 和 Ogg 格式 但没有人谈论 AVI 格式 我发现的唯一解决方案是放弃video标记并使视频可以使用 JW Play
  • Lua 如何创建可用于变量的自定义函数?

    对于像 io close 这样的方法 你可以像这样使用它 file close 有没有办法创建一个像这样工作的自定义函数 您可以在变量上调用它 对我来说 我尝试使用它通过使用 string find 查找空格来将参数与文本文件分开 所以在文
  • 如何在 Node.js 中进行 Base64 编码?

    Node js 是否有内置 Base64 编码 我问这个的原因是final from crypto只能输出十六进制 二进制或ASCII数据 例如 var cipher crypto createCipheriv des ede3 cbc e
  • 对于某些版本的 PHP,“未指定输入文件”

    我安装了多个版本的 PHP 我写的 基本上 它是一个 ApacheLounge 安装 在 Windows 10 上通过 FastCGI 与 PHP 进行通信 这曾经让我在不同的虚拟主机上同时运行这些 PHP 版本 Apache 通过 Fas
  • 如何可视化来自谷歌协议缓冲区的数据?

    我想使用谷歌协议缓冲区存储数据 另一种序列化格式也可以 然后有一个用户界面来浏览该数据 是否有 C 框架 API 可以让我做到这一点 例如 它可以使用protobuf的反射接口 然后将数据填充到Qt的QTableView 或从其他工具包 中
  • 从 C 中的 long 中提取单个数字

    我正在为我的 C 课程 第一门编程课程 做作业 作业的一部分是编写代码 让用户输入一个最多9位数的数字 程序需要判断这个数字是 递增 真递增 递减 真递减 增减 实减实增 不减不增 共7个选项 由于这是我们的第一个作业 我们不允许使用课堂上
  • 与 Jenkins 工作流程/管道并行运行阶段

    请注意 问题是基于旧的 现在称为 脚本化 管道格式 当使用 声明式管道 时 并行块可以嵌套在阶段块内 请参阅声明式管道 1 2 的并行阶段 https jenkins io blog 2017 09 25 declarative 1 我想知
  • Android 中卡片视图内带有三个点的小部件的名称是什么?

    带有三个点的小部件是什么 如何将其添加到我的应用程序中 这根本不是一个小部件 它是一个ImageButton 无边框风格 使用包含一个的溢出图标PopupMenu 如需文档教程访问http developer android com gui
  • ASP.NET-Core 2.0 在应用程序启动后添加/删除路由

    我需要添加 删除通过 IApplicaitonBuilder 在 Startup 类的 Configure 方法期间注册的自定义路由 启动后 我在 UseMvc 命令中调用 MapRoute 方法 将一堆自定义路由注册到我的控制器 这些路由
  • ListBox不显示绑定数据

    在我的 Xaml 中我有这个
  • 将 Bootstrap 导航栏中的元素居中

    无论我尝试什么 我都无法将 Bootstrap 导航栏中的某些内容居中 有什么解决方案吗 我尝试添加一个div 使用margin 0 auto or margin right auto margin left auto used cente
  • opencv中的矩阵类型转换

    我正在尝试使用滤波器对图像进行卷积 并借助 opencv 中的 filter2D 函数将其存储到 CV 64F 类型的矩阵中 但目标矩阵的类型发生了变化 我尝试借助 allocateTo 0r ConvertTo 函数将其更改回 CV 64
  • 了解随机起始权重对神经网络性能的影响

    使用 R 和包neuralnet 我尝试对数据进行建模 如下所示 这些是几天内以 10 分钟为间隔的温度读数 上面是 2 天的截图 使用下面的代码 我将神经网络拟合到数据 可能有更简单的方法来对这些精确数据进行建模 但将来数据可能看起来完全
  • 无需安装即可使用Python

    我有一个安装程序 它使用 Python 脚本来安装多个组件 我不想在用户计算机上安装 Python 如果用户还没有安装 Python 并且我也不希望安装 Python 成为使用我的安装程序的先决条件 有没有一种方法可以在不使用安装程序的情况
  • 范围之间的随机日期时间 - 不统一输出

    我实现了下面的 RandomDate 但我总是不断获取接近 From 日期的值 我可能在这里错过了一些东西 public static DateTime GetRandomDate DateTime from DateTime to var
  • 为什么矢量化通常比循环更快?

    为什么在执行操作的硬件的最低级别和所涉及的一般底层操作 即 运行代码时所有编程语言的实际实现通用的事情 矢量化通常比循环快得多 计算机在循环时会做什么而在使用矢量化时不会做什么 我指的是计算机执行的实际计算 而不是程序员编写的计算 或者它有
  • ChrisBanes PullToRefresh“正在加载...”问题

    我正在使用我发现的 chrisbanes 的 PullToRefresh ListViewhere https github com chrisbanes Android PullToRefresh 多亏了它的文档 我成功地实现了它 然而
  • 将 Spark Dataframe 保存到 Elasticsearch - 无法处理类型异常

    我设计了一个简单的作业 使用 Spark 从 MySQL 读取数据并将其保存在 Elasticsearch 中 这是代码 JavaSparkContext sc new JavaSparkContext new SparkConf setA