如何停止所有正在运行的线程,如果其中一个线程引发异常?

2022-09-04 03:46:34

在我的一个应用程序中,我正在使用该类创建固定线程池并等待线程完成。如果进程没有引发任何异常,则此方法工作正常。如果任何线程中发生异常,我需要停止所有正在运行的线程并将错误报告给主线程。任何人都可以帮我解决这个问题吗?ExecutorServiceCountDownLatch

这是我用于执行多个线程的示例代码。

    private void executeThreads()
    {
        int noOfThreads = 10;
        ExecutorService executor = Executors.newFixedThreadPool(noOfThreads);     
        try      
       {
        CountDownLatch latch = new CountDownLatch(noOfThreads);
        for(int i=0; i< noOfThreads; i++){
         executor.submit(new ThreadExecutor(latch));
        }
        latch.await();           
       }
       catch(Exception e)
       {
        e.printStackTrace();
       }
       finally
       {
        executor.shutDown();
       }
   }

这是执行器类

     public class ThreadExecutor implements Callable<String> {
        CountDownLatch latch ;
        public ThreadExecutor(CountDownLatch latch){
            this.latch = latch;
        }   

    @Override
    public String call() throws Exception
    {
        doMyTask(); // process logic goes here!
        this.latch.countDown();
        return "Success";
    }

=============================================================================

谢谢大家:)

我已经纠正了我的课程,如下所示,现在正在工作。

private void executeThreads()
    {
        int noOfThreads = 10;
        ExecutorService executor = Executors.newFixedThreadPool(noOfThreads);     
       ArrayList<Future<Object>> futureList = new ArrayList<Future<Object>>(noOfThreads );
    try
    {
        userContext = BSF.getMyContext();
        CountDownLatch latch = new CountDownLatch(noOfComponentsToImport);

        for(ImportContent artifact:artifactList){
            futureList.add(executor.submit(new ThreadExecutor(latch)));
        }

        latch.await();

        for(Future<Object> future : futureList)
        {
                  try
                  {
                      future.get();                 
                   }
                   catch(ExecutionException e)
                   {   //handle it               
                    }
        }           

    }
    catch (Exception e) {
       //handle it
    }
    finally
    {
        executor.shutdown();      

        try
        {
            executor.awaitTermination(90000, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e)
        {
           //handle it
        }
    }
   }

执行器类 :

public class ThreadExecutor implements Callable<String> {
        private static volatile boolean isAnyError;
        CountDownLatch latch ;
        public ThreadExecutor(CountDownLatch latch){
            this.latch = latch;
        }   

    @Override
    public String call() throws Exception
    {

      try{
            if(!isAnyError)
            { 
               doMyTask(); // process logic goes here!
            }
     }
     catch(Exception e)
     {
        isAnyError = true ;
        throw e;
      }
      finally
      {
        this.latch.countDown();
       }
        return "Success";
    }

答案 1

使用 , 完成 和 超过任务持续时间的 完成 (即它之后不会被关闭):ExecutorCompletionServiceExecutorService

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class Threader {

    static ExecutorService service = Executors.newCachedThreadPool();

    public static void main(String[] args) {
        new Threader().start();
        service.shutdown();
    }

    private void start() {
        CompletionService<Void> completionService = new ExecutorCompletionService<Void>(
                service);
        /*
         * Holds all the futures for the submitted tasks
         */
        List<Future<Void>> results = new ArrayList<Future<Void>>();

        for (int i = 0; i < 3; i++) {
            final int callableNumber = i;

            results.add(completionService.submit(new Callable<Void>() {
                 @Override
                 public Void call() throws Exception {
                     System.out.println("Task " + callableNumber
                             + " in progress");
                     try {
                         Thread.sleep(callableNumber * 1000);
                     } catch (InterruptedException ex) {
                         System.out.println("Task " + callableNumber
                                 + " cancelled");
                         return null;
                     }
                     if (callableNumber == 1) {
                         throw new Exception("Wrong answer for task "
                                 + callableNumber);
                     }
                     System.out.println("Task " + callableNumber + " complete");
                     return null;
                 }
             }));
        }

        boolean complete = false;
        while (!complete) {
            complete = true;
            Iterator<Future<Void>> futuresIt = results.iterator();
            while (futuresIt.hasNext()) {
                if (futuresIt.next().isDone()) {
                    futuresIt.remove();
                } else {
                    complete = false;
                }
            }

            if (!results.isEmpty()) {
                try {
                    /*
                     * Examine results of next completed task
                     */
                    completionService.take().get();
                } catch (InterruptedException e) {
                    /*
                     * Give up - interrupted.
                     */
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (ExecutionException e) {
                    /*
                     * The task threw an exception
                     */
                    System.out.println("Execution exception " + e.getMessage());
                    complete = true;
                    for (Future<Void> future : results) {
                        if (!future.isDone()) {
                            System.out.println("Cancelling " + future);
                            future.cancel(true);
                        }
                    }
                }
            }
        }

    }
}

输出类似于:

Task 0 in progress
Task 2 in progress
Task 1 in progress
Task 0 complete
Execution exception java.lang.Exception: Wrong answer for task 1
Cancelling java.util.concurrent.FutureTask@a59698
Task 2 cancelled

其中,由于任务 1 失败,任务 2 被取消。


答案 2

我强烈建议您使用坚固的机制来倒计时闩锁。使用单独的机制,使用包罗万象的“检测线程中的错误”。try-finally { latch.countDown(); }