Elasticsearch使用update_by_query

2023-05-16

    elasticsearch中有一个方法是批量修改,就是先查询出需要修改的索引记录,然后批量修改。这个本来没什么,但是使用过的都知道,用java来调用这个方法很别扭。

    一般来说,我们使用elasticsearch,都建议使用Java Rest Client,就是RestHighLevelClient这个api。这里得从Java Client和Java Rest Client说起了,低版本的elasticsearch提供了ElasticsearchClient的实现,TransportClient,一个传输客户端。在6.4.x 版本中,需要用TransportClient来构建ElasticsearchClient,并且这个client才是实现本文update_by_query所需的client。

    因为我们要使用UpdateByQueryRequestBuilder,所以必须使用ElasticsearchClient,而这个client只能通过TransportClient来构建,在6.4.x版本中,我们还需要引入transport依赖:

<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>transport</artifactId>
  <version>6.4.0</version>
</dependency>

    因为RestClient无法满足构建条件。transportclient连接elasticsearch,使用的端口是9300,而不是和restclient一样使用的是http连接的9200。

    对比一下restclient构建:

    elasticsearch开启了两种不同类型的服务端口,transportclient与elasticsearch更加的解耦合。

    前面提到了要使用UpdateByQueryRequestBuilder,就必须使用transport依赖。而改依赖里面就是一个简单的实现类PreBuiltTransportClient。

    前面废话这么多,无非就是现在elasticsearch版本多,依赖版本也多,而且有的api被丢弃,有的api发生了改变,让我们很难捉摸。

    下面我们通过一个简单的示例,了解一下update_by_query,首先通过工具以命令的方式看看执行结果:这里构建一个index=students,type=student的索引,有3条记录,每条记录有一个age字段均为18。

    这里通过执行查询然后修改操作,将student的age全部修改为32:

    操作执行成功,受影响的记录有3条。再次查看所有的索引记录:

    以上这个步骤是通过命令的方式验证了_update_by_query的可行性,我们现在通过java代码的方式来实现这种操作,前面说了,这个操作需要用到ElasticsearchClient,而ElasticsearchClient需要通过TransportClient来构建。这里直接给出源代码,我的pom.xml依赖是这样的:

    java代码:

package com.xxx.elasticsearch;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.UpdateByQueryAction;
import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

@SuppressWarnings("unchecked")
public class UpdateByQueryApp {
	
	private static final Logger log = LogManager.getLogger(UpdateByQueryApp.class);

	private static ElasticsearchClient client = null;
	
