如何对 IEnumerable 进行分块,而不会在失败时丢失/丢弃项目?

2024-04-21

我有一个生产者-消费者场景,其中生产者是一个可枚举的项目序列(IEnumerable<Item>)。我想以大块/批次的方式处理这些项目,每个项目 10 个。所以我决定使用新的(.NET 6)Chunk https://learn.microsoft.com/en-us/dotnet/api/system.linq.enumerable.chunkLINQ 运算符,如这个问题中所建议的:在 LINQ 中创建批处理 https://stackoverflow.com/questions/13731796/create-batches-in-linq.

我的问题是,有时生产者会失败,在这种情况下,分块序列的消费者会收到错误,而无需先接收包含错误之前生成的最后一个项目的块。因此,例如,如果生产者生成 15 个项目然后失败,则消费者将获得包含项目 1-10 的块,然后会出现异常。 11-15号物品将会丢失!这是一个演示这种不良行为的最小示例:

static IEnumerable<int> Produce()
{
    int i = 0;
    while (true)
    {
        i++;
        Console.WriteLine($"Producing #{i}");
        yield return i;
        if (i == 15) throw new Exception("Oops!");
    }
}

// Consume
foreach (int[] chunk in Produce().Chunk(10))
{
    Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
}

Output:

Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Unhandled exception. System.Exception: Oops!
   at Program.<Main>g__Produce|0_0()+MoveNext()
   at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
   at Program.Main()

在线演示 https://dotnetfiddle.net/Wz77NZ.

理想的行为是获取大量的值[11, 12, 13, 14, 15]在获得异常之前。

我的问题是:有什么办法可以配置Chunk运算符以便优先发出数据而不是异常?如果没有,我如何实现自定义 LINQ 运算符,例如命名ChunkNonDestructive,具有理想的行为?

public static IEnumerable<TSource[]> ChunkNonDestructive<TSource>(
    this IEnumerable<TSource> source, int size);

Note:除了从System.Linq.Chunk运算符我也尝试过Buffer运营商从系统交互 https://www.nuget.org/packages/System.Interactive/包,以及Batch https://morelinq.github.io/3.3/ref/api/html/Overload_MoreLinq_MoreEnumerable_Batch.htm运营商从MoreLinq https://www.nuget.org/packages/morelinq/包裹。显然他们的行为都是一样的(破坏性的)。


Update:以下是上述示例的理想输出:

Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Consumed: [11, 12, 13, 14, 15]
Unhandled exception. System.Exception: Oops!
   at Program.<Main>g__Produce|0_0()+MoveNext()
   at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
   at Program.Main()

区别就是线Consumed: [11, 12, 13, 14, 15],这并不存在于实际输出中。


如果您对源进行预处理以使其在遇到异常时停止,那么您可以使用Chunk() as-is.

public static class Extensions
{
    public static IEnumerable<T> UntilFirstException<T>(this IEnumerable<T> source, Action<Exception> exceptionCallback = null)
    {
        using var enumerator = source.GetEnumerator();
        while (true)
        {
            T current;
            try
            {
                if (!enumerator.MoveNext())
                {
                    break;
                }
                current = enumerator.Current;
            }
            catch (Exception e)
            {
                exceptionCallback?.Invoke(e);
                break;
            }
            yield return current;
        }
    }
}
    Exception? e = null;
    foreach (int[] chunk in Produce().UntilFirstException(thrown => e = thrown).Chunk(10))
    {
        Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
    }

我觉得这样可以很好地分离职责。如果您想要一个抛出异常的帮助程序,而不必自己捕获异常,则可以将其用作组件来简化该帮助程序的编写:

    public static IEnumerable<T[]> ChunkUntilFirstException<T>(this IEnumerable<T> source, int size)
    {
        Exception? e = null;
        var result = source.UntilFirstException(thrown => e = thrown).Chunk(size);
        foreach (var element in result)
        {
            yield return element;
        }
        if (e != null)
        {
            throw new InvalidOperationException("source threw an exception", e);
        }
    }

请注意,这将引发与生产者发出的异常不同的异常。这使您可以保留与原始异常关联的堆栈跟踪,而throw e会覆盖该堆栈跟踪。

您可以根据您的需要进行调整。如果您需要捕获您期望生产者发出的特定类型的异常,那么使用when具有某种模式匹配的上下文关键字。

    try
    {
        foreach (int[] chunk in Produce().ChunkUntilFirstException(10))
        {
            Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
        }
    }
    catch (InvalidOperationException e) when (e.InnerException is {Message: "Oops!"})
    {
        Console.WriteLine(e.InnerException.ToString());
    }
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何对 IEnumerable 进行分块,而不会在失败时丢失/丢弃项目? 的相关文章

