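Normally, what you'd do in a situation like this is set the BoundedCapacity of the CreateData block. But that won't work here, because TransformManyBlock doesn't seem to take BoundedCapacity into account when filling its output queue from a single IEnumerable.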
What you can do instead is write a function that iterates the collection and uses SendAsync() to send more data only when the target can accept it:
/// <remarks>
/// If iterating data throws an exception, the target block is faulted
/// and the returned Task completes successfully.
///
/// Depending on the usage, this might or might not be what you want.
/// </remarks>
public static async Task SendAllAsync<T>(
    this ITargetBlock<T> target, IEnumerable<T> data)
{
    try
    {
        foreach (var item in data)
        {
            // SendAsync() waits until the target has room. It returns false
            // if the target declined the item (e.g. it has completed or faulted),
            // in which case there is no point in iterating any further.
            if (!await target.SendAsync(item))
                break;
        }
    }
    catch (Exception e)
    {
        target.Fault(e);
    }
}
Usage:
var data = Enumerable.Range(0, 1000 * 1000 * 1000).Select((s,i) => "Hello, World " + i);
await ParseFile.SendAllAsync(data);
ParseFile.Complete();
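To see the throttling in isolation, here is a minimal, self-contained sketch (it is not part of the code above). It assumes a hypothetical slow consumer, an ActionBlock<int> with BoundedCapacity set, and inlines the same SendAsync() loop; BackpressureDemo, RunAsync, produced, consumed and maxLead are names made up for illustration. Because the sequence is enumerated lazily and each SendAsync() waits for room, the producer should stay within roughly BoundedCapacity + 1 items of the consumer:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BackpressureDemo
{
    // Returns the largest observed gap between items produced and items consumed.
    public static async Task<int> RunAsync(int itemCount, int boundedCapacity)
    {
        int produced = 0, consumed = 0, maxLead = 0;

        // Hypothetical slow consumer; BoundedCapacity limits how many items
        // the block holds at once.
        var target = new ActionBlock<int>(
            async item =>
            {
                await Task.Delay(5);
                Interlocked.Increment(ref consumed);
            },
            new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });

        // Lazy sequence: each item is created only when the loop asks for it.
        IEnumerable<int> data = Enumerable.Range(0, itemCount).Select(i =>
        {
            produced++;
            maxLead = Math.Max(maxLead, produced - Volatile.Read(ref consumed));
            return i;
        });

        foreach (var item in data)
        {
            // Completes only once the target has accepted the item.
            await target.SendAsync(item);
        }

        target.Complete();
        await target.Completion;
        return maxLead;
    }

    public static async Task Main()
    {
        int maxLead = await RunAsync(itemCount: 100, boundedCapacity: 2);
        Console.WriteLine($"Max items the producer got ahead: {maxLead}");
    }
}
```

If you replaced SendAsync() with a loop that ignores the return value of Post(), or materialized the sequence with ToList() first, the whole sequence would be generated up front, which is exactly the memory problem the bounded approach avoids.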
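If you still want the CreateData block to behave the way it does in your original code, you can have two bounded BufferBlocks, use SendAllAsync() to move data between them, and then use Encapsulate() to make them look like a single block: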
/// <remarks>
/// boundedCapacity represents the capacity of the input queue
/// and the output queue separately, not their total.
/// </remarks>
public static IPropagatorBlock<TInput, TOutput>
    CreateBoundedTransformManyBlock<TInput, TOutput>(
    Func<TInput, IEnumerable<TOutput>> transform, int boundedCapacity)
{
    var input = new BufferBlock<TInput>(
        new DataflowBlockOptions { BoundedCapacity = boundedCapacity });
    var output = new BufferBlock<TOutput>(
        new DataflowBlockOptions { BoundedCapacity = boundedCapacity });

    // Pump items from input to output on a background task.
    Task.Run(
        async () =>
        {
            try
            {
                // Returns false once input is completed and empty.
                while (await input.OutputAvailableAsync())
                {
                    var data = transform(await input.ReceiveAsync());
                    await output.SendAllAsync(data);
                }

                output.Complete();
            }
            catch (Exception e)
            {
                // BufferBlock implements Fault() explicitly, hence the casts.
                ((IDataflowBlock)input).Fault(e);
                ((IDataflowBlock)output).Fault(e);
            }
        });

    return DataflowBlock.Encapsulate(input, output);
}