使用并行流返回提供的最快值

我有一组供应商,它们都支持相同的结果,但速度不同(和不同)。

我想要一种优雅的方式来同时启动供应商,一旦其中一个产生了价值,就将其返回(丢弃其他结果)。

我尝试过使用并行流和为此,但它似乎总是阻塞,直到产生所有结果。Stream.findAny()

下面是一个单元测试,演示了我的问题:

import org.junit.Test;

import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static org.junit.Assert.*;

public class RaceTest {

    @Test
    public void testRace() {
        // Set up suppliers
        Set<Supplier<String>> suppliers = Collections.newSetFromMap(new ConcurrentHashMap<>());
        suppliers.add(() -> "fast"); // This supplier returns immediately
        suppliers.add(() -> {
            try {
                Thread.sleep(10_000);
                return "slow";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }); // This supplier takes 10 seconds to produce a value

        Stream<Supplier<String>> stream = suppliers.parallelStream();
        assertTrue(stream.isParallel()); // Stream can work in parallel
        long start = System.currentTimeMillis();
        Optional<String> winner = stream
                .map(Supplier::get)
                .findAny();
        long duration = System.currentTimeMillis() - start;
        assertTrue(winner.isPresent()); // Some value was produced
        assertEquals("fast", winner.get()); // The value is "fast"
        assertTrue(duration < 9_000); // The whole process took less than 9 seconds
    }
}

测试的结果是最后一个断言失败,因为整个测试大约需要 10 秒才能完成。

我在这里做错了什么?


答案 1

在这种情况下,您最好使用而不是(相同的功能签名),并使用自Java 5以来存在的良好的旧并发API:CallableSupplier

Set<Callable<String>> suppliers=new HashSet<>();
suppliers.add(() -> "fast"); // This supplier returns immediately
suppliers.add(() -> {
        Thread.sleep(10_000);
        return "slow";
    }
);

ExecutorService es=Executors.newCachedThreadPool();
try {

    String result = es.invokeAny(suppliers);
    System.out.println(result);

} catch (InterruptedException|ExecutionException ex) {
    Logger.getLogger(MyClass.class.getName()).log(Level.SEVERE, null, ex);
}
es.shutdown();

请注意,整个“全部运行并返回最快”如何成为单个方法调用...

它还具有取消/中断所有挂起操作的好处,只要有一个结果可用,因此慢速操作实际上不会在这里等待整整十秒钟(好吧,在大多数情况下,因为时间不是确定性的)。


答案 2

您当前使用的代码是非确定性的。引用 findAny() 的 Javadoc:

此操作的行为是显式非确定性的;可以自由选择流中的任何元素。

您可以使用完成服务并向其提交所有任务。然后,CompletionService.take() 将返回第一个已完成任务的 。Future

long start = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(suppliers.size());
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
suppliers.forEach(s -> completionService.submit(() -> s.get()));
String winner = completionService.take().get();
long duration = System.currentTimeMillis() - start;
assertEquals("fast", winner); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds