哪种 Java 同步结构可能提供最好的
并发、迭代处理场景的性能
像下面概述的那样固定数量的线程?实验后
我自己呆了一段时间(使用 ExecutorService 和 CyclicBarrier)并且
对结果有些惊讶,我会感激一些
专家建议,也许还有一些新想法。这里现有的问题
似乎并不主要关注性能,因此有了这个新的。
提前致谢!
该应用程序的核心是一个简单的迭代数据处理算法,
并行化,将计算负载分散到 8 个内核上
Mac Pro,运行 OS X 10.6 和 Java 1.6.0_07。待处理的数据
被分成8个块,每个块被送入一个Runnable来执行
由固定数量的线程之一。算法的并行化是
相当简单,并且它的功能按预期工作,但是
它的表现还没有达到我的预期。该应用程序似乎
花费大量时间在系统调用同步上,所以经过一些
profiling 我想知道我是否选择了最合适的
同步机制。
该算法的一个关键要求是它需要继续进行
阶段,因此线程需要在每个阶段结束时同步。
主线程准备工作(非常低的开销),将其传递给
线程,让它们处理它,然后当所有线程
完成后,重新安排工作(同样非常低的开销)并重复
循环。机器专门负责这个任务,垃圾收集
通过使用预分配项的每线程池来最小化,并且
线程数量可以是固定的(没有传入请求等,
每个 CPU 核心只有一个线程)。
V1 - 执行服务
我的第一个实现使用了带有 8 个工作线程的 ExecutorService
线程。该程序创建 8 个任务来保存工作,然后
让他们继续工作,大致如下:
// create one thread per CPU
executorService = Executors.newFixedThreadPool( 8 );
...
// now process data in cycles
while( ...) {
// package data into 8 work items
...
// create one Callable task per work item
...
// submit the Callables to the worker threads
executorService.invokeAll( taskList );
}
这在功能上运作良好(它做了它应该做的事情),并且对于
非常大的工作项确实所有 8 个 CPU 都变得高负载,因为
正如处理算法预期允许的那样(一些
工作项目将比其他工作项目完成得更快,然后闲置)。然而,
随着工作项目变得更小(这并不是真正的
程序的控制),用户CPU负载急剧下降:
blocksize | system | user | cycles/sec
256k 1.8% 85% 1.30
64k 2.5% 77% 5.6
16k 4% 64% 22.5
4096 8% 56% 86
1024 13% 38% 227
256 17% 19% 420
64 19% 17% 948
16 19% 13% 1626
传奇:
- 块大小=工作项的大小(=计算步骤)
- system = 系统负载,如 OS X 活动监视器(红条)所示
- user = 用户负载,如 OS X 活动监视器(绿色条)中所示
- 周期/秒 = 主 while 循环的迭代次数,越多越好
这里主要关注的是花费的时间比例很高
在系统中,似乎是由线程同步驱动的
来电。正如预期的那样,对于较小的工作项,ExecutorService.invokeAll()
需要相对更多的努力来同步线程
与每个线程中执行的工作量。但
因为 ExecutorService 比它需要的更通用
对于这个用例(如果有的话,它可以为线程排队任务
任务多于核心),我想也许会有更精简的
同步构造。
V2 - 循环屏障
下一个实现使用 CyclicBarrier 来同步
接收工作之前和完成之后的线程,
大致如下:
main() {
// create the barrier
barrier = new CyclicBarrier( 8 + 1 );
// create Runable for thread, tell it about the barrier
Runnable task = new WorkerThreadRunnable( barrier );
// start the threads
for( int i = 0; i < 8; i++ )
{
// create one thread per core
new Thread( task ).start();
}
while( ... ) {
// tell threads about the work
...
// N threads + this will call await(), then system proceeds
barrier.await();
// ... now worker threads work on the work...
// wait for worker threads to finish
barrier.await();
}
}
class WorkerThreadRunnable implements Runnable {
CyclicBarrier barrier;
WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; }
public void run()
{
while( true )
{
// wait for work
barrier.await();
// do the work
...
// wait for everyone else to finish
barrier.await();
}
}
}
同样,这在功能上运作良好(它做了它应该做的事情),
对于非常大的工作项目,实际上所有 8 个 CPU 都会变得高度
已加载,如前所述。然而,随着工作项目变得越来越小,
负载仍然急剧减少:
blocksize | system | user | cycles/sec
256k 1.9% 85% 1.30
64k 2.7% 78% 6.1
16k 5.5% 52% 25
4096 9% 29% 64
1024 11% 15% 117
256 12% 8% 169
64 12% 6.5% 285
16 12% 6% 377
对于大型工作项目,同步可以忽略不计,并且
性能与V1相同。但没想到结果
(高度专业化的)CyclicBarrier 似乎比
对于(通用)ExecutorService:吞吐量(周期/秒)
仅为V1的1/4左右。初步结论是
尽管这似乎是宣传的理想用途
对于 CyclicBarrier 来说,它的性能比
通用ExecutorService。
V3 - 等待/通知 + CyclicBarrier
似乎值得尝试替换第一个循环屏障await()
使用简单的等待/通知机制:
main() {
// create the barrier
// create Runable for thread, tell it about the barrier
// start the threads
while( ... ) {
// tell threads about the work
// for each: workerThreadRunnable.setWorkItem( ... );
// ... now worker threads work on the work...
// wait for worker threads to finish
barrier.await();
}
}
class WorkerThreadRunnable implements Runnable {
CyclicBarrier barrier;
@NotNull volatile private Callable<Integer> workItem;
WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; this.workItem = NO_WORK; }
final protected void
setWorkItem( @NotNull final Callable<Integer> callable )
{
synchronized( this )
{
workItem = callable;
notify();
}
}
public void run()
{
while( true )
{
// wait for work
while( true )
{
synchronized( this )
{
if( workItem != NO_WORK ) break;
try
{
wait();
}
catch( InterruptedException e ) { e.printStackTrace(); }
}
}
// do the work
...
// wait for everyone else to finish
barrier.await();
}
}
}
同样,这在功能上运作良好(它做了它应该做的事情)。
blocksize | system | user | cycles/sec
256k 1.9% 85% 1.30
64k 2.4% 80% 6.3
16k 4.6% 60% 30.1
4096 8.6% 41% 98.5
1024 12% 23% 202
256 14% 11.6% 299
64 14% 10.0% 518
16 14.8% 8.7% 679
小工作项的吞吐量还是差很多
ExecutorService 的大小,但大约是 CyclicBarrier 的 2 倍。
消除一个 CyclicBarrier 就消除了一半的间隙。
V4 - 忙等待而不是等待/通知
由于此应用程序是系统上运行的第一个应用程序,并且
如果核心不忙于工作项目,它们无论如何都会闲置,
为什么不在每个线程中尝试繁忙等待工作项,即使
这会导致 CPU 不必要地旋转。工作线程代码发生变化
如下:
class WorkerThreadRunnable implements Runnable {
// as before
final protected void
setWorkItem( @NotNull final Callable<Integer> callable )
{
workItem = callable;
}
public void run()
{
while( true )
{
// busy-wait for work
while( true )
{
if( workItem != NO_WORK ) break;
}
// do the work
...
// wait for everyone else to finish
barrier.await();
}
}
}
功能上也运行良好(它做了它应该做的事情)。
blocksize | system | user | cycles/sec
256k 1.9% 85% 1.30
64k 2.2% 81% 6.3
16k 4.2% 62% 33
4096 7.5% 40% 107
1024 10.4% 23% 210
256 12.0% 12.0% 310
64 11.9% 10.2% 550
16 12.2% 8.6% 741
对于小型工作项目,这进一步提高了吞吐量
比 CyclicBarrier + wait/notify 变体提高 10%,这不是
微不足道。但它的吞吐量仍然比 V1 低很多
与ExecutorService。
V5 - ?
那么对于这样的情况,最好的同步机制是什么?
(想必并不少见)问题?我厌倦了写我的
自己的同步机制完全替代ExecutorService
(假设它太通用并且必须有一些东西
仍然可以将其取出以提高效率)。
这不是我的专业领域,我担心我会
花了很多时间调试它(因为我什至不确定
我的等待/通知和繁忙等待变体是正确的)
不确定的收益。
任何建议将不胜感激。