	static{
		try {
			client = new PreBuiltTransportClient(Settings.EMPTY)
					.addTransportAddress(
							new TransportAddress(InetAddress.getByName("127.0.0.1"),9300)
					);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public static void update(){
		UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction
				.INSTANCE.newRequestBuilder(client);
		Map<String, Object> params = new HashMap<String, Object>();
		params.put("age", 18);
		ScriptType type = ScriptType.INLINE;
		String lang = "painless";
		String code = "ctx._source.age=params.age";
		Script script = new Script(type, lang, code, params);
		BulkByScrollResponse response = updateByQuery.source("students").script(script)
				.filter(QueryBuilders.termQuery("age", "32"))
				.abortOnVersionConflict(false)
				.get();
		log.info("update : "+response.getUpdated());
	}
	
	public static void main(String[] args) {
		update();
	}

}

    前面我们通过命令的方式将所有students索引记录的age修改为了32,这里我们就将所有age=32的记录,全部修改为age=18。运行程序,控制台打印信息如下:

2019-09-04 21:15:02 - org.elasticsearch.plugins.PluginsService.logPluginInfo [main] [INFO ] - no modules loaded
2019-09-04 21:15:02 - org.elasticsearch.plugins.PluginsService.logPluginInfo [main] [INFO ] - loaded plugin [org.elasticsearch.index.reindex.ReindexPlugin]
2019-09-04 21:15:02 - org.elasticsearch.plugins.PluginsService.logPluginInfo [main] [INFO ] - loaded plugin [org.elasticsearch.join.ParentJoinPlugin]
2019-09-04 21:15:02 - org.elasticsearch.plugins.PluginsService.logPluginInfo [main] [INFO ] - loaded plugin [org.elasticsearch.percolator.PercolatorPlugin]
2019-09-04 21:15:02 - org.elasticsearch.plugins.PluginsService.logPluginInfo [main] [INFO ] - loaded plugin [org.elasticsearch.script.mustache.MustachePlugin]
2019-09-04 21:15:02 - org.elasticsearch.plugins.PluginsService.logPluginInfo [main] [INFO ] - loaded plugin [org.elasticsearch.transport.Netty4Plugin]
2019-09-04 21:15:05 - com.xxx.elasticsearch.UpdateByQueryApp.update [main] [INFO ] - update : 3

    表明批量修改成功,可以查看记录:

    需要注意的是,transportclient和restclient他们构建时所需的端口是不一样的,分别是9300和9200。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Elasticsearch使用update_by_query 的相关文章

随机推荐

  • ubuntu开启SSH服务远程登录

    ssh secure shell xff0c 提供安全的远程登录 从事嵌入式开发搭建linux开发环境中 xff0c ssh的服务的安装是其中必不可少的一步 ssh方便一个开发小组中人员登录一台服务器 xff0c 从事代码的编写 编译 运行
  • Python实现让视频自动打码,再也不怕出现少儿不宜的画面了

    人生苦短 我用Python 序言准备工作代码解析完整代码 序言 我们在观看视频的时候 xff0c 有时候会出现一些奇怪的马赛克 xff0c 影响我们的观影体验 xff0c 那么这些马赛克是如何精确的加上去的呢 xff1f 本次我们就来用Py
  • Docker安装nextcloud实验

    Docker安装nextcloud实验 修改验证方式 xff1a 从密钥到密码 sudo passwd root su root vi etc ssh sshd config 去掉下面前的 或修改yes no port 22 Address
  • Tesseract-OCR 字符识别---样本训练

    Tesseract是一个开源的OCR xff08 Optical Character Recognition xff0c 光学字符识别 xff09 引擎 xff0c 可以识别多种格式的图像文件并将其转换成文本 xff0c 目前已支持60多种
  • FPGA与OPENCV的联合仿真

    对于初学者来说 xff0c 图像处理行业 xff0c 最佳仿真方式 xff1a FPGA 43 OPENCV xff0c 因为OPENCV适合商业化 xff0c 适合自己写算法 1 xff09 中间交互数据介质 txt文档 2 xff09
  • 华硕P8Z77-V LX老主板转换卡升级NVMe M2硬盘经验,老主机的福音,质的飞跃

    每年双十一都是淘货升级老家伙的时候 xff0c 今年也不例外 xff0c 随着日子长久 xff0c 软件的增多 xff0c 虽然已经尽量装在系统盘以外的盘 xff0c 但C盘还是日渐不够用 xff0c 从以前的30G系统盘升到60G xff
  • linux 更换 软件源后 GPG错误

    linux 更换 软件源后 GPG错误 linux 软件源 GPG 签名 密钥 linux 更换 软件源后 GPG错误 http my oschina net emptytimespace blog 83633 如文章 1 中提到 xff1
  • ROS2学习笔记(四)-- 用方向键控制小车行走

    简介 xff1a 在上一节的内容中 xff0c 我们通过ROS2的话题发布功能将小车实时视频信息发布了出来 xff0c 同时使用GUI工具进行查看 xff0c 在这一节内容中 xff0c 我们学习一下如何订阅话题并处理话题消息 xff0c
  • flume大数据框架数据采集系统

    flume是cloudera开源的数据采集系统 xff0c 现在是apache基金会下的子项目 xff0c 他是hadoop生态系统的日志采集系统 xff0c 用途广泛 xff0c 可以将日志 网络数据 kafka消息收集并存储在大数据hd
  • flume日志收集系统常见配置

    前面介绍了flume入门实例 xff0c 介绍了配置netcat信源 xff0c 以及memory信道 xff0c logger信宿 xff0c 其实flume常见的信源信道信宿有很多 xff0c 这里介绍flume常用信源的三种方式 xf
  • flume自定义拦截器实现定制收集日志需求

    flume默认提供了timestamp host static regex等几种类型的拦截器 xff0c timestamp host static等拦截器 xff0c 其实就是在消息头中增加了时间戳 xff0c 主机名 xff0c 键值对
  • Eclipse开发mapreduce程序环境搭建

    Eclipse作为一个常用的java IDE xff0c 其使用程度虽然比不上idea那么强大 xff0c 但是对于习惯使用eclipse开发的人来说 xff0c 也不失为一个可以选择的IDE 对于喜欢eclipse开发的人来说 xff0c
  • hdfs常见操作java示例

    我们学习hadoop xff0c 最常见的编程是编写mapreduce程序 xff0c 但是 xff0c 有时候我们也会利用java程序做一些常见的hdfs操作 比如删除一个目录 xff0c 新建一个文件 xff0c 从本地上传一个文件到h
  • MapReduce编程开发之数据去重

    MapReduce就是一个利用分而治之的思想做计算的框架 xff0c 所谓分 xff0c 就是将数据打散 xff0c 分成可以计算的小份 xff0c 治就是将数据合并 xff0c 相同键的数据合并成一个集合 MapReduce并不能解决所有
  • MapReduce编程开发之求平均成绩

    MapReduce计算平均成绩是一个常见的算法 xff0c 本省思路很简单 xff0c 就是将每个人的成绩汇总 xff0c 然后做除法 xff0c 在map阶段 xff0c 是直接将姓名做key 分数作为value输出 在shuffle阶段
  • MapReduce编程开发之数据排序

    MapReduce的数据排序 xff0c 其实没有很复杂的实现 xff0c 默认在shuffle阶段 xff0c MapReduce就帮我们将数据排好序了 xff0c 我们在Map和Reduce阶段 xff0c 无需做额外的操作 MapRe
  • MapReduce编程开发之倒排索引

    倒排索引是词频统计的一个变种 xff0c 其实也是做一个词频统计 xff0c 不过这个词频统计需要加上文件的名称 倒排索引被广泛用来做全文检索 倒排索引最终的结果是一个单词在文件中出现的次数的集合 xff0c 以下面的数据为例 xff1a
  • ROS2学习笔记(五)-- ROS2命令行操作常用指令总结(一)

    简介 xff1a 在前面的章节中 xff0c 我们先简单学习了ROS2的话题发布和订阅 xff0c 两种操作都是通过python代码实现的 xff0c 而在实际应用过程中 xff0c 我们会经常用到命令行操作来辅助调试 xff0c 更进一步
  • 实例演示ElasticSearch索引查询term,match,match_phase,query_string之间的区别

    通常在面试elasticsearch中 xff0c 面试官会问一个关于查询的问题 xff0c 就是term查询和match查询有什么区别 xff1f 如果你对这两个查询不清楚 xff0c 面试官会认为你没有用过elasticsearch x
  • Elasticsearch使用update_by_query

    elasticsearch中有一个方法是批量修改 xff0c 就是先查询出需要修改的索引记录 xff0c 然后批量修改 这个本来没什么 xff0c 但是使用过的都知道 xff0c 用java来调用这个方法很别扭 一般来说 xff0c 我们使