ElasticSearch Nest BulkAll 在收到无法从 _bulk 重试的失败后停止

2024-04-10

Using BulkAll()批量插入时我收到这个奇怪的错误

BulkAll halted after receiving failures that can not be retried from _bulk

但是,当我检查异常时,我仍然得到成功的响应:

Successful low level call on POST: /cf-lblogs-2019.01.23/cloudflareloadbalancinglogelasticentity/_bulk?

我在这里做错了什么?下面是代码片段:

var waitHandle = new CountdownEvent(1);

var bulk = _client.BulkAll(group.ToList(), a => a
                .Index(_index.Replace("*", string.Empty) + group.Key)
                .BackOffRetries(2)
                .BackOffTime("30s")
                .RefreshOnCompleted(true)
                .MaxDegreeOfParallelism(4)
                .Size(group.Count()));

bulk.Subscribe(new BulkAllObserver(
                onNext: response => _logger.LogInformation($"Indexed {response.Page * group.Count()} with {response.Retries} retries"),
                onError: HandleInsertError,
                onCompleted: () => waitHandle.Signal()
            ));

waitHandle.Wait();


private void HandleInsertError(Exception e)
    {
        var exceptionString = e.ToString(); 
        _logger.LogError(exceptionString);
    }

巢 6.4.2。

弹性6.5.4。


就我而言,我已经解决了,如下所示:

        List<string> errors = new List<string>();
        int seenPages = 0;
        int requests = 0;
        CancellationTokenSource tokenSource = new CancellationTokenSource();
        ConcurrentBag<BulkResponse> bulkResponses = new ConcurrentBag<BulkResponse>();
        ConcurrentBag<BulkAllResponse> bulkAllResponses = new ConcurrentBag<BulkAllResponse>();
        ConcurrentBag<items> deadLetterQueue = new ConcurrentBag<items>();
        BulkAllObservable<items> observableBulk = elasticClient.BulkAll(lst, f => f
                .MaxDegreeOfParallelism(Environment.ProcessorCount)
                .BulkResponseCallback(r =>
                {
                    bulkResponses.Add(r);
                    Interlocked.Increment(ref requests);
                })
                .ContinueAfterDroppedDocuments()
                .DroppedDocumentCallback((r, o) =>
                {
                    errors.Add(r.Error.Reason);
                    deadLetterQueue.Add(o);
                })
                .BackOffTime(TimeSpan.FromSeconds(5))
                .BackOffRetries(2)
                .Size(1000)
                .RefreshOnCompleted()
                .Index(indeksName)
                .BufferToBulk((r, buffer) => r.IndexMany(buffer))
            , tokenSource.Token);

        try
        {
            observableBulk.Wait(TimeSpan.FromMinutes(15), b =>
            {
                bulkAllResponses.Add(b);
                Interlocked.Increment(ref seenPages);
            });
        }
        catch (Exception e)
        {
            Console.WriteLine("Exxx => " + e.Message);
        }
        foreach (var err in errors)
        {
            Console.WriteLine("Error : " + err);
        }

我希望它可以帮助其他也有这个问题的人。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

