通过ES-Hadoop实现Spark读写Elasticsearch数据

2023-05-16

ES-Hadoop是Elasticsearch推出的专门用于对接Hadoop生态的工具,可以让数据在Elasticsearch和Hadoop之间双向移动,无缝衔接Elasticsearch与Hadoop服务,充分使用Elasticsearch的快速搜索及Hadoop批处理能力,实现交互式数据处理。本文介绍如何通过ES-Hadoop实现Hadoop的Hive服务读写Elasticsearch数据。

Spark是一种通用的大数据计算框架,拥有Hadoop MapReduce所具有的计算优点,能够通过内存缓存数据为大型数据集提供快速的迭代功能。与MapReduce相比,减少了中间数据读取磁盘的过程,进而提高了处理能力。本文介绍如何通过ES-Hadoop实现Hadoop的Spark服务读写Elasticsearch数据。

阿里云Elasticsearch兼容开源Elasticsearch的功能,以及Security、Machine Learning、Graph、APM等商业功能,致力于数据分析、数据搜索等场景服务。支持5.5.3、6.3.2、6.7.0、6.8.0和7.4.0等版本,并提供了商业插件X-Pack服务。在开源Elasticsearch的基础上提供企业级权限管控、安全监控告警、自动报表生成等功能。本文使用阿里云Elasticsearch为您演示,单击此处即可免费试用。

准备工作

  1. 创建阿里云Elasticsearch实例,并开启自动创建索引功能。

    具体操作步骤请参见创建阿里云Elasticsearch实例和开启自动创建索引。本文以6.7.0版本的实例为例。
    注意 在生产环境中,建议关闭自动创建索引功能,提前创建好索引和Mapping。由于本文仅用于测试,因此开启了自动创建索引功能。

  2. 创建与Elasticsearch实例在同一专有网络下的E-MapReduce(以下简称EMR)实例。

    实例配置如下:

  • 产品版本:EMR-3.29.0

  • 必选服务:Spark(2.4.5),其他服务保持默认

    具体操作步骤,请参见创建集群。

    注意 Elasticsearch实例的私网访问白名单默认为0.0.0.0/0,您可在安全配置页面查看,如果未使用默认配置,您还需要在白名单中加入EMR集群的内网IP地址:

    • 请参见查看集群列表与详情,获取EMR集群的内网IP地址。

    • 请参见配置ES公网或私网访问白名单,配置Elasticsearch实例的VPC私网访问白名单。

  1. 准备Java环境,要求JDK版本为8.0及以上。

