在与这个问题度过了一个浪漫的夜晚并尝试了克里斯建议的不同方法之后,我发现有完后还有你必须做的事情才能让它正常工作。
具体来说,yes,您需要在消费者队列地址上设置预取:
sbc.UseRabbitMq(
f =>
f.ConfigureHost(
new Uri( "rabbitmq://guest:guest@localhost/masstransit_consumer" ),
c =>
{
} )
);
int pf = 20; // prefetch
// set consumer prefetch (required!)
sbc.ReceiveFrom( string.Format( "rabbitmq://guest:guest@localhost/masstransit_consumer?prefetch={0}", pf ) );
但这还不够。
密钥可以在代码中找到mtstress
克里斯在他的答案下面的评论中提到了工具。事实证明该工具调用:
int _t, _ct;
ThreadPool.GetMinThreads( out _t, out _ct );
ThreadPool.SetMinThreads( pf, _ct );
将其添加到我的代码中可以解决该问题。我想知道为什么 MSMQ 传输不需要这样做......
更新#1
经过进一步调查,我发现了可能的罪魁祸首。它位于ServiceBusBuilderImpl
.
有一种方法可以提高限制,即ConfigureThreadPool
.
这里的问题是它调用CalculateRequiredThreads
它应该返回所需的线程数。不幸的是后者返回一个negative在我的客户端 Windows 7 和 Windows Server 上都有价值。就这样ConfigureThreadPool
实际上什么都不做,因为调用时负值会被忽略ThreadPool.SetMin/MaxThreads
.
这个负值怎么办?看来CalculateRequiredThreads
calls ThreadPool.GetMinThreads
and ThreadPool.GetAvailableThreads
并使用公式得出所需的线程数:
var requiredThreads = consumerThreads + (workerThreads - availableWorkerThreads);
这里的问题是,在我的机器上,这实际上是:
40 (my limit) + 8 (workerThreads) - 1023 (availableThreads)
这当然会返回
-975
结论是:上面来自大众交通内部的代码似乎是错误的。当我提前手动提高限制时,ConfigureMinThreads
尊重它(因为它仅在高于读取值时设置限制)。
如果没有提前手动设置限制,则无法设置限制,因此代码会执行与默认线程池限制一样多的线程(在我的机器上似乎是 8)。
显然有人认为这个公式会产生
40 + 8 - 8
在默认场景下。为什么GetMinThreads
and GetAvailableThreads
返回此类不相关的值尚未确定......
更新#2
改变
static int CalculateRequiredThreads( int consumerThreads )
{
int workerThreads;
int completionPortThreads;
ThreadPool.GetMinThreads( out workerThreads, out completionPortThreads );
int availableWorkerThreads;
int availableCompletionPortThreads;
ThreadPool.GetAvailableThreads( out availableWorkerThreads, out availableCompletionPortThreads );
var requiredThreads = consumerThreads + ( workerThreads - availableWorkerThreads );
return requiredThreads;
}
to
static int CalculateRequiredThreads( int consumerThreads )
{
int workerThreads;
int completionPortThreads;
ThreadPool.GetMaxThreads( out workerThreads, out completionPortThreads );
int availableWorkerThreads;
int availableCompletionPortThreads;
ThreadPool.GetAvailableThreads( out availableWorkerThreads, out availableCompletionPortThreads );
var requiredThreads = consumerThreads + ( workerThreads - availableWorkerThreads );
return requiredThreads;
}
解决问题。此处均返回 1023,并且公式的输出是正确的预期线程数。