我有一个数据流管道,由多个处理异构文档(XLS、PDF 等)的块组成。每种类型的文档均由专门的人员处理TransformBlock
。在管道的末端我有一个ActionBlock
它接收所有处理后的文档,并将它们一一上传到网络服务器。我的问题是,我找不到一种方法来满足按照最初在管道中输入的顺序上传文档的要求。例如我不能使用EnsureOrdered https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.dataflowblockoptions.ensureordered这个选项对我有利,因为此选项配置单个块的行为,而不是并行工作的多个块的行为。我的要求是:
- 按特定顺序将文档插入管道中。
- 根据每个文档的类型,对每个文档进行不同的处理。
- 特定类型的文件应按顺序处理。
- 不同类型的文档可以(并且应该)并行处理。
- 所有文件应在处理后尽快上传。
- 文档必须按顺序上传,并按照它们进入管道的顺序相同。
例如,要求文档#8必须在文档#7之后上传,即使它是在文档#7之前处理的。
第五个需求意味着我不能等待所有文档处理完毕,然后按索引排序,最后上传。上传必须与处理同时进行。
这是我正在尝试做的一个最小的例子。为简单起见,我不会向块提供以下实例IDocument
接口,但带有简单的整数。每个整数的值代表它进入管道的顺序以及必须上传的顺序:
var xlsBlock = new TransformBlock<int, int>(document =>
{
int duration = 300 + document % 3 * 300;
Thread.Sleep(duration); // Simulate CPU-bound work
return document;
});
var pdfBlock = new TransformBlock<int, int>(document =>
{
int duration = 100 + document % 5 * 200;
Thread.Sleep(duration); // Simulate CPU-bound work
return document;
});
var uploader = new ActionBlock<int>(async document =>
{
Console.WriteLine($"Uploading document #{document}");
await Task.Delay(500); // Simulate I/O-bound work
});
xlsBlock.LinkTo(uploader);
pdfBlock.LinkTo(uploader);
foreach (var document in Enumerable.Range(1, 10))
{
if (document % 2 == 0)
xlsBlock.Post(document);
else
pdfBlock.Post(document);
}
xlsBlock.Complete();
pdfBlock.Complete();
_ = Task.WhenAll(xlsBlock.Completion, pdfBlock.Completion)
.ContinueWith(_ => uploader.Complete());
await uploader.Completion;
输出是:
Uploading document #1
Uploading document #2
Uploading document #3
Uploading document #5
Uploading document #4
Uploading document #7
Uploading document #6
Uploading document #9
Uploading document #8
Uploading document #10
(在小提琴上尝试一下 https://dotnetfiddle.net/JGiTcI)
理想的顺序是#1、#2、#3、#4、#5、#6、#7、#8、#9、#10。
在将已处理文档发送到之前,如何恢复它们的顺序uploader
block?
澄清:通过替换多个特定的管道来彻底改变管道的模式TransformBlock
s 具有单个泛型TransformBlock
,不是一个选项。理想的情况是拦截处理器和上传器之间的单个块,这将恢复文档的顺序。