How to create a blocking background loader in Java 8?

2022-09-02 01:49:43

Question

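How do you create a proper background loader in Java 8? The conditions:

  • Data should be loaded in the background
  • After loading, the data should be displayed
  • While data is being loaded, no further requests should be accepted
  • If there were requests while the data was being loaded, another load should be scheduled after a certain timeout (e.g. 5 seconds)

The purpose is that, for example, reload requests are accepted, but the database is not flooded with requests.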

MCVE

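Here is an MCVE. It consists of a background task that simulates loading by simply calling Thread.sleep for 2 seconds. The task is scheduled every second, which naturally leads to overlapping background loading tasks; that overlap is what should be avoided.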

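import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class LoadInBackgroundExample {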

  /**
   * A simple background task which should perform the data loading operation. In this minimal example it simply invokes Thread.sleep
   */
  public static class BackgroundTask implements Runnable {

    private int id;

    public BackgroundTask(int id) {
      this.id = id;
    }

    /**
     * Sleep for a given amount of time to simulate loading.
     */
    @Override
    public void run() {

      try {

        System.out.println("Start #" + id + ": " + Thread.currentThread());

        long sleepTime = 2000; 
        Thread.sleep( sleepTime);

      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        System.out.println("Finish #" + id + ": " + Thread.currentThread());
      }

    }
  }

  /**
   * CompletableFuture which simulates loading and showing data.
   * @param taskId Identifier of the current task
   */
  public static void loadInBackground( int taskId) {

    // create the loading task
    BackgroundTask backgroundTask = new BackgroundTask( taskId);

    // "load" the data asynchronously
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {

      @Override
      public String get() {

        CompletableFuture<Void> future = CompletableFuture.runAsync(backgroundTask);

        try {

          future.get();

        } catch (InterruptedException | ExecutionException e) {
          e.printStackTrace();
        }

        return "task " + backgroundTask.id;
      }
    });

    // display the data after they are loaded
    CompletableFuture<Void> future = completableFuture.thenAccept(x -> {

      System.out.println( "Background task finished:" + x);

    });

  }


  public static void main(String[] args) {

    // runnable which invokes the background loader every second
    Runnable trigger = new Runnable() {

      int taskId = 0;

      public void run() { 

        loadInBackground( taskId++);

      }
    };

    // create scheduler
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(trigger, 0, 1, TimeUnit.SECONDS);

    // cancel the scheduler and the application after 10 seconds
    scheduler.schedule(() -> beeperHandle.cancel(true), 10, TimeUnit.SECONDS);

    try {
      beeperHandle.get();
    } catch (Throwable th) {
      // expected: get() throws a CancellationException once the periodic task is cancelled
    }

    System.out.println( "Cancelled");
    System.exit(0);
  }

}

The output is as follows:

Start #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Start #1: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Start #2: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Finish #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 0
Finish #1: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Background task finished:task 1
Start #3: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Finish #2: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 2
Start #4: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Start #5: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Finish #3: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Background task finished:task 3
Start #6: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Finish #4: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 4
Finish #5: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 5
Start #7: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Finish #6: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Start #8: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 6
Start #9: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Finish #7: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 7
Start #10: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Finish #8: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 8
Cancelled

The goal is to have, for example, #1 and #2 skipped because #0 is still running.

Problem

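Where is the right place to put the blocking mechanism? Should synchronization be used? Or an AtomicBoolean? And if so, should it be inside the get() method or elsewhere?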
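
For illustration, the AtomicBoolean idea raised here could be sketched roughly as follows. GuardedLoader and tryLoad are hypothetical names that do not appear in the original example, and the sketch only covers skipping overlapping requests, not scheduling a later reload. The guard sits around the point where the asynchronous load is started rather than inside get().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical helper: allows one load at a time and drops requests that arrive meanwhile. */
class GuardedLoader {

  // true while a load is in flight
  private final AtomicBoolean loading = new AtomicBoolean(false);

  /**
   * Starts the load asynchronously unless one is already running.
   *
   * @return true if the load was started, false if the request was skipped
   */
  boolean tryLoad(Supplier<String> load, Consumer<String> display) {
    // compareAndSet is atomic, so only one caller can flip false -> true
    if (!loading.compareAndSet(false, true)) {
      return false;
    }
    CompletableFuture.supplyAsync(load)
        .thenAccept(display)
        // release the guard whether the load succeeded or failed
        .whenComplete((ignored, error) -> loading.set(false));
    return true;
  }

  private static void pause(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    GuardedLoader loader = new GuardedLoader();
    for (int i = 0; i < 6; i++) {
      int id = i;
      boolean started = loader.tryLoad(() -> {
        pause(2000); // simulate loading, as in the MCVE
        return "task " + id;
      }, data -> System.out.println("Background task finished: " + data));
      System.out.println("Request #" + id + (started ? " started" : " skipped"));
      pause(1000);
    }
    pause(2500); // the common pool uses daemon threads, so wait for the last load
  }
}
```

Which requests are skipped depends on timing, so the exact output can vary between runs.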


Answer 1

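You already have a thread pool to execute the task. Running the task in yet another asynchronous executor (the ForkJoinPool that is used when you call CompletableFuture) is unnecessary and only makes things more complicated.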

Keep it simple:

public static void loadInBackground(int taskId) {
    // create the loading task
    BackgroundTask backgroundTask = new BackgroundTask(taskId);
    // No need to run asynchronously, as it is already running in the scheduler's executor
    backgroundTask.run();
}

The ScheduledExecutorService will ensure that only one task runs at a time when you invoke it with scheduleAtFixedRate. From its Javadoc:

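Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is, executions will commence after initialDelay, then initialDelay + period, then initialDelay + 2 * period, and so on. If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor. If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute.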
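
The non-overlap guarantee quoted above can be checked with a small experiment. This is only a sketch: FixedRateNoOverlapDemo and maxConcurrentRuns are made-up names, and the pool is deliberately given several threads to show that the guarantee does not come from having a single worker.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedRateNoOverlapDemo {

  /** Runs a slow task at a shorter period and returns the highest number of simultaneous runs seen. */
  static int maxConcurrentRuns(long taskMillis, long periodMillis, long totalMillis)
      throws InterruptedException {
    AtomicInteger running = new AtomicInteger();
    AtomicInteger maxRunning = new AtomicInteger();
    // several threads are available, yet the periodic task never overlaps with itself
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);
    scheduler.scheduleAtFixedRate(() -> {
      int now = running.incrementAndGet();
      maxRunning.accumulateAndGet(now, Math::max);
      try {
        Thread.sleep(taskMillis); // simulate a load that takes longer than the period
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        running.decrementAndGet();
      }
    }, 0, periodMillis, TimeUnit.MILLISECONDS);

    Thread.sleep(totalMillis);
    scheduler.shutdownNow();
    scheduler.awaitTermination(1, TimeUnit.SECONDS);
    return maxRunning.get();
  }

  public static void main(String[] args) throws InterruptedException {
    // 2-second task scheduled every second, as in the question
    System.out.println("Max concurrent runs: " + maxConcurrentRuns(2000, 1000, 7000));
  }
}
```

Note that late executions are delayed rather than dropped, so this approach serializes the loads but does not skip any of them.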


Answer 2

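Taking the following as the requirements:

  • Data should be loaded in the background
  • After loading, the data should be displayed
  • While data is being loaded, no further requests should be accepted
  • If there were requests while the data was being loaded, another load should be scheduled after a certain timeout (e.g. 5 seconds)

The solution can be built on Executors.newSingleThreadExecutor(), CompletableFuture and LinkedBlockingQueue: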

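import java.time.Duration;
import java.time.Instant;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class SingleThreadedLoader {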

  private static class BackgroundTask extends CompletableFuture<String> {

    private final String query;

    private BackgroundTask(String query) {
      this.query = query;
    }

    public String getQuery() {
      return query;
    }
  }

  private final BlockingQueue<BackgroundTask> tasks = new LinkedBlockingQueue<>();
  // while data are loaded no further requests should be accepted
  private final Executor executor = Executors.newSingleThreadExecutor();

  private final int delaySeconds;

  private AtomicReference<Instant> lastExecution = new AtomicReference<>(Instant.EPOCH);

  public SingleThreadedLoader(int delaySeconds) {
    this.delaySeconds = delaySeconds;
    setupLoading();
  }

  public BackgroundTask loadInBackground(String query) {
    log("Enqueued query " + query);
    BackgroundTask task = new BackgroundTask(query);
    tasks.add(task);
    return task;
  }

  private void setupLoading() {
    // data should be loaded in background
    executor.execute(() -> {
      while (true) {
        try {
          // if there were requests while the data were loaded
          // another loading should be scheduled after a certain timeout (e. g. 5 seconds)
          Instant prev = lastExecution.get();
          // Duration.toSeconds() only exists since Java 9; getSeconds() works on Java 8
          long delay = Duration.between(prev, Instant.now()).getSeconds();
          if (delay < delaySeconds) {
            log("Waiting for 5 seconds before next data loading");
            TimeUnit.SECONDS.sleep(delaySeconds - delay);
          }
          BackgroundTask task = tasks.take();
          try {
            String query = task.getQuery();
            String data = loadData(query);
            task.complete(data);
          } catch (Exception e) {
            task.completeExceptionally(e);
          }
          lastExecution.set(Instant.now());
        } catch (InterruptedException e) {
          log(e.getMessage());
          return;
        }
      }
    });
  }

  private String loadData(String query) {
    try {
      log("Loading data for " + query);
      TimeUnit.SECONDS.sleep(2);
      log("Loaded data for " + query);
      return "Result " + query;
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  private static void log(String str) {
    String time = LocalTime.now().truncatedTo(ChronoUnit.SECONDS).format(DateTimeFormatter.ISO_TIME);
    String thread = Thread.currentThread().getName();
    System.out.println(time + ' ' + thread + ": " + str);
  }

  public static void main(String[] args) throws Exception {
    SingleThreadedLoader loader = new SingleThreadedLoader(5);
    // after the loading the data should be displayed
    loader.loadInBackground("1").thenAccept(SingleThreadedLoader::log);
    loader.loadInBackground("2").thenAccept(SingleThreadedLoader::log);
    loader.loadInBackground("3").thenAccept(SingleThreadedLoader::log);

    log("Do another work in the main thread");

    TimeUnit.SECONDS.sleep(30);
  }
}

After execution, standard output will contain the following:

10:29:26 main: Enqueued query 1
10:29:26 pool-1-thread-1: Loading data for 1
10:29:26 main: Enqueued query 2
10:29:26 main: Enqueued query 3
10:29:26 main: Do another work in the main thread
10:29:28 pool-1-thread-1: Loaded data for 1
10:29:28 pool-1-thread-1: Result 1
10:29:28 pool-1-thread-1: Waiting for 5 seconds before next data loading
10:29:33 pool-1-thread-1: Loading data for 2
10:29:36 pool-1-thread-1: Loaded data for 2
10:29:36 pool-1-thread-1: Result 2
10:29:36 pool-1-thread-1: Waiting for 5 seconds before next data loading
10:29:41 pool-1-thread-1: Loading data for 3
10:29:43 pool-1-thread-1: Loaded data for 3
10:29:43 pool-1-thread-1: Result 3
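
The loader above queues every request and runs each one in turn. If the intent is instead to drop requests that arrive during a load and schedule a single follow-up reload after the timeout, a variation might look like the sketch below. CoalescingLoader and its members are hypothetical names, and the use of synchronized plus a small state enum is an assumption made for this illustration, not part of the answer.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical loader: one load at a time, at most one delayed reload for requests made meanwhile. */
class CoalescingLoader {

  private enum State { IDLE, LOADING, RELOAD_SCHEDULED }

  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final Runnable load;
  private final long delayMillis;

  private State state = State.IDLE;  // guarded by "this"
  private boolean reloadRequested;   // guarded by "this"

  CoalescingLoader(Runnable load, long delayMillis) {
    this.load = load;
    this.delayMillis = delayMillis;
  }

  /** @return true if this request started a load immediately */
  synchronized boolean request() {
    switch (state) {
      case IDLE:
        state = State.LOADING;
        scheduler.execute(this::runLoad);
        return true;
      case LOADING:
        reloadRequested = true; // remember that somebody asked during the load
        return false;
      default:
        return false; // a reload is already scheduled and will cover this request
    }
  }

  private void runLoad() {
    synchronized (this) {
      state = State.LOADING;
    }
    try {
      load.run();
    } finally {
      afterLoad();
    }
  }

  private synchronized void afterLoad() {
    if (reloadRequested) {
      reloadRequested = false;
      state = State.RELOAD_SCHEDULED;
      scheduler.schedule(this::runLoad, delayMillis, TimeUnit.MILLISECONDS);
    } else {
      state = State.IDLE;
    }
  }

  void shutdown() {
    scheduler.shutdownNow();
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicInteger loads = new AtomicInteger();
    CoalescingLoader loader = new CoalescingLoader(() -> {
      int n = loads.incrementAndGet();
      System.out.println("Loading #" + n);
      try {
        Thread.sleep(2000); // simulate loading
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      System.out.println("Loaded #" + n);
    }, 5000);

    loader.request();    // starts the first load
    Thread.sleep(500);
    loader.request();    // arrives during the load: remembered
    loader.request();    // arrives during the load: merged with the previous one
    Thread.sleep(10000); // first load, then the 5-second delay, then the reload
    loader.shutdown();
    System.out.println("Total loads: " + loads.get());
  }
}
```

Here several requests made during one load collapse into a single reload, which keeps the database from being flooded at the cost of not answering each request individually.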