编写并运行Spark任务

  1. 准备测试数据。

    1. 登录E-MapReduce控制台,获取Master节点的IP地址,并通过SSH登录对应的ECS机器。

      具体操作步骤,请参见使用SSH连接主节点。

    2. 将测试数据写入文件中。

      本文使用的JSON数据示例如下,将该数据保存在 http_log.txt 文件中。

      {"id": 1, "name": "zhangsan", "birth": "1990-01-01", "addr": "No.969, wenyixi Rd, yuhang, hangzhou"}
      {"id": 2, "name": "lisi", "birth": "1991-01-01", "addr": "No.556, xixi Rd, xihu, hangzhou"}
      {"id": 3, "name": "wangwu", "birth": "1992-01-01", "addr": "No.699 wangshang Rd, binjiang, hangzhou"}
      
    3. 执行以下命令,将测试数据上传至EMR Master节点的 tmp/hadoop-es 文件中。

      hadoop fs -put http_log.txt /tmp/hadoop-es
      
  2. 配置pom依赖。

    创建Java Maven工程,并将如下的pom依赖添加到Java工程的pom.xml文件中。

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>6.7.0</version>
        </dependency>
    </dependencies>
    

    注意 请确保pom依赖中版本与云服务对应版本保持一致,例如elasticsearch-spark-20_2.11版本与阿里云Elasticsearch版本一致;spark-core_2.12与HDFS版本一致。

  3. 编写示例代码。

    1. 写数据

      以下示例代码用来将测试数据写入Elasticsearch的company索引中。

      import java.util.Map;
      import java.util.concurrent.atomic.AtomicInteger;
      import org.apache.spark.SparkConf;
      import org.apache.spark.SparkContext;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.function.Function;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.SparkSession;
      import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
      import org.spark_project.guava.collect.ImmutableMap;
      public class SparkWriteEs {
          public static void main(String[] args) {
              SparkConf conf = new SparkConf();
              conf.setAppName("Es-write");
              conf.set("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com");
              conf.set("es.net.http.auth.user", "elastic");
              conf.set("es.net.http.auth.pass", "xxxxxx");
              conf.set("es.nodes.wan.only", "true");
              conf.set("es.nodes.discovery","false");
              conf.set("es.input.use.sliced.partitions","false");
              SparkSession ss = new SparkSession(new SparkContext(conf));
              final AtomicInteger employeesNo = new AtomicInteger(0);
              //以下的/tmp/hadoop-es/http_log.txt需要替换为您测试数据的路径。
              JavaRDD<Map<Object, ?>> javaRDD = ss.read().text("/tmp/hadoop-es/http_log.txt")
                      .javaRDD().map((Function<Row, Map<Object, ?>>) row -> ImmutableMap.of("employees"   employeesNo.getAndAdd(1), row.mkString()));
              JavaEsSpark.saveToEs(javaRDD, "company/_doc");
          }
      }
      
    2. 读数据

      以下示例代码用来读取上一步写入Elasticsearch的数据,并进行打印。

      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaPairRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
      import  java.util.Map;
      public class ReadES {
          public static void main(String[] args) {
              SparkConf  conf = new SparkConf().setAppName("readEs").setMaster("local[*]")
                      .set("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com")
                      .set("es.port", "9200")
                      .set("es.net.http.auth.user", "elastic")
                      .set("es.net.http.auth.pass", "xxxxxx")
                      .set("es.nodes.wan.only", "true")
                      .set("es.nodes.discovery","false")
                      .set("es.input.use.sliced.partitions","false")
                      .set("es.resource", "company/_doc")
                      .set("es.scroll.size","500");
              JavaSparkContext sc = new JavaSparkContext(conf);
              JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc);
              for ( Map<String, Object> item : rdd.values().collect()) {
                  System.out.println(item);
              }
              sc.stop();
          }
      }
      

    参数说明

    参数默认值说明
    es.nodeslocalhost指定阿里云Elasticsearch实例的访问地址,建议使用内网地址,可在实例的基本信息页面查看。更多信息,请参见查看实例的基本信息。
    es.port9200Elasticsearch实例的访问端口号。
    es.net.http.auth.userelasticElasticsearch实例的访问用户名。
    说明 如果程序中指定elastic账号访问Elasticsearch服务,后续在修改elastic账号对应密码后需要一些时间来生效,在密码生效期间会影响服务访问,因此不建议通过elastic来访问。建议在Kibana控制台中创建一个符合预期的Role角色用户进行访问,详情请参见创建角色和创建用户。
    es.net.http.auth.pass/对应用户的密码,在创建实例时指定。如果忘记可进行重置,具体操作步骤,请参见重置实例访问密码。
    es.nodes.wan.onlyfalse开启Elasticsearch集群在云上使用虚拟IP进行连接,是否进行节点嗅探:
    true:设置; false:不设置
    es.nodes.discoverytrue是否禁用节点发现:
    true:禁用 ;false:不禁用
    注意 使用阿里云Elasticsearch,必须将此参数设置为false。
    es.input.use.sliced.partitionstrue是否使用slice分区:
    true:使用。设置为true,可能会导致索引在预读阶段的时间明显变长,有时会远远超出查询数据所耗费的时间。建议设置为false,以提高查询效率; false:不使用。
    es.index.auto.createtrue通过Hadoop组件向Elasticsearch集群写入数据,是否自动创建不存在的index:
    true:自动创建 ; false:不会自动创建
    es.resource/指定要读写的index和type。
    es.mapping.names/表字段与Elasticsearch的索引字段名映射。

    更多的ES-Hadoop配置项说明,请参见官方配置说明。

  4. 将代码打成Jar包,上传至EMR客户端机器(例如Gateway或EMR集群主节点)。

  5. 在EMR客户端机器上,运行如下命令执行Spark程序。

    • 写数据

      cd /usr/lib/spark-current
      ./bin/spark-submit  --master yarn --executor-cores 1 --class "SparkWriteEs" /root/spark_es.jar
      

      注意 /root/spark_es.jar 需要替换为您Jar包上传的路径。

    • 读数据

      cd /usr/lib/spark-current
      ./bin/spark-submit  --master yarn --executor-cores 1 --class "ReadES"  /root/spark_es.jar
      

      读数据成功后,打印结果如下。在这里插入图片描述

