将CompletableFuture<Stream<T>>转换为Publisher<T>是否正确?

为了允许对来自 a 的结果流进行多次迭代,我正在考虑以下方法之一:CompletableFuture<Stream<String>>

  1. 通过以下方式将生成的未来转换为:CompletableFuture<List<String>>teams.thenApply(st -> st.collect(toList()))

  2. 将生成的未来转换为缓存:Flux<String>Flux.fromStream(teams::join).cache();

Flux<T>是在项目反应堆中实施的。Publisher<T>

用例:

我想从一个数据源中获取一个英超球队名称的序列(例如),该数据源提供了一个带有(基于足球数据RESTful API,例如 http://api.football-data.org/v1/soccerseasons/445/leagueTable)的对象。使用和我们有:Stream<String>LeagueStanding[]AsyncHttpClientGson

CompletableFuture<Stream<String>> teams = asyncHttpClient
    .prepareGet("http://api.football-data.org/v1/soccerseasons/445/leagueTable")
    .execute()
    .toCompletableFuture()
    .thenApply(Response::getResponseBody)
    .thenApply(body -> gson.fromJson(body, League.class));
    .thenApply(l -> stream(l.standings).map(s -> s.teamName));

要重用生成的流,我有两个选择:

1. CompletableFuture<List<String>> res = teams.thenApply(st -> st.collect(toList()))

2. Flux<String> res = Flux.fromStream(teams::join).cache()

Flux<T>不那么冗长,提供了我需要的一切。但是,在这种情况下使用它是否正确?

或者我应该改用?还是有其他更好的选择?CompletableFuture<List<String>>

更新了一些想法 (2018-03-16)

CompletableFuture<List<String>>:

  • [优点]将在延续中收集,当我们需要继续处理未来的结果时,也许它已经完成了。List<String>
  • [缺点]声明详细程度。
  • [缺点]如果我们只想使用它一次,那么我们不需要在.List<T>

Flux<String>:

  • [优点]声明简洁性
  • [优点]如果我们只想使用它一次,那么我们可以省略并将其转发到下一层,下一层可以利用反应式API,例如Web flux反应式控制器,例如.cache()@GetMapping(produces =MediaType.TEXT_EVENT_STREAM) public Flux<String> getTeams() {…}
  • [缺点]如果我们想重用它,我们必须将其包装在可缓存()中,这反过来又会增加第一次遍历的开销,因为它必须将生成的项存储在内部缓存中。Flux<T>Flux<T>….cache()

答案 1
    CompletableFuture<Stream<String>> teams = ...;
    Flux<String> teamsFlux = Mono.fromFuture(teams).flatMapMany(stream -> Flux.fromStream(stream));

Flux.fromStream(teams::join)是一种代码异味,因为它阻止了当前线程从另一个线程上运行的结果中获取结果。CompletableFuture


答案 2

一旦你下载了排行榜,并且从这个表格中提取了球队名称,我不确定你是否需要一个背压就绪的流来迭代这些项目。将流转换为标准列表(或数组)应该足够好,并且应该具有更好的性能,不是吗?

例如:

String[] teamNames = teams.join().toArray(String[]::new);

推荐