我有一个生产者-消费者场景,其中生产者是一个可枚举的项目序列(IEnumerable<Item>
)。我想以大块/批次的方式处理这些项目,每个项目 10 个。所以我决定使用新的(.NET 6)Chunk https://learn.microsoft.com/en-us/dotnet/api/system.linq.enumerable.chunkLINQ 运算符,如这个问题中所建议的:在 LINQ 中创建批处理 https://stackoverflow.com/questions/13731796/create-batches-in-linq.
我的问题是,有时生产者会失败,在这种情况下,分块序列的消费者会收到错误,而无需先接收包含错误之前生成的最后一个项目的块。因此,例如,如果生产者生成 15 个项目然后失败,则消费者将获得包含项目 1-10 的块,然后会出现异常。 11-15号物品将会丢失!这是一个演示这种不良行为的最小示例:
static IEnumerable<int> Produce()
{
int i = 0;
while (true)
{
i++;
Console.WriteLine($"Producing #{i}");
yield return i;
if (i == 15) throw new Exception("Oops!");
}
}
// Consume
foreach (int[] chunk in Produce().Chunk(10))
{
Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
}
Output:
Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Unhandled exception. System.Exception: Oops!
at Program.<Main>g__Produce|0_0()+MoveNext()
at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
at Program.Main()
在线演示 https://dotnetfiddle.net/Wz77NZ.
理想的行为是获取大量的值[11, 12, 13, 14, 15]
在获得异常之前。
我的问题是:有什么办法可以配置Chunk
运算符以便优先发出数据而不是异常?如果没有,我如何实现自定义 LINQ 运算符,例如命名ChunkNonDestructive
,具有理想的行为?
public static IEnumerable<TSource[]> ChunkNonDestructive<TSource>(
this IEnumerable<TSource> source, int size);
Note:除了从System.Linq.Chunk
运算符我也尝试过Buffer
运营商从系统交互 https://www.nuget.org/packages/System.Interactive/包,以及Batch https://morelinq.github.io/3.3/ref/api/html/Overload_MoreLinq_MoreEnumerable_Batch.htm运营商从MoreLinq https://www.nuget.org/packages/morelinq/包裹。显然他们的行为都是一样的(破坏性的)。
Update:以下是上述示例的理想输出:
Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Consumed: [11, 12, 13, 14, 15]
Unhandled exception. System.Exception: Oops!
at Program.<Main>g__Produce|0_0()+MoveNext()
at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
at Program.Main()
区别就是线Consumed: [11, 12, 13, 14, 15]
,这并不存在于实际输出中。