一、前言
最近工作中有这样一个ElasticSearch(以下简称ES)写入的场景,Flink处理完数据实时写入ES。现在需要将一批历史数据通过Flink加载到到ES,有两个点需要保证:
- 对于历史数据,ES已有文档,则舍弃旧数据,ES没有则插入历史数据。
- 对于新数据,能对现有的ES数据进行更新。
参考ElasticSearch进阶篇(一)–版本控制,可以使用ES的版本实现该需求的开发。
二、代码实现及验证
代码实现
请求写数据时加入version和version_type参数,主要代码如下:
IndexRequest indexRequest = Requests.indexRequest()
.index(indexName)
.id("1")