Java 中并发管道的策略

请考虑以下 shell 脚本:

gzip -dc in.gz | sed -e 's/@/_at_/g' | gzip -c > out.gz 

这有三个并行工作的进程来解压缩流,修改它,然后重新压缩它。运行我可以看到我的用户时间大约是我实时时间的两倍,这表明程序正在有效地并行工作。time

我试图通过将每个任务放在自己的线程中来在Java中创建相同的程序。遗憾的是,对于上述示例,多线程 Java 程序仅比单线程版本快约 30%。我尝试过同时使用ExchangerControlledQueue。ConcurrentLinkedQueue 链接队列会导致大量争用,尽管所有三个线程通常都保持忙碌。Exchanger 的争用较低,但更复杂,并且似乎无法使最慢的工作线程在 100% 的时间内运行。

我试图找出这个问题的纯Java解决方案,而不用看一个字节代码编织框架或基于JNI的MPI。

大多数并发研究和API都关注分而治之的算法,为每个节点提供正交且不依赖于先前计算的工作。另一种并发方法是管道方法,其中每个工作线程执行一些工作并将数据传递给下一个工作线程。

我并不是想找到最有效的方法来处理gzip'd文件,而是在研究如何有效地分解管道中的任务,以便将运行时减少到最慢的任务。

10m 行文件的当前计时如下所示:

Testing via shell

real    0m31.848s
user    0m58.946s
sys     0m1.694s

Testing SerialTest

real    0m59.997s
user    0m59.263s
sys     0m1.121s

Testing ParallelExchangerTest

real    0m41.573s
user    1m3.436s
sys     0m1.830s

Testing ConcurrentQueueTest

real    0m44.626s
user    1m24.231s
sys     0m10.856s

我为Java改进10%提供了奖励,这是通过在具有10m行测试数据的四核系统上实时衡量的。当前源在 Bitbucket 上可用。


答案 1

首先,该过程只会与最慢的部分一样快。如果时序故障为:

  • 枪位: 1 秒
  • sed: 5 秒
  • 格兹普: 1 秒

通过多线程,您将最多在5秒而不是7秒内完成。

其次,与其使用您正在使用的队列,不如尝试复制要复制的内容的功能,并使用 PipedInputStreamPipedOutputStream 将进程链接在一起。

编辑:有几种方法可以使用Java并发实用程序处理相关任务。将其划分为多个线程。首先创建一个公共基类:

public interface Worker {
  public run(InputStream in, OutputStream out);
}

此接口的作用是表示处理输入并生成输出的任意作业。把这些链接在一起,你就有了一个管道。您也可以抽象出样板。为此,我们需要一个类:

public class UnitOfWork implements Runnable {
  private final InputStream in;
  private final OutputStream out;
  private final Worker worker;

  public UnitOfWork(InputStream in, OutputStream out, Worker worker) {
    if (in == null) {
      throw new NullPointerException("in is null");
    }
    if (out == null) {
      throw new NullPointerException("out is null");
    }
    if (worker == null) {
      throw new NullPointerException("worker is null");
    }
    this.in = in;
    this.out = out;
    this.worker = worker;
  }

  public final void run() {
    worker.run(in, out);
  }
}

因此,例如,PART:Unzip

public class Unzip implements Worker {
  protected void run(InputStream in, OutputStream out) {
    ...
  }
}

,以此类推,用于 和 。然后,将它绑定在一起的是:SedZip

public static void pipe(InputStream in, OutputStream out, Worker... workers) {
  if (workers.length == 0) {
    throw new IllegalArgumentException("no workers");
  }
  OutputStream last = null;
  List<UnitOfWork> work = new ArrayList<UnitOfWork>(workers.length);
  PipedOutputStream last = null;
  for (int i=0; i<workers.length-2; i++) {
    PipedOutputStream out = new PipedOutputStream();
    work.add(new UnitOfWork(
      last == null ? in, new PipedInputStream(last), out, workers[i]);
    last = out;
  }
  work.add(new UnitOfWork(new PipedInputStream(last),
    out, workers[workers.length-1);
  ExecutorService exec = Executors.newFixedThreadPool(work.size());
  for (UnitOfWork w : work) {
    exec.submit(w);
  }
  exec.shutdown();
  try {
    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
  } catch (InterruptedExxception e) {
    // do whatever
  }
}

我不确定你能做得比这更好,而且每个工作都需要编写最少的代码。然后你的代码变成:

public static processFile(String inputName, String outputName) {
  pipe(new FileInputStream(inputFile), new FileOutputStream(outputFile),
    new Zip(), new Sed(), new Unzip());
}

答案 2

我单独验证了所花费的时间,似乎阅读花费的时间不到10%,而阅读加处理所花费的时间不到整个时间的30%。所以我采用了ParallelExchangerTest(在你的代码中表现最好的)并将其修改为只有2个线程,第一个线程执行读取和替换,第二个线程执行写入。

以下是要比较的数字(在我的机器上英特尔双核(不是core2)运行带有1gb ram的ubuntu)

通过外壳进行>测试

实际 0m41.601s

用户 0m58.604s

系统 0m1.032s

>测试并行交换器测试

实际 1m55.424s

用户 2m14.160s

系统 0m4.768s

> ParallelExchangerTestMod (2 thread)

实际 1m35.524s

用户 1m55.319s

系统 0m3.580s

我知道字符串处理需要更长的时间,所以我用 matcher.replace 替换 line.repalceAll,我得到了这个数字

> ParallelExchangerTestMod_Regex(2 线程)

实际 1m12.781s

用户 1m33.382s

系统 0m2.916s

现在我领先了一步,不是一次读取一行,而是读取各种大小的char[]缓冲区并对其进行计时,(使用正则表达式搜索/替换,)我得到了这些数字

> 测试ParallelExchangerTestMod_Regex_Buff(一次处理 100 个字节)

实际 1m13.804s

用户 1m32.494s

系统 0m2.676s

> 测试ParallelExchangerTestMod_Regex_Buff(一次处理 500 字节)

实际 1m6.286s

用户 1m29.334s

系统 0m2.324s

> 测试ParallelExchangerTestMod_Regex_Buff(一次处理 800 字节)

实际 1m12.309s

用户 1m33.910s

系统 0m2.476s

看起来 500 字节是数据大小的最佳选择。

我分叉并在此处拥有更改的副本

https://bitbucket.org/chinmaya/java-concurrent_response/


推荐