我有一个类,它使用 TPL 数据流实现由 3 个步骤组成的数据流。
在构造函数中,我将步骤创建为 TransformBlocks,并使用 LinkTo 将它们链接起来,并将 DataflowLinkOptions.PropagateCompletion 设置为 true。该类公开了一个方法,该方法通过在第一步调用 SendAsync 来启动工作流程。该方法返回工作流最后一步的“Completion”属性。
目前,工作流程中的步骤似乎按预期执行,但最后一步永远不会完成,除非我明确调用 Complete 。但是这样做会缩短工作流程并且不会执行任何步骤吗?我究竟做错了什么?
public class MessagePipeline {
private TransformBlock<object, object> step1;
private TransformBlock<object, object> step2;
private TransformBlock<object, object> step3;
public MessagePipeline() {
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
step1 = new TransformBlock<object, object>(
x => {
Console.WriteLine("Step1...");
return x;
});
step2 = new TransformBlock<object, object>(
x => {
Console.WriteLine("Step2...");
return x;
});
step3 = new TransformBlock<object, object>(
x => {
Console.WriteLine("Step3...");
return x;
});
step1.LinkTo(step2, linkOptions);
step2.LinkTo(step3, linkOptions);
}
public Task Push(object message) {
step1.SendAsync(message);
step1.Complete();
return step3.Completion;
}
}
...
public class Program {
public static void Main(string[] args) {
var pipeline = new MessagePipeline();
var result = pipeline.Push("Hello, world!");
result.ContinueWith(_ => Console.WriteLine("Completed"));
Console.ReadLine();
}
}
当您链接步骤时,您需要传递一个 DataflowLinkOptions传播完成 https://msdn.microsoft.com/en-us/library/system.threading.tasks.dataflow.dataflowlinkoptions.propagatecompletion(v=vs.110).aspx属性设置为 true 以传播完成和错误。一旦你这样做了,打电话Complete()
第一个块上的完成会将完成传播到下游块。
一旦块接收到完成事件,它就会完成处理,然后通知其链接的下游目标。
这样您就可以将所有数据发布到第一步并调用Complete()
。只有当所有上游块都完成时,最终块才会完成。
例如,
var linkOptions=new DataflowLinkOptions { PropagateCompletion = true};
myFirstBlock.LinkTo(mySecondBlock,linkOptions);
mySecondBlock.LinkTo(myFinalBlock,linkOptions);
foreach(var message in messages)
{
myFirstBlock.Post(message);
}
myFirstBlock.Complete();
......
await myFinalBlock.Completion;
默认情况下,PropagateCompletion 不是 true,因为在更复杂的场景(例如非线性流或动态变化的流)中,您不希望自动传播完成和错误。如果您想在不终止整个流程的情况下处理错误,您可能还希望避免自动完成。
早在 TPL Dataflow 处于测试版时就已成为默认设置was确实如此,但 RTM 上已更改
UPDATE
该代码永远不会完成,因为最后一步是TransformBlock
没有链接目标来接收其输出。这意味着即使该块收到了完成信号,它也没有完成所有工作,并且无法更改其自己的完成状态。
将其更改为ActionBlock<object>
消除了这个问题。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)