使用多线程在 Java 中并行化 for 循环

我对java非常陌生,我想使用执行器服务或使用Java中的任何其他方法并行化嵌套的for循环。我想创建一些固定数量的线程,以便CPU不会完全被线程获取。

    for(SellerNames sellerNames : sellerDataList) {
        for(String selleName : sellerNames) {
        //getSellerAddress(sellerName)
        //parallize this task
        }
    }

卖家数据列表的大小 = 1000,卖家名称的大小 = 5000。

现在我想创建 10 个线程,并平均地为每个线程分配相等的任务块。这是对于第 i 个 sellerDataList,第一个线程应该获取 500 个名称的地址,第二个线程应该获取下一个 500 个名称的地址,依此类推。
完成这项工作的最佳方法是什么?


答案 1

有两种方法可以使其并行运行:流和执行器。

使用流

您可以使用并行流,并将其余部分留给 jvm。在这种情况下,您无法对何时发生的事情进行太多控制。另一方面,您的代码将易于阅读和维护:

    sellerDataList.stream().forEach(sellerNames -> {
        Stream<String> stream = StreamSupport.stream(sellerNames.spliterator(), true); // true means use parallel stream
        stream.forEach(sellerName -> {
            getSellerAddress(sellerName);
        });
    });

使用执行器服务

假设,您需要 5 个线程,并且希望能够等到任务完成。然后,您可以使用具有 5 个线程的固定线程池并使用 -s,这样您就可以等到它们完成。Future

    final ExecutorService executor = Executors.newFixedThreadPool(5); // it's just an arbitrary number
    final List<Future<?>> futures = new ArrayList<>();
    for (SellerNames sellerNames : sellerDataList) {
        for (final String sellerName : sellerNames) {
            Future<?> future = executor.submit(() -> {
                getSellerAddress(sellerName);
            });
            futures.add(future);
        }
    }
    try {
        for (Future<?> future : futures) {
            future.get(); // do anything you need, e.g. isDone(), ...
        }
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }

答案 2

如果您使用的是并行流,您仍然可以通过创建自己的ForkJoinPool来控制线程。

List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
  .collect(Collectors.toList());

ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool.submit(
  () -> aList.parallelStream().reduce(0L, Long::sum)).get();

在这个网站上,它被描述得很好。https://www.baeldung.com/java-8-parallel-streams-custom-threadpool