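I am working on a TPL Dataflow pipeline and have noticed some strange behaviour related to ordering/parallelism in TransformManyBlocks (it might apply to other blocks as well).
Here is my code to reproduce it (.NET 4.7.2, TPL Dataflow 4.9.0):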
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class Program
{
    static void Main(string[] args)
    {
        // Source block: up to 4 inputs in flight, output order across inputs not enforced.
        var sourceBlock = new TransformManyBlock<int, Tuple<int, int>>(i => Source(i),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = false });

        // Target block: just prints whatever it receives.
        var targetBlock = new ActionBlock<Tuple<int, int>>(tpl =>
        {
            Console.WriteLine($"Received ({tpl.Item1}, {tpl.Item2})");
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = true });

        sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < 10; i++)
        {
            sourceBlock.Post(i);
        }

        sourceBlock.Complete();
        targetBlock.Completion.Wait();
        Console.WriteLine("Finished");
        Console.Read();
    }

    // Lazily yields (i, 0) .. (i, i - 1), sleeping before each item to simulate slow work.
    static IEnumerable<Tuple<int, int>> Source(int i)
    {
        var rand = new Random(543543254);
        for (int j = 0; j < i; j++)
        {
            Thread.Sleep(rand.Next(100, 1500));
            Console.WriteLine($"Returning ({i}, {j})");
            yield return Tuple.Create(i, j);
        }
    }
}
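The behaviour I expect is the following:
- The source block should return tuples in parallel; the only requirement is that, for a given i, they are ordered by the secondary property j.
- The target block should process messages in the order in which it receives them.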
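The per-i ordering by j relies on how C# iterator methods run, which can be checked without any Dataflow types. The following is a minimal sketch; IteratorDemo and the extra log line are names made up for illustration, and the sleeps from the repro are left out. It shows only that the delegate's call to Source returns before any item is produced and that the items then come out in ascending j. It does not show which thread enumerates the result inside a TransformManyBlock, or whether the block serializes that enumeration.

```csharp
using System;
using System.Collections.Generic;

class IteratorDemo
{
    // Same shape as Source in the repro, minus the sleeps (illustrative copy).
    static IEnumerable<Tuple<int, int>> Source(int i)
    {
        Console.WriteLine($"Iterator body for i={i} started");
        for (int j = 0; j < i; j++)
        {
            yield return Tuple.Create(i, j);
        }
    }

    static void Main()
    {
        // Calling an iterator method only creates the enumerable;
        // none of the method body has run at this point.
        IEnumerable<Tuple<int, int>> lazy = Source(3);
        Console.WriteLine("Source(3) returned");

        // The body runs here, one step per item, so j is ascending.
        foreach (var tpl in lazy)
        {
            Console.WriteLine($"Got ({tpl.Item1}, {tpl.Item2})");
        }
    }
}
```

Running this prints "Source(3) returned" first, then "Iterator body for i=3 started", then Got (3, 0), Got (3, 1), Got (3, 2). So in the repro, the Thread.Sleep calls happen while something enumerates the sequence, not while the lambda i => Source(i) executes.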
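As far as I understand, the secondary ordering condition is satisfied by the nature of yield return, so EnsureOrdered can be set to false. If it is set to true, the source block holds messages back for an unacceptable amount of time, because it waits for all yield returns of an input to complete before passing its messages on (in the real application many GB of data are processed, so we want to propagate data through the pipeline as quickly as possible in order to free RAM). This is a sample output when EnsureOrdered of the source block is set to true: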
Returning (1, 0)
Returning (2, 0)
Returning (4, 0)
Returning (3, 0)
Returning (2, 1)
Returning (4, 1)
Returning (3, 1)
Received (1, 0)
Received (2, 0)
Received (2, 1)
Returning (4, 2)
Returning (3, 2)
Received (3, 0)
Received (3, 1)
Received (3, 2)
Returning (5, 0)
Returning (6, 0)
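We can see that the source block works in parallel, but waits to propagate messages until all messages for the next i in line have been generated (as expected).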
However, when EnsureOrdered is false for the source block (as in the code sample), I get the following output:
Returning (2, 0)
Received (2, 0)
Returning (2, 1)
Received (2, 1)
Returning (4, 0)
Received (4, 0)
Returning (4, 1)
Received (4, 1)
Returning (4, 2)
Received (4, 2)
Returning (4, 3)
Received (4, 3)
Returning (1, 0)
Received (1, 0)
Returning (3, 0)
Received (3, 0)
Returning (3, 1)
Received (3, 1)
Returning (3, 2)
Received (3, 2)
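The source block successfully propagates messages as soon as they become available, but the parallelism seems to be lost, since it only works on one i at a time.
Why is this? How can I force it to process in parallel?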
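For comparison, here is one arrangement that should give both properties without depending on how TransformManyBlock enumerates its results. It is a sketch under assumptions, not an explanation of the behaviour above: the names producer, buffer and WorkaroundSketch are made up for illustration, the TransformManyBlock is replaced by an ActionBlock that enumerates Source(i) itself and posts each item into a BufferBlock, and the target is left at its default MaxDegreeOfParallelism of 1 (unlike the repro) so that it handles items strictly in arrival order.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class WorkaroundSketch
{
    static void Main()
    {
        // Unbounded FIFO buffer that takes the place of the source block's output side.
        var buffer = new BufferBlock<Tuple<int, int>>();

        // Each input i is enumerated inside the delegate, so up to 4 iterators
        // run concurrently and every item is posted as soon as it is yielded.
        var producer = new ActionBlock<int>(i =>
        {
            foreach (var tpl in Source(i))
            {
                buffer.Post(tpl);
            }
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        // Default MaxDegreeOfParallelism (1): items are handled in arrival order.
        var targetBlock = new ActionBlock<Tuple<int, int>>(tpl =>
            Console.WriteLine($"Received ({tpl.Item1}, {tpl.Item2})"));

        buffer.LinkTo(targetBlock, new DataflowLinkOptions { PropagateCompletion = true });

        // The producer is not linked to the buffer, so completion and faults
        // have to be forwarded by hand.
        producer.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted) ((IDataflowBlock)buffer).Fault(t.Exception);
            else buffer.Complete();
        });

        for (int i = 0; i < 10; i++)
        {
            producer.Post(i);
        }

        producer.Complete();
        targetBlock.Completion.Wait();
        Console.WriteLine("Finished");
    }

    // Same iterator as in the repro.
    static IEnumerable<Tuple<int, int>> Source(int i)
    {
        var rand = new Random(543543254);
        for (int j = 0; j < i; j++)
        {
            Thread.Sleep(rand.Next(100, 1500));
            Console.WriteLine($"Returning ({i}, {j})");
            yield return Tuple.Create(i, j);
        }
    }
}
```

Within one i, the items are posted from a single delegate invocation in ascending j, and BufferBlock preserves arrival order, so the secondary ordering is kept while different values of i interleave. The buffer here is unbounded; if memory is the concern, setting BoundedCapacity on it and replacing Post with SendAsync(...).Wait() would be one way to apply back-pressure, which I have left out to keep the sketch short.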