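Reader#lines() parallelizes badly due to nonconfigurable batch size policy in its spliterator
I cannot get stream processing to parallelize well when the stream source is a Reader. Running the code below on a quad-core CPU, I see three cores in use at first, then a sudden drop to two, and then to one. Overall CPU utilization hovers around 50%.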
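Note the following characteristics of the example:
- there are just 6,000 lines;
- each line takes about 20 ms to process;
- the whole procedure takes about a minute.
This means all the pressure is on the CPU and I/O is minimal. The example is a sitting duck for automatic parallelization.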
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
... class imports elided ...
public class Main
{
    static final AtomicLong totalTime = new AtomicLong();

    public static void main(String[] args) throws IOException {
        final long start = System.nanoTime();
        final Path inputPath = createInput();
        System.out.println("Start processing");
        try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
            Files.lines(inputPath).parallel().map(Main::processLine)
                 .forEach(w::println);
        }
        final double cpuTime = totalTime.get(),
                     realTime = System.nanoTime()-start;
        final int cores = Runtime.getRuntime().availableProcessors();
        System.out.println(" Cores: " + cores);
        System.out.format(" CPU time: %.2f s\n", cpuTime/SECONDS.toNanos(1));
        System.out.format(" Real time: %.2f s\n", realTime/SECONDS.toNanos(1));
        System.out.format("CPU utilization: %.2f%%", 100.0*cpuTime/realTime/cores);
    }

    // CPU-bound dummy work; adds the time spent to totalTime.
    private static String processLine(String line) {
        final long localStart = System.nanoTime();
        double ret = 0;
        for (int i = 0; i < line.length(); i++)
            for (int j = 0; j < line.length(); j++)
                ret += Math.pow(line.charAt(i), line.charAt(j)/32.0);
        final long took = System.nanoTime()-localStart;
        totalTime.getAndAdd(took);
        return NANOSECONDS.toMillis(took) + " " + ret;
    }

    // Writes 6,000 lines, each the current nanoTime repeated 25 times.
    private static Path createInput() throws IOException {
        final Path inputPath = Paths.get("input.txt");
        try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) {
            for (int i = 0; i < 6_000; i++) {
                final String text = String.valueOf(System.nanoTime());
                for (int j = 0; j < 25; j++) w.print(text);
                w.println();
            }
        }
        return inputPath;
    }
}
My typical output:
Cores: 4
CPU time: 110.23 s
Real time: 53.60 s
CPU utilization: 51.41%
For comparison, if I use a slightly modified variant that first collects the lines into a list and then processes them:
Files.lines(inputPath).collect(toList()).parallelStream().map(Main::processLine)
     .forEach(w::println);
I get this typical output:
Cores: 4
CPU time: 138.43 s
Real time: 35.00 s
CPU utilization: 98.87%
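To see how an iterator-backed source gets carved up, here is a minimal probe. It assumes (as I read the JDK 8 sources) that BufferedReader#lines() builds its stream with Spliterators.spliteratorUnknownSize over an iterator. SplitProbe and chunkSizes are names made up for this illustration. The probe calls trySplit() until it returns null and records the size of each chunk that is split off.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.IntStream;

// Hypothetical probe class, for illustration only.
class SplitProbe {

    // Sizes of the chunks split off from an unknown-size, iterator-backed source.
    static List<Long> chunkSizes(int elementCount) {
        Iterator<Integer> it = IntStream.range(0, elementCount).boxed().iterator();
        // Assumption: same factory and characteristics as BufferedReader#lines().
        Spliterator<Integer> source = Spliterators.spliteratorUnknownSize(
                it, Spliterator.ORDERED | Spliterator.NONNULL);

        List<Long> sizes = new ArrayList<>();
        Spliterator<Integer> chunk;
        while ((chunk = source.trySplit()) != null) {
            // Each split-off chunk is array-backed, so its size is exact.
            sizes.add(chunk.estimateSize());
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(chunkSizes(6_000));
    }
}
```

If the batches grow in steps of 1024, as the Spliterators source suggests, 6,000 elements should come out as just three chunks of 1024, 2048 and 2928. That would fit the pattern of three busy cores, then two, then one.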
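What could explain this effect, and how can I work around it to get full utilization?
Note that I originally observed this on a reader of a servlet input stream, so it is not specific to FileReader.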
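One direction for a workaround, sketched under assumptions rather than offered as a confirmed fix: wrap the stream's spliterator in one that hands out fixed-size batches. FixedBatchSpliterator and withBatchSize are hypothetical names, not JDK API, and the batch size is something you would have to tune.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper, not part of the JDK.
class FixedBatchSpliterator<T> implements Spliterator<T> {
    private final Spliterator<T> source;
    private final int batchSize;

    FixedBatchSpliterator(Spliterator<T> source, int batchSize) {
        this.source = source;
        this.batchSize = batchSize;
    }

    // Re-wraps a stream as a parallel stream that splits into fixed-size batches.
    static <T> Stream<T> withBatchSize(Stream<T> in, int batchSize) {
        return StreamSupport
                .stream(new FixedBatchSpliterator<>(in.spliterator(), batchSize), true)
                .onClose(in::close);
    }

    @Override
    public Spliterator<T> trySplit() {
        final Object[] batch = new Object[batchSize];
        final int[] count = {0};
        // Pull up to batchSize elements from the source into the array.
        while (count[0] < batchSize && source.tryAdvance(e -> batch[count[0]++] = e)) {
            // filling the batch
        }
        if (count[0] == 0) return null; // source exhausted
        // The array-backed spliterator reports SIZED and SUBSIZED on its own.
        return Spliterators.spliterator(batch, 0, count[0], characteristics());
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        return source.tryAdvance(action);
    }

    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        source.forEachRemaining(action);
    }

    @Override
    public long estimateSize() {
        return source.estimateSize();
    }

    @Override
    public int characteristics() {
        // Drop flags this wrapper does not guarantee.
        return source.characteristics() & ~(SIZED | SUBSIZED | SORTED);
    }
}
```

In the example above this would be used as `FixedBatchSpliterator.withBatchSize(Files.lines(inputPath), 10).map(Main::processLine).forEach(w::println);`, where 10 is an arbitrary illustrative value. With about 20 ms of work per line, small batches should spread the load without much splitting overhead, but I have not measured where the sweet spot is.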