验证结果

  1. 登录对应阿里云Elasticsearch实例的Kibana控制台。

    具体操作步骤请参见登录Kibana控制台。

  2. 在左侧导航栏,单击 Dev Tools

  3. Console 中,执行以下命令,查看通过Spark任务写入的数据。

    GET company/_search
    {
      "query": {
        "match_all": {}
      }
    }
    

    查询成功后,返回结果如下。在这里插入图片描述

总结

本文以阿里云Elasticsearch和EMR为例,介绍了如何通过ES-Hadoop,实现Spark读写阿里云Elasticsearch数据。与其他EMR组件相比,ES-Hadoop与Spark的集成,不仅包括RDD,还包括Spark Streaming、scale、DataSet与Spark SQL等,您可以根据需求进行配置。详细信息,请参见Apache Spark support。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

通过ES-Hadoop实现Spark读写Elasticsearch数据 的相关文章

随机推荐

  • Go语言操作数据库MySQL

    连接 Go语言中的database sql包提供了保证SQL或类SQL数据库的泛用接口 xff0c 并不提供具体的数据库驱动 使用database sql包时必须注入 xff08 至少 xff09 一个数据库驱动 我们常用的数据库基本上都有
  • 解决Git请求错误问题

    git clone gits github com Cloning into 39 FdogSerialize 39 git 39 remote gits 39 is not a git command See 39 git help 39
  • Reactor 模式

    Reactor 翻译过来的意思是 反应堆 xff0c 可能大家会联想到物理学里的核反应堆 xff0c 实际上并不是的这个意思 这里的反应指的是 对事件反应 xff0c 也就是来了一个事件 xff0c Reactor 就有相对应的反应 响应
  • MATLAB画图调整分辨率

    问题 xff1a 经常需要用MATLAB画图 xff0c 但是保存之后分辨率不高 xff0c 特别是需要放大的情况下 解决 xff1a 对于下面这种画出的图形 选择 文件 61 gt 导出设置 61 gt 渲染 61 gt 分辨率 选择60
  • C语言中常见的逻辑错误

    常见错误一 xff1a 61 和 61 61 混在一起 int main int ret if ret 61 1 return 0 结果 xff1a 变量被错误赋值 xff0c 逻辑判断错误 错误二 xff1a 定义较大的全局变量造成 编译
  • Qt中常见的位置和尺寸

    QPoint类的介绍 QPoint 类封装了我们常用用到的坐标点 x y 常用的 API 如下 构造函数 构造一个坐标原点 即 0 0 QPoint QPoint 参数为 x轴坐标 y轴坐标 QPoint QPoint int xpos i
  • 关于QT线程运用的三种方式

    QThread 类函数 QThread 类常用 API 构造函数 QThread QThread QObject parent 61 Q NULLPTR 判断线程中的任务是不是处理完毕了 bool QThread isFinished co
  • 安装Ubuntu22.04+nvidia驱动+CUDA-11.7+GRPMACS patch PLUMED

    首先是Ubuntu22 4的安装 Ubuntu系统一般直接可以使用RUFUS软件制作U盘启动项 xff0c 再依照顺序安装Ubuntu系统 xff0c 这里不赘述 CUDA 11 7 span class token function su
  • Linux部署Nexus私服

    这篇文章主要介绍了Linux搭建自己Nexus私服的实现方法 xff0c 文中通过示例代码介绍的非常详细 xff0c 对大家的学习或者工作具有一定的参考学习价值 一 Nexus介绍 对maven来说仓库分为两类 xff1a 本地仓库和远程仓
  • 元学习和机器学习的对比

    目录 引言机器学习元学习什么是元学习元学习的流程学习学习函数评价学习函数好坏迭代优化 整体框架 元学习和机器学习的对比定义的区别数据集划分的区别损失函数的区别两者之间的共通之处 总结 引言 本篇博客是李宏毅老师元学习课程的笔记 深度学习大部
  • 如何使用C++实现10个数的冒泡排序

    96 96 冒泡排序是一种计算机科学领域的较简单的排序算法 xff0c 是一种简单的适合初学者学习的算法 上图为冒泡排序简单的图片理解 xff0c 将第一个数依次与后面的数进行比较 将数值大的数沉到底部或将数值小的数浮到顶部 简称 大数沉淀
  • 通过Cerebro访问Elasticsearch

    本文以阿里云Elasticsearch为例 xff0c 介绍通过Cerebro访问Elasticsearch的方法 阿里云Elasticsearch兼容开源Elasticsearch的功能 xff0c 以及Security Machine
  • 手把手教您完成Elasticsearch数据迁移

    您可以通过Logstash reindex和OSS等多种方式在Elasticsearch之间迁移数据 本文以阿里云Elasticsearch xff08 简称ES xff09 为例 xff0c 介绍阿里云Elasticsearch间数据迁移
  • Solr集群数据迁移至Elasticsearch

    Elasticsearch是一款非常强大的搜索引擎 xff0c 可以让你在海量的数据中快速找到想要的内容 例如 xff0c 代码搜索 xff1a 可以帮助您找到相应的代码仓库 xff0c 还可以实现代码级的搜索及高亮显示 xff1b 网上s
  • k8s创建Deployment报错:no matches for kind “Deployment“ in version “extensions/v1beta1“

    报错类型 xff1a root 64 master kubectl create f lzb test yaml error unable to recognize 34 lzb test yaml 34 no matches for ki
  • 3分钟学会使用Elasticsearch跨集群复制功能(CCR)

    当您需要将本地Elasticsearch集群中的索引数据迁移到一个远程集群中 xff0c 或者将一个远程集群中的索引数据迁移到本地集群 xff0c 可通过跨集群复制CCR xff08 Cross Cluster Replication xf
  • 通过Elasticsearch和rsbeat实时分析Redis slowlog

    Redis是目前流行的高性能key value数据库 xff0c 但如果使用不当 xff0c 很容易出现慢查询 慢查询过多或者一个时间较长 xff08 例如20s xff09 的慢查询会导致操作队列 xff08 Redis是单进程 xff0
  • 通过Monstache实时同步MongoDB数据到Elasticsearch

    当您的业务数据存储在MongoDB中 xff0c 并且需要进行语义分析和大图展示时 xff0c 可借助Elasticsearch实现全文搜索 语义分析 可视化展示等 本文介绍如何通过Monstache将MongoDB数据实时同步至Elast
  • 【必入】云虚拟主机怎么屏蔽指定的IP地址

    解决方法 可通过下面2种方法屏蔽指定的IP地址 xff0c 具体操作步骤请点击此链接 xff1a https help aliyun com knowledge detail 36226 html 通过 htaccess文件屏蔽指定的IP地
  • 通过ES-Hadoop实现Spark读写Elasticsearch数据

    ES Hadoop是Elasticsearch推出的专门用于对接Hadoop生态的工具 xff0c 可以让数据在Elasticsearch和Hadoop之间双向移动 xff0c 无缝衔接Elasticsearch与Hadoop服务 xff0