Java中最快的循环同步是什么(ExecutorService vs. CyclicBarrier vs. X)?
哪种 Java 同步构造可能为具有固定数量线程的并发迭代处理方案提供最佳性能,如下所示?在我自己尝试了一段时间(使用ExecutorService和CyclicBarrier)并对结果感到有些惊讶之后,我将不胜感激一些专家建议,也许还有一些新的想法。这里的现有问题似乎并不主要集中在性能上,因此是这个新的问题。提前致谢!
该应用程序的核心是一个简单的迭代数据处理算法,并行化,将计算负载分散到Mac Pro上的8个内核上,运行OS X 10.6和Java 1.6.0_07。要处理的数据被分成8个块,每个块被馈送到一个Runnable,由固定数量的线程之一执行。并行化算法相当简单,它在功能上可以根据需要工作,但其性能还不是我认为的那样。该应用程序似乎在系统调用同步上花费了大量时间,因此在进行一些分析之后,我想知道我是否选择了最合适的同步机制。
该算法的一个关键要求是它需要分阶段进行,因此线程需要在每个阶段结束时同步。主线程准备工作(非常低的开销),将其传递给线程,让他们在上面工作,然后在所有线程完成后继续,重新安排工作(同样是非常低的开销)并重复循环。机器专用于此任务,垃圾回收通过使用预分配项目的每线程池最小化,并且可以固定线程数(没有传入请求或类似请求,每个CPU内核只有一个线程)。
V1 - 执行器服务
我的第一个实现使用了具有 8 个工作线程的执行器服务。该程序创建8个任务来保存工作,然后让他们处理它,大致如下:
// create one thread per CPU
executorService = Executors.newFixedThreadPool( 8 );
...
// now process data in cycles
while( ...) {
// package data into 8 work items
...
// create one Callable task per work item
...
// submit the Callables to the worker threads
executorService.invokeAll( taskList );
}
这在功能上运行良好(它做了它应该做的事情),并且对于非常大的工作项,实际上所有8个CPU都变得高负载,就像处理算法预期允许的那样(某些工作项将比其他工作项更快地完成,然后空闲)。但是,随着工作项变得越来越小(这实际上并不在程序的控制之下),用户 CPU 负载会急剧减少:
blocksize | system | user | cycles/sec
256k 1.8% 85% 1.30
64k 2.5% 77% 5.6
16k 4% 64% 22.5
4096 8% 56% 86
1024 13% 38% 227
256 17% 19% 420
64 19% 17% 948
16 19% 13% 1626
图例:- 块大小 = 工作项的大小(= 计算步骤) - 系统 = 系统负载,如 OS X 活动监视器(红色条)中所示 - 用户 = 用户负载,如 OS X 活动监视器(绿色条)所示 - 周期/秒 = 通过主 while 循环的迭代次数,越多越好
这里主要关注的领域是在系统中花费的时间百分比很高,这似乎是由线程同步调用驱动的。正如预期的那样,对于较小的工作项,ExecutorService.invokeAll() 将需要相对更多的工作来同步线程,而不是在每个线程中执行的工作量。但是,由于 ExecutorService 比此用例所需的更通用(如果任务数多于内核数,它可以对线程的任务进行排队),因此我可能会有更精简的同步构造。
V2 - 循环巴里耶
下一个实现使用CyclicBarrier在接收工作之前和完成工作之后同步线程,大致如下:
main() {
// create the barrier
barrier = new CyclicBarrier( 8 + 1 );
// create Runable for thread, tell it about the barrier
Runnable task = new WorkerThreadRunnable( barrier );
// start the threads
for( int i = 0; i < 8; i++ )
{
// create one thread per core
new Thread( task ).start();
}
while( ... ) {
// tell threads about the work
...
// N threads + this will call await(), then system proceeds
barrier.await();
// ... now worker threads work on the work...
// wait for worker threads to finish
barrier.await();
}
}
class WorkerThreadRunnable implements Runnable {
CyclicBarrier barrier;
WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; }
public void run()
{
while( true )
{
// wait for work
barrier.await();
// do the work
...
// wait for everyone else to finish
barrier.await();
}
}
}
同样,这在功能上运行良好(它做了它应该做的事情),并且对于非常大的工作项,实际上所有8个CPU都像以前一样变得高负载。但是,随着工作项变小,负载仍会急剧缩小:
blocksize | system | user | cycles/sec
256k 1.9% 85% 1.30
64k 2.7% 78% 6.1
16k 5.5% 52% 25
4096 9% 29% 64
1024 11% 15% 117
256 12% 8% 169
64 12% 6.5% 285
16 12% 6% 377
对于大型工作项,同步可以忽略不计,并且性能与 V1 相同。但出乎意料的是,(高度专业化的)CyclicBarrier的结果似乎比(通用的)ExecutorService的结果差得多:吞吐量(周期/秒)仅为V1的1/4左右。初步的结论是,尽管这似乎是CyclicBarrier的理想用例,但它的性能比通用的ExecutorService差得多。
V3 - 等待/通知 + 循环巴里耶
似乎值得尝试用简单的等待/通知机制替换第一个循环屏障 await():
main() {
// create the barrier
// create Runable for thread, tell it about the barrier
// start the threads
while( ... ) {
// tell threads about the work
// for each: workerThreadRunnable.setWorkItem( ... );
// ... now worker threads work on the work...
// wait for worker threads to finish
barrier.await();
}
}
class WorkerThreadRunnable implements Runnable {
CyclicBarrier barrier;
@NotNull volatile private Callable<Integer> workItem;
WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; this.workItem = NO_WORK; }
final protected void
setWorkItem( @NotNull final Callable<Integer> callable )
{
synchronized( this )
{
workItem = callable;
notify();
}
}
public void run()
{
while( true )
{
// wait for work
while( true )
{
synchronized( this )
{
if( workItem != NO_WORK ) break;
try
{
wait();
}
catch( InterruptedException e ) { e.printStackTrace(); }
}
}
// do the work
...
// wait for everyone else to finish
barrier.await();
}
}
}
同样,这在功能上效果很好(它做了它应该做的事情)。
blocksize | system | user | cycles/sec
256k 1.9% 85% 1.30
64k 2.4% 80% 6.3
16k 4.6% 60% 30.1
4096 8.6% 41% 98.5
1024 12% 23% 202
256 14% 11.6% 299
64 14% 10.0% 518
16 14.8% 8.7% 679
小型工作项的吞吐量仍然比执行器服务差得多,但大约是CyclicBarrier的2倍。消除一个循环巴里尔可以消除一半的间隙。
V4 - 忙于等待而不是等待/通知
由于此应用程序是在系统上运行的主要应用程序,并且如果内核不忙于工作项,则内核无论如何都会空闲,因此为什么不尝试在每个线程中忙于等待工作项,即使这会不必要地旋转CPU。工作线程代码更改如下:
class WorkerThreadRunnable implements Runnable {
// as before
final protected void
setWorkItem( @NotNull final Callable<Integer> callable )
{
workItem = callable;
}
public void run()
{
while( true )
{
// busy-wait for work
while( true )
{
if( workItem != NO_WORK ) break;
}
// do the work
...
// wait for everyone else to finish
barrier.await();
}
}
}
在功能上也运行良好(它做了它应该做的事情)。
blocksize | system | user | cycles/sec
256k 1.9% 85% 1.30
64k 2.2% 81% 6.3
16k 4.2% 62% 33
4096 7.5% 40% 107
1024 10.4% 23% 210
256 12.0% 12.0% 310
64 11.9% 10.2% 550
16 12.2% 8.6% 741
对于小型工作项,这比 CyclicBarrier + wait/notify 变体进一步提高了 10%,这并非微不足道。但它仍然比使用执行器服务的V1低得多。
V5 - ?
那么,对于这种(可能并不罕见)问题,最好的同步机制是什么呢?我厌倦了编写自己的同步机制来完全取代ExecutorService(假设它太通用了,并且必须有一些东西仍然可以被删除以使其更有效)。这不是我的专业领域,我担心我会花很多时间调试它(因为我甚至不确定我的等待/通知和繁忙的等待变体是否正确)以获得不确定的收益。
任何建议将不胜感激。