Executing a method in parallel from within a call method

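I have a library that customers use. They pass in a DataRequest object, which has userid, timeout and some other fields in it. I use this DataRequest object to build a URL, then I make an HTTP call using RestTemplate. My service returns a JSON response, which I use to build a DataResponse object, and I return this DataResponse object to them.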

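Below is my DataClient class, which customers use by passing a DataRequest object to it. I use the timeout value that the customer passes in DataRequest to time out the request if it takes too long in the getSyncData method.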

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    // first executor
    private ExecutorService service = Executors.newFixedThreadPool(15);

    @Override
    public DataResponse getSyncData(DataRequest key) {
        DataResponse response = null;
        Future<DataResponse> responseFuture = null;

        try {
            responseFuture = getAsyncData(key);
            response = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
        } catch (TimeoutException ex) {
            response = new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
            responseFuture.cancel(true);
            // logging exception here               
        }

        return response;
    }   

    @Override
    public Future<DataResponse> getAsyncData(DataRequest key) {
        DataFetcherTask task = new DataFetcherTask(key, restTemplate);
        Future<DataResponse> future = service.submit(task);

        return future;
    }
}

DataFetcherTask class:

public class DataFetcherTask implements Callable<DataResponse> {

    private DataRequest key;
    private RestTemplate restTemplate;

    public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() throws Exception {
        // In a nutshell below is what I am doing here. 
        // 1. Make an url using DataRequest key.
        // 2. And then execute the url RestTemplate.
        // 3. Make a DataResponse object and return it.

        // I am calling this whole logic in call method as LogicA
    }
}

As of now, my DataFetcherTask class is responsible for a single DataRequest key, as shown above.

Problem statement:

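Now I have a small design change. The customer will pass a DataRequest object (say keyA) to my library. Using the user id present in that DataRequest (keyA) object, I will make a new HTTP call to another service (which I am not doing in my current design). That call gives me back a list of user ids, and for each user id returned in the response I will build another DataRequest object (keyB, keyC, keyD). I will then have a List<DataRequest> containing the keyB, keyC and keyD DataRequest objects. The List<DataRequest> will hold at most three elements, that's all.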

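Now, for each DataRequest object in the List<DataRequest>, I want to execute the DataFetcherTask.call method above in parallel and then build a List<DataResponse> by adding the DataResponse for each key. So I will make up to three parallel calls to DataFetcherTask.call. The idea behind these parallel calls is to get the data for all of those keys (three at most) within the same global timeout value.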

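So my proposal is that the DataFetcherTask class returns a List<DataResponse> object instead of a DataResponse, and the signatures of the getSyncData and getAsyncData methods change accordingly. Here is the algorithm: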

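  • Use the DataRequest object passed by the customer to build a List<DataRequest> by calling another HTTP service.
  • Make a parallel call to the DataFetcherTask.call method for each DataRequest in the List<DataRequest>, and return a List<DataResponse> object to the customer instead of a DataResponse.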

This way I can apply the same global timeout to step 1 as well as step 2. If either of the steps above takes too long, we simply time out in the getSyncData method.

DataFetcherTask class after the design change:

public class DataFetcherTask implements Callable<List<DataResponse>> {

    private DataRequest key;
    private RestTemplate restTemplate;
    // second executor here
    private ExecutorService executorService = Executors.newFixedThreadPool(10);

    public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public List<DataResponse> call() throws Exception {
        List<DataRequest> keys = generateKeys();
        CompletionService<DataResponse> comp = new ExecutorCompletionService<>(executorService);

        int count = 0;
        for (final DataRequest key : keys) {
            comp.submit(new Callable<DataResponse>() {
                @Override
                public DataResponse call() throws Exception {
                    return performDataRequest(key);
                }
            });
            // Track how many tasks were submitted so the loop below collects every result.
            count++;
        }

        List<DataResponse> responseList = new ArrayList<DataResponse>();
        while (count-- > 0) {
            Future<DataResponse> future = comp.take();
            responseList.add(future.get());
        }
        return responseList;
    }

    // In this method I am making a HTTP call to another service
    // and then I will make List<DataRequest> accordingly.
    private List<DataRequest> generateKeys() {
        List<DataRequest> keys = new ArrayList<>();
        // use key object which is passed in constructor to make HTTP call to another service
        // and then make List of DataRequest object and return keys.
        return keys;
    }       

    private DataResponse performDataRequest(DataRequest key) {
        // This will have all LogicA code here which is shown in my original design.
        // everything as it is same..
    }
}

Now my questions are:

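  • Does it have to be like this? What is the right design to solve this problem? Having a call method inside another call method looks weird to me.
  • Do we need two executors, as I have in my code? Is there a better way to solve this problem, or any kind of simplification or design change we can make here?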

I have simplified the code so that the idea of what I am trying to do is clear.


Answer 1

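As already mentioned in the comments on your question, you can use Java's ForkJoin framework. This saves you the extra thread pool inside your DataFetcherTask.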

You simply need to use a ForkJoinPool in your DataClient and convert your DataFetcherTask into a RecursiveTask (one of ForkJoinTask's subtypes). This lets you easily execute other subtasks in parallel.

So, after these modifications, your code will look something like this:

DataFetcherTask

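The DataFetcherTask is now a RecursiveTask that first generates the keys and then invokes a subtask for each generated key. These subtasks are executed in the same ForkJoinPool as the parent task.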

public class DataFetcherTask extends RecursiveTask<List<DataResponse>> {

  private final DataRequest key;
  private final RestTemplate restTemplate;

  public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
      this.key = key;
      this.restTemplate = restTemplate;
  }

  @Override
  protected List<DataResponse> compute() {
    // Create subtasks for the key and invoke them
    List<DataRequestTask> requestTasks = requestTasks(generateKeys());
    invokeAll(requestTasks);

    // All tasks are finished if invokeAll() returns.
    List<DataResponse> responseList = new ArrayList<>(requestTasks.size());
    for (DataRequestTask task : requestTasks) {
      try {
        responseList.add(task.get());
      } catch (InterruptedException | ExecutionException e) {
        // TODO - Handle exception properly
        Thread.currentThread().interrupt();
        return Collections.emptyList();
      }
    }

    return responseList;
  }

  private List<DataRequestTask> requestTasks(List<DataRequest> keys) {
    List<DataRequestTask> tasks = new ArrayList<>(keys.size());
    for (DataRequest key : keys) {
      tasks.add(new DataRequestTask(key));
    }

    return tasks;
  }

  // In this method I am making a HTTP call to another service
  // and then I will make List<DataRequest> accordingly.
  private List<DataRequest> generateKeys() {
      List<DataRequest> keys = new ArrayList<>();
      // use key object which is passed in constructor to make HTTP call to another service
      // and then make List of DataRequest object and return keys.
      return keys;
  }

  /** Inner class for the subtasks. */
  private static class DataRequestTask extends RecursiveTask<DataResponse> {

    private final DataRequest request;

    public DataRequestTask(DataRequest request) {
      this.request = request;
    }

    @Override
    protected DataResponse compute() {
      return performDataRequest(this.request);
    }

    private DataResponse performDataRequest(DataRequest key) {
      // This will have all LogicA code here which is shown in my original design.
      // everything as it is same..
      return new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK);
    }
  }

}

DataClient

The DataClient will not change much, apart from the new thread pool:

public class DataClient implements Client {

  private final RestTemplate restTemplate = new RestTemplate();
  // Replace the ExecutorService with a ForkJoinPool
  private final ForkJoinPool service = new ForkJoinPool(15);

  @Override
  public List<DataResponse> getSyncData(DataRequest key) {
      List<DataResponse> responseList = null;
      Future<List<DataResponse>> responseFuture = null;

      try {
          responseFuture = getAsyncData(key);
          responseList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
      } catch (TimeoutException | ExecutionException | InterruptedException ex) {
          responseList = Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR));
          responseFuture.cancel(true);
          // logging exception here
      }

      return responseList;
  }

  @Override
  public Future<List<DataResponse>> getAsyncData(DataRequest key) {
      DataFetcherTask task = new DataFetcherTask(key, this.restTemplate);
      return this.service.submit(task);
  }
}

Once you are on Java 8, you may consider changing the implementation to CompletableFutures. It would then look something like this:

DataClientCF

public class DataClientCF {

  private final RestTemplate restTemplate = new RestTemplate();
  private final ExecutorService executor = Executors.newFixedThreadPool(15);

  public List<DataResponse> getData(DataRequest initialKey) {
    return CompletableFuture.supplyAsync(() -> generateKeys(initialKey), this.executor)
      .thenApply(requests -> requests.stream().map(this::supplyRequestAsync).collect(Collectors.toList()))
      .thenApply(responseFutures -> responseFutures.stream().map(future -> future.join()).collect(Collectors.toList()))
      .exceptionally(t -> { throw new RuntimeException(t); })
      .join();
  }

  private List<DataRequest> generateKeys(DataRequest key) {
    return new ArrayList<>();
  }

  private CompletableFuture<DataResponse> supplyRequestAsync(DataRequest key) {
    return CompletableFuture.supplyAsync(() -> new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK), this.executor);
  }
}
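
The chain above waits in join() without a deadline, whereas the question asks for one global timeout covering both the key generation and the per-key requests. Below is a minimal sketch of one way to get that with CompletableFuture, assuming String stand-ins for DataRequest and DataResponse; the class TimedFanOut and the bodies of generateKeys and fetch are hypothetical placeholders, not part of the original library.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

class TimedFanOut {

    // Hypothetical stand-in for the first HTTP call that derives the keys.
    static List<String> generateKeys(String initialKey) {
        return Arrays.asList(initialKey + "-B", initialKey + "-C", initialKey + "-D");
    }

    // Hypothetical stand-in for LogicA (one blocking HTTP call per key).
    static String fetch(String key) {
        return "response for " + key;
    }

    static List<String> getData(String initialKey, long timeout, TimeUnit unit,
                                ExecutorService executor)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<List<String>> all = CompletableFuture
            .supplyAsync(() -> generateKeys(initialKey), executor)
            .thenCompose(keys -> {
                // Start one asynchronous request per generated key.
                List<CompletableFuture<String>> futures = keys.stream()
                    .map(k -> CompletableFuture.supplyAsync(() -> fetch(k), executor))
                    .collect(Collectors.toList());
                // allOf completes when every request has completed, so the
                // join() calls below return immediately instead of blocking.
                return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
            });
        // One deadline for both steps: key generation and all per-key requests.
        return all.get(timeout, unit);
    }
}
```

Note that a TimeoutException from get only stops the caller from waiting. The requests that are already running keep going on the executor unless they are cancelled or time out on their own.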

As mentioned in the comments, Guava's ListenableFutures would provide similar functionality for Java 7, but without lambdas they tend to get clumsy.


Answer 2

As far as I know, RestTemplate is blocking, and the ForkJoinTask section of the ForkJoinPool JavaDoc says:

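Computations should avoid synchronized methods or blocks, and should minimize other blocking synchronization apart from joining other tasks or using synchronizers such as Phasers that are advertised to cooperate with fork/join scheduling. ...
Tasks should also not perform blocking IO, ...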
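
If a blocking call nevertheless has to run on a ForkJoinPool thread, the JDK offers ForkJoinPool.managedBlock together with the ForkJoinPool.ManagedBlocker interface, which lets the pool compensate for the blocked worker. The sketch below shows the shape of such a wrapper; the class name BlockingCall and its run helper are hypothetical, and the Supplier stands in for whatever blocking HTTP call is being made.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

class BlockingCall<T> implements ForkJoinPool.ManagedBlocker {

    private final Supplier<T> call; // the blocking operation to run
    private T result;
    private boolean done;

    BlockingCall(Supplier<T> call) {
        this.call = call;
    }

    @Override
    public boolean block() {
        if (!done) {
            result = call.get(); // may block, e.g. on network I/O
            done = true;
        }
        return true; // no further blocking is necessary
    }

    @Override
    public boolean isReleasable() {
        return done;
    }

    // Runs the supplier through managedBlock and returns its result.
    static <T> T run(Supplier<T> call) throws InterruptedException {
        BlockingCall<T> blocker = new BlockingCall<>(call);
        ForkJoinPool.managedBlock(blocker);
        return blocker.result;
    }
}
```

Inside a RecursiveTask, the blocking request would then be wrapped as BlockingCall.run(() -> ...). Whether this is preferable to a plain ExecutorService depends on the workload; for a handful of HTTP calls a fixed thread pool is usually the simpler choice.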

A call within a call is redundant.
And you don't need two executors. You can also return a partial result from getSyncData(DataRequest key). This can be done like this:

DataClient.java

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    // first executor
    private ExecutorService service = Executors.newFixedThreadPool(15);

    @Override
    public List<DataResponse> getSyncData(DataRequest key) {
        List<DataResponse> responseList = null;
        DataFetcherResult response = null;
        try {
            response = getAsyncData(key);
            responseList = response.get(key.getTimeout(), key.getTimeoutUnit());
        } catch (TimeoutException ex) {
            response.cancel(true);
            responseList = response.getPartialResult();
        }
        return responseList;
    }

    @Override
    public DataFetcherResult getAsyncData(DataRequest key) {
        List<DataRequest> keys = generateKeys(key);
        final List<Future<DataResponse>> responseList = new ArrayList<>();
        final CountDownLatch latch = new CountDownLatch(keys.size());//assume keys is not null
        for (final DataRequest _key : keys) {
            responseList.add(service.submit(new Callable<DataResponse>() {
                @Override
                public DataResponse call() throws Exception {
                    // Returning from inside finally would silently discard any exception
                    // thrown by performDataRequest, so only the latch is released there.
                    try {
                        return performDataRequest(_key);
                    } finally {
                        latch.countDown();
                    }
                }
            }));
        }
        return new DataFetcherResult(responseList, latch);
    }

    // In this method I am making a HTTP call to another service
    // and then I will make List<DataRequest> accordingly.
    private List<DataRequest> generateKeys(DataRequest key) {
        List<DataRequest> keys = new ArrayList<>();
        // use key object which is passed in constructor to make HTTP call to another service
        // and then make List of DataRequest object and return keys.
        return keys;
    }

    private DataResponse performDataRequest(DataRequest key) {
        // This will have all LogicA code here which is shown in my original design.
        // everything as it is same..
        return null;
    }
}

DataFetcherResult.java

public class DataFetcherResult implements Future<List<DataResponse>> {
    final List<Future<DataResponse>> futures;
    final CountDownLatch latch;

    public DataFetcherResult(List<Future<DataResponse>> futures, CountDownLatch latch) {
        this.futures = futures;
        this.latch = latch;
    }

    //non-blocking
    public List<DataResponse> getPartialResult() {
        List<DataResponse> result = new ArrayList<>(futures.size());
        for (Future<DataResponse> future : futures) {
            try {
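                // A cancelled future reports isDone() == true, but get() on it throws CancellationException.
                result.add(future.isDone() && !future.isCancelled() ? future.get() : null);
                //instead of null you can return new DataResponse(DataErrorEnum.NOT_READY, DataStatusEnum.ERROR);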
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
                //ExecutionException or CancellationException could be thrown, especially if DataFetcherResult was cancelled
                //you can handle them here and return DataResponse with corresponding DataErrorEnum and DataStatusEnum
            }
        }
        return result;
    }

    @Override
    public List<DataResponse> get() throws ExecutionException, InterruptedException {
        List<DataResponse> result = new ArrayList<>(futures.size());
        for (Future<DataResponse> future : futures) {
            result.add(future.get());
        }
        return result;
    }

    @Override
    public List<DataResponse> get(long timeout, TimeUnit timeUnit)
            throws ExecutionException, InterruptedException, TimeoutException {
        if (latch.await(timeout, timeUnit)) {
            return get();
        }
        throw new TimeoutException();//or getPartialResult()
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = true;
        for (Future<DataResponse> future : futures) {
            cancelled &= future.cancel(mayInterruptIfRunning);
        }
        return cancelled;
    }

    @Override
    public boolean isCancelled() {
        boolean cancelled = true;
        for (Future<DataResponse> future : futures) {
            cancelled &= future.isCancelled();
        }
        return cancelled;
    }

    @Override
    public boolean isDone() {
        boolean done = true;
        for (Future<DataResponse> future : futures) {
            done &= future.isDone();
        }
        return done;
    }

    //and etc.
}

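I wrote it using CountDownLatch and it looks great, but note that there is a nuance. You may get stuck for a short while in DataFetcherResult.get(long timeout, TimeUnit timeUnit), because the CountDownLatch is not synchronized with the futures' state. It can happen that latch.getCount() == 0 while not all futures return future.isDone() == true yet: the tasks have already passed latch.countDown(); inside the Callable's finally {} block, but the future's internal state has not changed and is still equal to NEW.
So calling get() inside get(long timeout, TimeUnit timeUnit) can cause a small delay.
A similar case is also described here.

The get with timeout, DataFetcherResult.get(...), can be rewritten using the futures' own future.get(long timeout, TimeUnit timeUnit), and you can then remove the CountDownLatch from the class.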

public List<DataResponse> get(long timeout, TimeUnit timeUnit)
        throws ExecutionException, InterruptedException{
    List<DataResponse> result = new ArrayList<>(futures.size());
    long timeoutMs = timeUnit.toMillis(timeout);
    boolean timedOut = false; // set once any future has timed out
    for (Future<DataResponse> future : futures) {
        long beforeGet = System.currentTimeMillis();
        try {
            if (!timedOut && timeoutMs > 0) {
                result.add(future.get(timeoutMs, TimeUnit.MILLISECONDS));
                timeoutMs -= System.currentTimeMillis() - beforeGet;
            } else {
                if (future.isDone()) {
                    result.add(future.get());
                } else {
                    //result.add(new DataResponse(DataErrorEnum.NOT_READY, DataStatusEnum.ERROR)); ?
                }
            }
        } catch (TimeoutException e) {
            result.add(new DataResponse(DataErrorEnum.TIMEOUT, DataStatusEnum.ERROR));
            timedOut = true;
        }
        //you can also handle ExecutionException or CancellationException here
    }

    return result;
}

This code is given as an example and should be tested before being used in production, but it seems legit :)
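
As a point of comparison, ExecutorService.invokeAll(tasks, timeout, unit) from the JDK gives similar partial-result behaviour with a single executor: it returns once all tasks have completed or the timeout expires, and cancels whatever has not finished. The sketch below assumes String stand-ins for DataResponse and uses the placeholder values "TIMEOUT" and "ERROR" where the real code would build error responses; the class name InvokeAllPartial is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InvokeAllPartial {

    static List<String> fetchAll(ExecutorService executor, List<Callable<String>> tasks,
                                 long timeout, TimeUnit unit) throws InterruptedException {
        // Every returned future is done: completed, failed, or cancelled on timeout.
        List<Future<String>> futures = executor.invokeAll(tasks, timeout, unit);
        List<String> results = new ArrayList<>(futures.size());
        for (Future<String> future : futures) {
            try {
                results.add(future.get()); // does not block, the future is already done
            } catch (CancellationException e) {
                results.add("TIMEOUT");    // placeholder for a timeout DataResponse
            } catch (ExecutionException e) {
                results.add("ERROR");      // placeholder for an error DataResponse
            }
        }
        return results;
    }
}
```

The results come back in the same order as the submitted tasks. This only covers the per-key requests; if the key-generation call must fall under the same deadline, the time it takes would have to be subtracted from the timeout passed here.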