I'm currently trying to make our Elasticsearch data 2.0-compatible (i.e. no dots in field names) in preparation for an upgrade from 1.x to 2.x.
I've written a program that runs over the data (in batches) on a single-node cluster, renames the offending fields, and re-indexes the documents using the bulk API.
At some point everything goes wrong: the total number of documents returned by my query (the ones still to be "upgraded") stops changing, even though it should be counting down.
At first I assumed it simply wasn't working. But when I pick a single document and query for it to see whether it is being changed, I can see that it is working.
However, when I query for a specific field in the document, I get two results with the same ID. One of the results has the upgraded field, the other does not.
On further inspection I can see that they come from different shards:
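For reference, the rename step can be sketched in plain Java as below. `FieldRenamer.renameKeys` is a hypothetical helper (not my actual migration code); it assumes every dot in a key simply becomes an underscore, recursing into nested maps:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FieldRenamer {

    // Recursively replace '.' with '_' in all map keys.
    // Illustrative sketch only; names and behaviour are assumptions.
    @SuppressWarnings("unchecked")
    public static Map<String, Object> renameKeys(Map<String, Object> doc) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            Object value = e.getValue();
            if (value instanceof Map) {
                // descend into nested objects such as "metadata"
                value = renameKeys((Map<String, Object>) value);
            }
            out.put(e.getKey().replace('.', '_'), value);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> metadata = new LinkedHashMap<>();
        metadata.put("url.path", "http://www.photosite.com/80018335");
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("url", "http://static.photosite.com/80018335.jpg");
        doc.put("metadata", metadata);
        System.out.println(renameKeys(doc));
    }
}
```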
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 19.059433,
    "hits" : [ {
      "_shard" : 0,
      "_node" : "FxbpjCyQRzKfA9QvBbSsmA",
      "_index" : "status",
      "_type" : "status",
      "_id" : "http://static.photosite.com/80018335.jpg",
      "_version" : 2,
      "_score" : 19.059433,
      "_source" : {"url":"http://static.photosite.com/80018335.jpg","metadata":{"url.path":["http://www.photosite.com/80018335"],"source":["http://www.photosite.com/80018335"],"longitude":["104.507755"],"latitude":["21.601669"]}},
      ...
    }, {
      "_shard" : 3,
      "_node" : "FxbpjCyQRzKfA9QvBbSsmA",
      "_index" : "status",
      "_type" : "status",
      "_id" : "http://static.photosite.com/80018335.jpg",
      "_version" : 27,
      "_score" : 17.607681,
      "_source" : {"url":"http://static.photosite.com/80018335.jpg","metadata":{"url_path":["http://www.photosite.com/80018335"],"source":["http://www.photosite.com/80018335"],"longitude":["104.507755"],"latitude":["21.601669"]}},
      ...
    } ]
  }
}
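For context on why two copies on different shards is so strange: Elasticsearch picks the shard as roughly `hash(routing) % number_of_shards`, where the routing value defaults to the document ID. The same ID should therefore always land on the same shard; a duplicate on another shard suggests the routing value or hash differed between writes. A toy illustration of the scheme (this is NOT Elasticsearch's actual hash function, which in 1.x is its own `DjbHashFunction`):

```java
public class ShardRoutingToy {

    // A simple string hash, purely illustrative of the general scheme.
    static int hash(String routing) {
        int h = 5381;
        for (int i = 0; i < routing.length(); i++) {
            h = 31 * h + routing.charAt(i);
        }
        return h;
    }

    // shard = hash(routing) % number_of_shards (non-negative modulo)
    static int shardFor(String id, int numberOfShards) {
        return Math.floorMod(hash(id), numberOfShards);
    }

    public static void main(String[] args) {
        String id = "http://static.photosite.com/80018335.jpg";
        // Same ID + same hash + same shard count => always the same shard.
        System.out.println(shardFor(id, 5));
    }
}
```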
How can I prevent this from happening?
Elasticsearch version: 1.7.3
query:
{
  "bool" : {
    "must" : {
      "wildcard" : {
        "metadata.url.path" : "*"
      }
    },
    "must_not" : {
      "wildcard" : {
        "metadata.url_path" : "*"
      }
    }
  }
}
Code that writes the documents:
BulkRequestBuilder bulkRequest = destinationConnection.getClient().prepareBulk();
for (Map<String, Object> doc : batch.getDocs()) {
    XContentBuilder builder;
    try {
        builder = XContentFactory.jsonBuilder().startObject();
        for (Map.Entry<String, Object> mapEntry : doc.entrySet()) {
            if (!mapEntry.getKey().equals("id")) {
                builder.field(mapEntry.getKey(), mapEntry.getValue());
            }
        }
        builder.endObject();
    } catch (IOException e) {
        throw new DocumentBuilderException("Error building request to move items to new parent!", e);
    }
    bulkRequest.add(destinationConnection.getClient().prepareIndex(destinationIndex, destinationType, (String) doc.get("id")).setSource(builder).request());
}
// Tried with and without setRefresh
BulkResponse response = bulkRequest.setRefresh(true).execute().actionGet();
for (BulkItemResponse itemResponse : response.getItems()) {
    if (itemResponse.isFailed()) {
        LOG.error("Updating item: {} failed: {}", itemResponse.getFailure().getId(), itemResponse.getFailureMessage());
    }
}
Update
Could it be refresh/query speed?
The program processes batches of 5000 documents and does not use a scroll query, so I would expect the total number of results returned by the query to drop by 5000 on each iteration.
In fact this is not happening. The number of documents removed from the total result set shrinks with every iteration, until eventually it barely changes from one iteration to the next:
10:43:42.220 INFO : Fetching another batch
10:43:51.701 INFO : Found 9260992 matching documents. Processing 5000...
10:43:51.794 INFO : Total remaining: 9260992
10:43:51.813 INFO : Writing batch of 5000 items
10:43:57.261 INFO : Fetching another batch
10:44:06.136 INFO : Found 9258661 matching documents. Processing 5000...
10:44:06.154 INFO : Total remaining: 9258661
10:44:06.158 INFO : Writing batch of 5000 items
10:44:11.369 INFO : Fetching another batch
10:44:19.790 INFO : Found 9256813 matching documents. Processing 5000...
10:44:19.804 INFO : Total remaining: 9256813
10:44:19.807 INFO : Writing batch of 5000 items
10:44:22.684 INFO : Fetching another batch
10:44:31.182 INFO : Found 9255697 matching documents. Processing 5000...
10:44:31.193 INFO : Total remaining: 9255697
10:44:31.196 INFO : Writing batch of 5000 items
10:44:33.852 INFO : Fetching another batch
10:44:42.394 INFO : Found 9255115 matching documents. Processing 5000...
10:44:42.406 INFO : Total remaining: 9255115
10:44:42.409 INFO : Writing batch of 5000 items
10:44:45.152 INFO : Fetching another batch
10:44:51.473 INFO : Found 9254744 matching documents. Processing 5000...
10:44:51.483 INFO : Total remaining: 9254744
10:44:51.486 INFO : Writing batch of 5000 items
10:44:53.853 INFO : Fetching another batch
10:44:59.966 INFO : Found 9254551 matching documents. Processing 5000...
10:44:59.978 INFO : Total remaining: 9254551
10:44:59.981 INFO : Writing batch of 5000 items
10:45:02.446 INFO : Fetching another batch
10:45:07.773 INFO : Found 9254445 matching documents. Processing 5000...
10:45:07.787 INFO : Total remaining: 9254445
10:45:07.791 INFO : Writing batch of 5000 items
10:45:10.237 INFO : Fetching another batch
10:45:15.679 INFO : Found 9254384 matching documents. Processing 5000...
10:45:15.703 INFO : Total remaining: 9254384
10:45:15.712 INFO : Writing batch of 5000 items
10:45:18.078 INFO : Fetching another batch
10:45:23.660 INFO : Found 9254359 matching documents. Processing 5000...
10:45:23.712 INFO : Total remaining: 9254359
10:45:23.725 INFO : Writing batch of 5000 items
10:45:26.520 INFO : Fetching another batch
10:45:31.895 INFO : Found 9254343 matching documents. Processing 5000...
10:45:31.905 INFO : Total remaining: 9254343
10:45:31.908 INFO : Writing batch of 5000 items
10:45:34.279 INFO : Fetching another batch
10:45:40.121 INFO : Found 9254333 matching documents. Processing 5000...
10:45:40.136 INFO : Total remaining: 9254333
10:45:40.139 INFO : Writing batch of 5000 items
10:45:42.381 INFO : Fetching another batch
10:45:47.798 INFO : Found 9254325 matching documents. Processing 5000...
10:45:47.823 INFO : Total remaining: 9254325
10:45:47.833 INFO : Writing batch of 5000 items
10:45:50.370 INFO : Fetching another batch
10:45:57.105 INFO : Found 9254321 matching documents. Processing 5000...
10:45:57.117 INFO : Total remaining: 9254321
10:45:57.121 INFO : Writing batch of 5000 items
10:45:59.459 INFO : Fetching another batch
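The stall pattern in this log is at least consistent with the refresh-speed hypothesis: if the search keeps seeing a stale (unrefreshed) view of the index, the same un-upgraded documents come back again and the countdown slows to a halt. A toy model in plain Java (nothing here is Elasticsearch internals; `iterationsNeeded` and the refresh behaviour are assumptions purely for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

public class RefreshLagToy {

    // Toy model: a "search" only sees the index as of the last refresh.
    // If the view is never refreshed between batches, the same docs are
    // returned every time and the upgrade never completes.
    public static int iterationsNeeded(int docs, int batchSize, boolean refreshAfterEachBatch) {
        SortedSet<Integer> live = new TreeSet<>();     // docs still needing the upgrade
        SortedSet<Integer> visible = new TreeSet<>();  // what the search currently sees
        for (int i = 0; i < docs; i++) { live.add(i); visible.add(i); }
        int iterations = 0;
        while (!live.isEmpty() && iterations < 1000) {
            iterations++;
            // the "search" returns up to batchSize hits from the stale view
            List<Integer> hits = new ArrayList<>(visible)
                    .subList(0, Math.min(batchSize, visible.size()));
            live.removeAll(hits);                      // upgrading is idempotent
            if (refreshAfterEachBatch) visible = new TreeSet<>(live);
        }
        return iterations;                             // 1000 == never finished
    }

    public static void main(String[] args) {
        System.out.println(iterationsNeeded(20, 5, true));   // finishes in 4 batches
        System.out.println(iterationsNeeded(20, 5, false));  // stalls: hits the cap
    }
}
```

A scroll (scan) query would sidestep this entirely, since it iterates over a point-in-time snapshot instead of re-running the search after every batch.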
It looks as though document duplication has been widespread from the start.
I just tried a two-node cluster with a green cluster-health status, and the same thing happens.
Next I will try a single node with no replication.
Update:
Here is an example of the before/after data from the bulk processor listener:
Before:
Item( id=http://static.photosite.com/20160123_093502.jpg, index=status, type=status, op_type=INDEX, version=-3, parent=null, routing=null )
After (the BulkResponse indicates no failures):
Item( id=http://static.photosite.com/20160123_093502.jpg, index=status, type=status, op_type=index, version=22)
Things to note:
- No parent
- No routing
- A huge jump in the document version
What this snippet doesn't show is that every item in the beforeBulk request is represented in the afterBulk request details as a successful IndexRequest (i.e. none are missing).
Update 2
I think the negative version at the start may have something to do with this: https://discuss.elastic.co/t/negative-version-number-on-snapshot-restore-from-s3-bucket/56642
Update 3
I've just discovered that when I query the document using curl the version is positive, i.e.:
- Restore the snapshot.
- Query the document using curl: the version is 2.
- Query the document using the Java API: the version is -1.
- Re-indexing the document causes a duplicate (a new document with the same ID written to a different shard) with version 1.
What is going on here?