ES-Hadoop是Elasticsearch推出的专门用于对接Hadoop生态的工具,可以让数据在Elasticsearch和Hadoop之间双向移动,无缝衔接Elasticsearch与Hadoop服务,充分使用Elasticsearch的快速搜索及Hadoop批处理能力,实现交互式数据处理。本文介绍如何通过ES-Hadoop实现Hadoop的Hive服务读写Elasticsearch数据。
Spark是一种通用的大数据计算框架,拥有Hadoop MapReduce所具有的计算优点,能够通过内存缓存数据为大型数据集提供快速的迭代功能。与MapReduce相比,减少了中间数据读取磁盘的过程,进而提高了处理能力。本文介绍如何通过ES-Hadoop实现Hadoop的Spark服务读写Elasticsearch数据。
阿里云Elasticsearch兼容开源Elasticsearch的功能,以及Security、Machine Learning、Graph、APM等商业功能,致力于数据分析、数据搜索等场景服务。支持5.5.3、6.3.2、6.7.0、6.8.0和7.4.0等版本,并提供了商业插件X-Pack服务。在开源Elasticsearch的基础上提供企业级权限管控、安全监控告警、自动报表生成等功能。本文使用阿里云Elasticsearch为您演示,单击此处即可免费试用。
准备工作
-
创建阿里云Elasticsearch实例,并开启自动创建索引功能。
具体操作步骤请参见创建阿里云Elasticsearch实例和开启自动创建索引。本文以6.7.0版本的实例为例。
注意 在生产环境中,建议关闭自动创建索引功能,提前创建好索引和Mapping。由于本文仅用于测试,因此开启了自动创建索引功能。
-
创建与Elasticsearch实例在同一专有网络下的E-MapReduce(以下简称EMR)实例。
实例配置如下:
- 准备Java环境,要求JDK版本为8.0及以上。
编写并运行Spark任务
-
准备测试数据。
-
登录E-MapReduce控制台,获取Master节点的IP地址,并通过SSH登录对应的ECS机器。
具体操作步骤,请参见使用SSH连接主节点。
-
将测试数据写入文件中。
本文使用的JSON数据示例如下,将该数据保存在 http_log.txt 文件中。
{"id": 1, "name": "zhangsan", "birth": "1990-01-01", "addr": "No.969, wenyixi Rd, yuhang, hangzhou"}
{"id": 2, "name": "lisi", "birth": "1991-01-01", "addr": "No.556, xixi Rd, xihu, hangzhou"}
{"id": 3, "name": "wangwu", "birth": "1992-01-01", "addr": "No.699 wangshang Rd, binjiang, hangzhou"}
-
执行以下命令,将测试数据上传至EMR Master节点的 tmp/hadoop-es 文件中。
hadoop fs -put http_log.txt /tmp/hadoop-es
-
配置pom依赖。
创建Java Maven工程,并将如下的pom依赖添加到Java工程的pom.xml文件中。
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>6.7.0</version>
</dependency>
</dependencies>
注意 请确保pom依赖中版本与云服务对应版本保持一致,例如elasticsearch-spark-20_2.11版本与阿里云Elasticsearch版本一致;spark-core_2.12与HDFS版本一致。
-
编写示例代码。
-
写数据
以下示例代码用来将测试数据写入Elasticsearch的company索引中。
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.spark_project.guava.collect.ImmutableMap;
public class SparkWriteEs {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("Es-write");
conf.set("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com");
conf.set("es.net.http.auth.user", "elastic");
conf.set("es.net.http.auth.pass", "xxxxxx");
conf.set("es.nodes.wan.only", "true");
conf.set("es.nodes.discovery","false");
conf.set("es.input.use.sliced.partitions","false");
SparkSession ss = new SparkSession(new SparkContext(conf));
final AtomicInteger employeesNo = new AtomicInteger(0);
//以下的/tmp/hadoop-es/http_log.txt需要替换为您测试数据的路径。
JavaRDD<Map<Object, ?>> javaRDD = ss.read().text("/tmp/hadoop-es/http_log.txt")
.javaRDD().map((Function<Row, Map<Object, ?>>) row -> ImmutableMap.of("employees" employeesNo.getAndAdd(1), row.mkString()));
JavaEsSpark.saveToEs(javaRDD, "company/_doc");
}
}
-
读数据
以下示例代码用来读取上一步写入Elasticsearch的数据,并进行打印。
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import java.util.Map;
public class ReadES {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("readEs").setMaster("local[*]")
.set("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com")
.set("es.port", "9200")
.set("es.net.http.auth.user", "elastic")
.set("es.net.http.auth.pass", "xxxxxx")
.set("es.nodes.wan.only", "true")
.set("es.nodes.discovery","false")
.set("es.input.use.sliced.partitions","false")
.set("es.resource", "company/_doc")
.set("es.scroll.size","500");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc);
for ( Map<String, Object> item : rdd.values().collect()) {
System.out.println(item);
}
sc.stop();
}
}
参数说明
参数 | 默认值 | 说明 |
---|
es.nodes | localhost | 指定阿里云Elasticsearch实例的访问地址,建议使用内网地址,可在实例的基本信息页面查看。更多信息,请参见查看实例的基本信息。 |
es.port | 9200 | Elasticsearch实例的访问端口号。 |
es.net.http.auth.user | elastic | Elasticsearch实例的访问用户名。 说明 如果程序中指定elastic账号访问Elasticsearch服务,后续在修改elastic账号对应密码后需要一些时间来生效,在密码生效期间会影响服务访问,因此不建议通过elastic来访问。建议在Kibana控制台中创建一个符合预期的Role角色用户进行访问,详情请参见创建角色和创建用户。 |
es.net.http.auth.pass | / | 对应用户的密码,在创建实例时指定。如果忘记可进行重置,具体操作步骤,请参见重置实例访问密码。 |
es.nodes.wan.only | false | 开启Elasticsearch集群在云上使用虚拟IP进行连接,是否进行节点嗅探: true:设置; false:不设置 |
es.nodes.discovery | true | 是否禁用节点发现: true:禁用 ;false:不禁用 注意 使用阿里云Elasticsearch,必须将此参数设置为false。 |
es.input.use.sliced.partitions | true | 是否使用slice分区: true:使用。设置为true,可能会导致索引在预读阶段的时间明显变长,有时会远远超出查询数据所耗费的时间。建议设置为false,以提高查询效率; false:不使用。 |
es.index.auto.create | true | 通过Hadoop组件向Elasticsearch集群写入数据,是否自动创建不存在的index: true:自动创建 ; false:不会自动创建 |
es.resource | / | 指定要读写的index和type。 |
es.mapping.names | / | 表字段与Elasticsearch的索引字段名映射。 |
更多的ES-Hadoop配置项说明,请参见官方配置说明。
-
将代码打成Jar包,上传至EMR客户端机器(例如Gateway或EMR集群主节点)。
-
在EMR客户端机器上,运行如下命令执行Spark程序。
-
写数据
cd /usr/lib/spark-current
./bin/spark-submit --master yarn --executor-cores 1 --class "SparkWriteEs" /root/spark_es.jar
注意 /root/spark_es.jar 需要替换为您Jar包上传的路径。
-
读数据
cd /usr/lib/spark-current
./bin/spark-submit --master yarn --executor-cores 1 --class "ReadES" /root/spark_es.jar
读数据成功后,打印结果如下。
验证结果
-
登录对应阿里云Elasticsearch实例的Kibana控制台。
具体操作步骤请参见登录Kibana控制台。
-
在左侧导航栏,单击 Dev Tools 。
-
在 Console 中,执行以下命令,查看通过Spark任务写入的数据。
GET company/_search
{
"query": {
"match_all": {}
}
}
查询成功后,返回结果如下。
总结
本文以阿里云Elasticsearch和EMR为例,介绍了如何通过ES-Hadoop,实现Spark读写阿里云Elasticsearch数据。与其他EMR组件相比,ES-Hadoop与Spark的集成,不仅包括RDD,还包括Spark Streaming、scale、DataSet与Spark SQL等,您可以根据需求进行配置。详细信息,请参见Apache Spark support。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)