交叉发布到http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9 http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9
我知道...我并没有真正发挥 TplDataflow 的最大潜力。我只是使用 ATMBufferBlock
作为消息传递的安全队列,生产者和消费者以不同的速率运行。我看到一些奇怪的行为,让我不知道如何
继续。
private BufferBlock<object> messageQueue = new BufferBlock<object>();
public void Send(object message)
{
var accepted=messageQueue.Post(message);
logger.Info("Send message was called qlen = {0} accepted={1}",
messageQueue.Count,accepted);
}
public async Task<object> GetMessageAsync()
{
try
{
var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
//despite messageQueue.Count>0 next line
//occasionally does not execute
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
在上面的代码中(这是 2000 行分布式解决方案的一部分),Send
每 100 毫秒左右定期调用一次。这意味着一个项目是Post
ed to messageQueue
每秒大约10次。这是经过验证的。然而,偶尔会出现这样的情况ReceiveAsync
未在超时时间内完成(即Post
没有造成ReceiveAsync
完成)和TimeoutException
30岁后被提升。在此刻,messageQueue.Count
有数百个。这是出乎意料的。在较慢的发布速度(1 个帖子/秒)下也观察到此问题,并且通常发生在 1000 个项目通过之前BufferBlock
.
因此,为了解决这个问题,我使用以下代码,该代码可以工作,但偶尔会在接收时导致 1 秒延迟(由于出现上述错误)
public async Task<object> GetMessageAsync()
{
try
{
object m;
var attempts = 0;
for (; ; )
{
try
{
m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
}
catch (TimeoutException)
{
attempts++;
if (attempts >= 30) throw;
continue;
}
break;
}
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
对我来说,这看起来像是 TDF 中的竞争条件,但我无法弄清楚为什么在我使用的其他地方不会发生这种情况BufferBlock
以类似的方式。实验性地改变自ReceiveAsync
to Receive
没有帮助。我还没有检查过,但我想单独来看,上面的代码可以完美地工作。这是我在“TPL 数据流简介”中看到的一种模式tpl数据流.docx http://www.microsoft.com/download/en/details.aspx?id=14782.
我该怎么做才能查清真相?是否有任何指标可以帮助推断正在发生的情况?如果我无法创建可靠的测试用例,我还能提供哪些信息?
Help!