ElasticSearch Nest BulkAll 在收到无法从 _bulk 重试的失败后停止 的相关文章

  • 通过 SocketCAN 进行 boost::asio

    我正在考虑利用升压阿西奥 http www boost org doc libs 1 49 0 doc html boost asio html从a读取数据套接字CAN http en wikipedia org wiki SocketCA
  • 使用 mono/nunit-console/4 在 Mac OS X 控制台上运行测试

    我安装了 Max OS X 10 11 1 上面装有 Xamarin 我编写了简单的测试类 只是为了测试在 Mac OS X 和 Ubuntu 上运行 Nunit 测试 该类实际上有一个返回字符串的方法 using System names
  • 在 C# 中按元素相乘数组具有意想不到的性能

    我想找到按元素相乘两个数组的最佳方法 这是更广泛项目的一部分 其中性能而不是唯一的考虑因素 我今天开始用 C Linqpad 编写一些函数 因此它还没有以任何方式进行优化 下面代码的输出如下 Environment ProcessorCou
  • 如何保证对象只有一个线程

    我有以下代码 class Service public void start creates thread which creates window and goes to message loop void stop sends WM C
  • ASP.NET Web API 客户端 ProgressMessageHandler Post 任务卡在 WinForm 应用程序中

    我在用着HttpClient and ProgressMessageHandler来自MS ASP NET Web API 客户端库 http nuget org packages Microsoft AspNet WebApi Clien
  • 读取 C# 中的默认应用程序设置

    我的自定义网格控件有许多应用程序设置 在用户范围内 其中大部分是颜色设置 我有一个表单 用户可以在其中自定义这些颜色 并且我想添加一个用于恢复默认颜色设置的按钮 如何读取默认设置 例如 我有一个名为的用户设置CellBackgroundCo
  • 时间:2019-03-17 标签:c#ThreadSafeDeepCopy

    我一直在阅读很多其他问题以及大量谷歌搜索 但我一直无法找到明确的解决方案 根据我读过的一些最佳实践 类的静态方法应该创建线程安全的 并且实例成员应该将线程安全留给消费者 我想为该类实现深度复制方法 该类本身还有其他引用类型成员 有没有什么方
  • 为什么 set_symmetry_difference 无法与比较器一起使用?

    Example program include
  • 如何从文本文件读取整数到数组

    这就是我想做的 我对此有些不满 但我希望你能容忍我 这对我来说是一个非常新的概念 1 在我的程序中 我希望创建一个包含 50 个整数的数组来保存来自文件的数据 我的程序必须获取用户的文档文件夹的路径 2 文件的名称为 grades txt
  • 将二进制数据从 C# 上传到 PHP

    我想将文件从 Windows C 应用程序上传到运行 PHP 的 Web 服务器 我知道 WebClient UploadFile 方法 但我希望能够分块上传文件 以便我可以监控进度并能够暂停 恢复 因此 我正在读取文件的一部分并使用 We
  • 给出 5 个参数,但在终端中只得到 3 个参数

    我想将一个文件传递给一个c 程序 如果我在 IDE 中执行此操作 test string string lt test txt return argc 5 但在终端上我刚刚得到argc 3 看来 这是因为 什么是 lt 意思是 我正在使用
  • 每个租户的唯一用户名和电子邮件

    我正在使用以下代码编写多租户应用程序ASP NET Core 2 1 我想覆盖默认的与用户创建相关的验证机制 目前我无法创建多个具有相同的用户UserName My ApplicationUser模型有一个名为TenantID 我想要实现的
  • ASP.NET MailMessage.BodyEncoding 和 MailMessage.SubjectEncoding 默认值

    很简单的问题 但我在 MSDN 上找不到答案 查找 ASP NET 将用于的默认值 MailMessage BodyEncoding and MailMessage SubjectEncoding 如果你不在代码中设置它们 Thanks F
  • C# 中的 strstr() 等效项

    我有两个byte 我想找到第二个的第一次出现byte 在第一个byte 或其中的一个范围 我不想使用字符串来提高效率 翻译第一个byte to a string会效率低下 基本上我相信就是这样strstr 在 C 中做 最好的方法是什么 这
  • Elastic Search 索引经常被删除[关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我正在 google cloud 上对个人项目运行弹性搜索 并将其用作我的应用程序的搜索索引 从最近三天开始 索引就被神秘地删除了 我不知
  • 我可以让 ungetc 取消阻止阻塞的 fgetc 调用吗?

    我想在收到 SIGUSR1 后使用 ungetc 将 A 字符重新填充到标准输入中 想象一下我有充分的理由这样做 调用 foo 时 stdin 中的阻塞读取不会被收到信号时的 ungetc 调用中断 虽然我没想到它会按原样工作 但我想知道是
  • IEnumerable.Except 不起作用,那么我该怎么办?

    我有一个 linq to sql 数据库 非常简单 我们有 3 个表 项目和用户 有一个名为 User Projects 的连接表将它们连接在一起 我已经有了一个获得的工作方法IEnumberable
  • 为 Logstash 中的新字段设置 Elasticsearch Analyzer

    通过使用GROK filter 我们可以向Logstash添加新字段 但是 我想知道如何为该特定字段设置分析器 例如 我有一个新的 id 字段 其中有一个字段 例如a b 但是 Elasticsearch 附带的普通分析器会将其分解为a a
  • 使用restsharp序列化对象并将其传递给WebApi而不是序列化列表

    我有一个看起来像的视图模型 public class StoreItemViewModel public Guid ItemId get set public List
  • 使用taskkill停止Windows服务

    我需要帮助来使用 C 终止 Windows 服务 现在要终止该服务 请使用以下选项 从命令 sc queryex ServiceName 发现后PID服务的 taskkill pid 1234 exemple f 为了便于阅读 但如果您明白

随机推荐