Using BulkAll()
批量插入时我收到这个奇怪的错误
BulkAll halted after receiving failures that can not be retried from _bulk
但是,当我检查异常时,我仍然得到成功的响应:
Successful low level call on POST: /cf-lblogs-2019.01.23/cloudflareloadbalancinglogelasticentity/_bulk?
我在这里做错了什么?下面是代码片段:
var waitHandle = new CountdownEvent(1);
var bulk = _client.BulkAll(group.ToList(), a => a
.Index(_index.Replace("*", string.Empty) + group.Key)
.BackOffRetries(2)
.BackOffTime("30s")
.RefreshOnCompleted(true)
.MaxDegreeOfParallelism(4)
.Size(group.Count()));
bulk.Subscribe(new BulkAllObserver(
onNext: response => _logger.LogInformation($"Indexed {response.Page * group.Count()} with {response.Retries} retries"),
onError: HandleInsertError,
onCompleted: () => waitHandle.Signal()
));
waitHandle.Wait();
private void HandleInsertError(Exception e)
{
var exceptionString = e.ToString();
_logger.LogError(exceptionString);
}
巢 6.4.2。
弹性6.5.4。
就我而言,我已经解决了,如下所示:
List<string> errors = new List<string>();
int seenPages = 0;
int requests = 0;
CancellationTokenSource tokenSource = new CancellationTokenSource();
ConcurrentBag<BulkResponse> bulkResponses = new ConcurrentBag<BulkResponse>();
ConcurrentBag<BulkAllResponse> bulkAllResponses = new ConcurrentBag<BulkAllResponse>();
ConcurrentBag<items> deadLetterQueue = new ConcurrentBag<items>();
BulkAllObservable<items> observableBulk = elasticClient.BulkAll(lst, f => f
.MaxDegreeOfParallelism(Environment.ProcessorCount)
.BulkResponseCallback(r =>
{
bulkResponses.Add(r);
Interlocked.Increment(ref requests);
})
.ContinueAfterDroppedDocuments()
.DroppedDocumentCallback((r, o) =>
{
errors.Add(r.Error.Reason);
deadLetterQueue.Add(o);
})
.BackOffTime(TimeSpan.FromSeconds(5))
.BackOffRetries(2)
.Size(1000)
.RefreshOnCompleted()
.Index(indeksName)
.BufferToBulk((r, buffer) => r.IndexMany(buffer))
, tokenSource.Token);
try
{
observableBulk.Wait(TimeSpan.FromMinutes(15), b =>
{
bulkAllResponses.Add(b);
Interlocked.Increment(ref seenPages);
});
}
catch (Exception e)
{
Console.WriteLine("Exxx => " + e.Message);
}
foreach (var err in errors)
{
Console.WriteLine("Error : " + err);
}
我希望它可以帮助其他也有这个问题的人。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)