这个问题的答案很棘手,但感谢samklr https://twitter.com/samklr,我已经设法弄清楚问题出在哪里。
然而,解决方案并不简单,并且可能会考虑一些“不必要的”转换。
首先我们来谈谈序列化.
Spark 中数据序列化和函数序列化需要考虑两个方面的序列化。在本例中,涉及数据序列化和反序列化。
从 Spark 的角度来看,唯一需要的就是设置序列化 - Spark 默认依赖于 Java 序列化,这很方便,但效率相当低。这就是Hadoop本身引入自己的序列化机制和自己的类型的原因——即Writables
。像这样,InputFormat
and OutputFormats
需要返回Writables
Spark 开箱即用时无法理解。
使用elasticsearch-spark连接器,必须启用一种不同的序列化(Kryo),它可以自动处理转换并且非常高效。
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
即使 Kryo 不要求类实现要序列化的特定接口,这意味着 POJO 可以在 RDD 中使用,除了启用 Kryo 序列化之外无需任何进一步的工作。
也就是说,@samklr 向我指出 Kryo 需要在使用类之前注册它们。
这是因为 Kryo 写入了对正在序列化的对象的类的引用(为每个写入的对象写入一个引用),如果该类已注册,则该引用只是一个整数标识符,否则为完整的类名。 Spark 代表您注册 Scala 类和许多其他框架类(例如 Avro Generic 或 Thrift 类)。
使用 Kryo 注册课程非常简单。创建 KryoRegistrator 的子类,并重写registerClasses()
method:
public class MyKryoRegistrator implements KryoRegistrator, Serializable {
@Override
public void registerClasses(Kryo kryo) {
// Product POJO associated to a product Row from the DataFrame
kryo.register(Product.class);
}
}
最后,在您的驱动程序中,将spark.kryo.registrator属性设置为KryoRegistrator实现的完全限定类名:
conf.set("spark.kryo.registrator", "MyKryoRegistrator")
其次,即使设置了 Kryo 序列化器并注册了类,并对 Spark 1.5 进行了更改,但由于某种原因 Elasticsearch 无法反序列化Dataframe 因为它无法推断SchemaType
将数据框插入连接器。
所以我必须将 Dataframe 转换为 JavaRDD
JavaRDD<Product> products = df.javaRDD().map(new Function<Row, Product>() {
public Product call(Row row) throws Exception {
long id = row.getLong(0);
String title = row.getString(1);
String description = row.getString(2);
int merchantId = row.getInt(3);
double price = row.getDecimal(4).doubleValue();
String keywords = row.getString(5);
long brandId = row.getLong(6);
int categoryId = row.getInt(7);
return new Product(id, title, description, merchantId, price, keywords, brandId, categoryId);
}
});
现在数据已准备好写入 elasticsearch :
JavaEsSpark.saveToEs(products, "test/test");
参考:
- Elasticsearch 的 Apache Spark 支持文档 https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html.
- Hadoop 权威指南,第 19 章。Spark,编辑。 4——汤姆·怀特。
- User samklr https://twitter.com/samklr.