随机推荐

  • 如何使用 Xcode 5 本地化我的应用程序?

    这是关于的后续问题 和答案 如何使用 Xcode 4 本地化我的应用程序 https stackoverflow com questions 5349066 how to localize my app with xcode 4 11282
  • Angular 2:实现自定义上下文菜单

    我正在实现 Angular 2 属性指令 以允许我向元素添加自定义上下文菜单 如下所示 p Hello world p 该指令添加了一个鼠标事件处理程序来捕获右键单击 其想法是构建一个上下文菜单 将其添加到 DOM 然后在用户完成操作时销毁
  • Clojure gen-class 返回自己的类

    我现在正在使用 Clojure 创建一个类对象 它有一个返回对象本身的方法 用Java编写的 我想要制作的对象是这样的 class Point public double x public double y public Point dou
  • 静态与非静态方法

    假设您有一些可以在非静态类中设为静态的方法 例如 private double power double a double b return Math Pow a b 您认为将方法签名更改为静态有什么好处吗 在上面的例子中 private
  • docker-compose 相当于 docker run --init 吗?

    根据https github com krallin tini using tini https github com krallin tini using tini tini内置于docker中 可以通过传递 init标记为docker
  • docker 容器中 PostgreSQL 的权限问题

    我正在尝试使用 PostgreSQL 运行一个 docker 映像 该映像配置了一个用于持久数据的卷 docker compose yml version 3 1 services db image postgres restart alw
  • 启动 StepFunction 并退出不会触发执行

    我有 Lambda 函数tranportKickoff它接收输入 然后将输入发送 代理到阶跃函数 下面的代码does运行 我没有收到任何错误 但同时步骤函数没有执行 对于设计也很重要 我不希望transportKickoff函数等待步骤函数
  • Mongoose Population: CastError: 路径“_id”处的值“[object Object]”转换为 ObjectId 失败

    遇到一个CastError在 Mongoose 中填充嵌套 ObjectId 引用时 值 显然是valid 只要它们在保存到架构时不会被阻止 有兴趣在服务器端解决此问题以防止将来出现格式错误的数据 但是 我知道不从客户端保存这些值是一个好主
  • java 是否存在只有键没有值的哈希结构?

    我正在寻找一种无需值即可对键进行哈希处理的结构 查询时 如果找到密钥 则应返回 true 否则返回 false 我正在寻找类似的东西Hashtable
  • 获取符合条件的组合

    问题 我有一个表 我需要在其中提取行 或列 如果我转置表 的所有有效组合 列中只有值 或 并且当组合中的至少一行中有 时 组合被认为是有效的 也就是说 所有行中带有 的任何组合都是无效的 示例表 Guns P 01 P 02 P 03 P
  • React 路由器匹配失败

    使用有什么好处Match and Miss组件来自react router over Router成分 我似乎找不到任何关于此的文档反应路由器文档 https github com ReactTraining react router tr
  • R外部接口

    我想实现一些用 C 代码编写的 R 包 C 代码必须 将数组 任何类型 作为输入 生成数组作为输出 大小不可预测 实现数组传递的最佳实践是什么 目前 C 代码被调用 C 它通过指针直接从 R 访问数组 不幸的是 无法对输出执行相同的操作 因
  • 如何检查java方法的字节码长度

    目前 我参与了一个大型遗留项目 其中包含许多巨大的类和生成的代码 我希望找到所有字节码长度大于 8000 字节的方法 因为 OOTB java 不会优化它 我发现这样的手动方式 Java 中的特定方法有多少字节的字节码 https stac
  • 如何从 gdb 命令提示符执行外部命令?

    我正在使用 gdb 调试程序 每当我错过断点或决定添加另一个观察点时 我必须终止该进程并重新运行它 为了将现有的 gdb 附加到它 我使用attach
  • 使用 VS2010 Professional 从 TFS 在线删除项目(TFSDeleteProject 不在我的计算机上!)

    我有 Visual Studio 2010 Professional 并且一直在 Visualstudio com 上使用 TFS 服务 我读了这个问题及其答案 https stackoverflow com questions 13635
  • 在 ggplot 中自定义图例

    我需要帮助使用 ggplot2 自定义图表 下面是我正在使用的代码和生成的图表 gt p ggplot a2 aes x grid y median geom line size 1 3 geom line aes x grid y low
  • 如何根据同月的日期查找一个月中特定日期的第五个或结束日期

    我一直在尝试根据同月的日期查找一个月中某一天的第五周日期 例如第五周星期一日期 第五周星期二日期 星期三 等等 该日期可以属于同月的任何一周 我尝试过像 DateTime MonthEventDate 05 01 2016 Date for
  • 如何在 HTML 表单中“预填充”文本区域的值? [复制]

    这个问题在这里已经有答案了 我正在创建一个简单的后端应用程序 用户可以通过它创建 更新 删除数据库行 在本例中为工作列表 当用户编辑现有列表时 我尝试使用现有行中的数据预填充大部分 HTML 表单 我已经使用 value 属性成功地完成了文
  • 如何就地刷新组合框项目?

    ComboBox Items 集合是一个 ObjectCollection 因此您当然可以在其中存储您想要的任何内容 但这意味着您不会像使用 ListViewItem 那样获得 Text 属性 ComboBox 通过对每个项目调用 ToSt
  • 如何对 IEnumerable 进行分块,而不会在失败时丢失/丢弃项目?

    我有一个生产者 消费者场景 其中生产者是一个可枚举的项目序列 IEnumerable