如何向不同的Web服务发送多个异步请求?

我需要向许多不同的Web服务发送多个请求并接收结果。问题是,如果我逐个发送请求,只要我需要单独发送和处理所有请求,就需要。

我想知道如何一次发送所有请求并接收结果。

如下面的代码所示,我有三个主要方法,每个方法都有自己的子方法。每个子方法都向其关联的Web服务发送请求并接收结果;因此,例如,要接收Web服务9的结果,我必须等到从1到8的所有Web服务完成,逐个发送所有请求并接收其结果需要很长时间。

如下图所示,没有一个方法和子方法彼此相关,所以我可以调用它们并以任何顺序接收它们的结果,唯一重要的是接收每个子方法的结果并填充它们的关联列表。

private List<StudentsResults> studentsResults = new ArrayList();
private List<DoctorsResults> doctorsResults = new ArrayList();
private List<PatientsResults> patientsResults = new ArrayList();

main (){
    retrieveAllLists();
}

retrieveAllLists(){

     retrieveStudents();
     retrieveDoctors();
     retrievePatients();
}

retrieveStudents(){

    this.studentsResults = retrieveStdWS1();   //send request to Web Service 1 to receive its  list of students
    this.studentsResults = retrieveStdWS2();  //send request to Web Service 2 to receive its  list of students
    this.studentsResults = retrieveStdWS3(); //send request to Web Service 3 to receive its  list of students

}

retrieveDoctors(){

   this.doctorsResults = retrieveDocWS4();   //send request to Web Service 4 to receive its list of doctors
   this.doctorsResults = retrieveDocWS5();  //send request to Web Service 5 to receive its  list of doctors
   this.doctorsResults = retrieveDocWS6(); //send request to Web Service 6 to receive its  list of doctors

}

retrievePatients(){

   this.patientsResults = retrievePtWS7();   //send request to Web Service 7 to receive its list of patients
   this.patientsResults = retrievePtWS8();  //send request to Web Service 8 to receive its list of patients
   this.patientsResults = retrievePtWS9(); //send request to Web Service 9 to receive its list of patients

}

答案 1

这是一种简单的分叉连接方法,但为清楚起见,您可以启动任意数量的线程,并在以后检索结果(如此方法)。

    ExecutorService pool = Executors.newFixedThreadPool(10);
    List<Callable<String>> tasks = new ArrayList<>();
    tasks.add(new Callable<String>() {
        public String call() throws Exception {
            Thread.sleep((new Random().nextInt(5000)) + 500);
            return "Hello world";
        }

    });
    List<Future<String>> results = pool.invokeAll(tasks);

    for (Future<String> future : results) {
        System.out.println(future.get());
    }
    pool.shutdown();

更新,完成:

这是一个冗长但可行的解决方案。我是临时写的,没有编译。鉴于这三个列表具有不同的类型,并且WS方法是单独的,因此它并不是真正的模块化,而是尝试使用您最好的编程技能,看看是否可以更好地模块化它。

    ExecutorService pool = Executors.newFixedThreadPool(10);

    List<Callable<List<StudentsResults>>> stasks = new ArrayList<>();
    List<Callable<List<DoctorsResults>>> dtasks = new ArrayList<>();
    List<Callable<List<PatientsResults>>> ptasks = new ArrayList<>();

    stasks.add(new Callable<List<StudentsResults>>() {
        public List<StudentsResults> call() throws Exception {
            return retrieveStdWS1();
        }

    });
    stasks.add(new Callable<List<StudentsResults>>() {
        public List<StudentsResults> call() throws Exception {
            return retrieveStdWS2();
        }

    });
    stasks.add(new Callable<List<StudentsResults>>() {
        public List<StudentsResults> call() throws Exception {
            return retrieveStdWS3();
        }

    });

    dtasks.add(new Callable<List<DoctorsResults>>() {
        public List<DoctorsResults> call() throws Exception {
            return retrieveDocWS4();
        }

    });
    dtasks.add(new Callable<List<DoctorsResults>>() {
        public List<DoctorsResults> call() throws Exception {
            return retrieveDocWS5();
        }

    });
    dtasks.add(new Callable<List<DoctorsResults>>() {
        public List<DoctorsResults> call() throws Exception {
            return retrieveDocWS6();
        }

    });

    ptasks.add(new Callable<List<PatientsResults>>() {
        public List<PatientsResults> call() throws Exception {
            return retrievePtWS7();
        }

    });
    ptasks.add(new Callable<List<PatientsResults>>() {
        public List<PatientsResults> call() throws Exception {
            return retrievePtWS8();
        }

    });
    ptasks.add(new Callable<List<PatientsResults>>() {
        public List<PatientsResults> call() throws Exception {
            return retrievePtWS9();
        }

    });

    List<Future<List<StudentsResults>>> sresults = pool.invokeAll(stasks);
    List<Future<List<DoctorsResults>>> dresults = pool.invokeAll(dtasks);
    List<Future<List<PatientsResults>>> presults = pool.invokeAll(ptasks);

    for (Future<List<StudentsResults>> future : sresults) {
       this.studentsResults.addAll(future.get());
    }
    for (Future<List<DoctorsResults>> future : dresults) {
       this.doctorsResults.addAll(future.get());
    }
    for (Future<List<PatientsResults>> future : presults) {
       this.patientsResults.addAll(future.get());
    }
    pool.shutdown();

每个都返回一个结果列表,并在其自己的单独线程中调用。
调用该方法时,会将结果返回到主线程。
在 完成之前,结果不可用,因此不存在并发问题。CallableFuture.get()Callable


答案 2

因此,为了好玩,我提供了两个工作示例。第一个演示了在java 1.5之前的旧学校方法。第二个显示了使用java 1.5中可用工具的更清晰的方法:

import java.util.ArrayList;

public class ThreadingExample
{
    private ArrayList <MyThread> myThreads;

    public static class MyRunnable implements Runnable
    {
        private String data;

        public String getData()
        {
            return data;
        }

        public void setData(String data)
        {
            this.data = data;
        }

        @Override
        public void run()
        {
        }
    }

    public static class MyThread extends Thread
    {
        private MyRunnable myRunnable;

        MyThread(MyRunnable runnable)
        {
            super(runnable);
            setMyRunnable(runnable);
        }

        /**
         * @return the myRunnable
         */
        public MyRunnable getMyRunnable()
        {
            return myRunnable;
        }

        /**
         * @param myRunnable the myRunnable to set
         */
        public void setMyRunnable(MyRunnable myRunnable)
        {
            this.myRunnable = myRunnable;
        }
    }

    public ThreadingExample()
    {
        myThreads = new ArrayList <MyThread> ();
    }

    public ArrayList <String> retrieveMyData ()
    {
        ArrayList <String> allmyData = new ArrayList <String> ();

        if (isComplete() == false)
        {
            // Sadly we aren't done
            return (null);
        }

        for (MyThread myThread : myThreads)
        {
            allmyData.add(myThread.getMyRunnable().getData());
        }

        return (allmyData);
    }

    private boolean isComplete()
    {
        boolean complete = true;

        // wait for all of them to finish
        for (MyThread x : myThreads)
        {
            if (x.isAlive())
            {
                complete = false;
                break;
            }
        }
        return (complete);
    }

    public void kickOffQueries()
    {
        myThreads.clear();

        MyThread a = new MyThread(new MyRunnable()
        {
            @Override
            public void run()
            {
                // This is where you make the call to external services
                // giving the results to setData("");
                setData("Data from list A");
            }
        });
        myThreads.add(a);

        MyThread b = new MyThread (new MyRunnable()
        {
            @Override
            public void run()
            {
                // This is where you make the call to external services
                // giving the results to setData("");
                setData("Data from list B");
            }
        });
        myThreads.add(b);

        for (MyThread x : myThreads)
        {
            x.start();
        }

        boolean done = false;

        while (done == false)
        {
            if (isComplete())
            {
                done = true;
            }
            else
            {
                // Sleep for 10 milliseconds
                try
                {
                    Thread.sleep(10);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }
    }


    public static void main(String [] args)
    {
        ThreadingExample example = new ThreadingExample();
        example.kickOffQueries();

        ArrayList <String> data = example.retrieveMyData();
        if (data != null)
        {
            for (String s : data)
            {
                System.out.println (s);
            }
        }
    }
}

这是更简单的工作版本:

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadingExample
{

    public static void main(String [] args)
    {
        ExecutorService service = Executors.newCachedThreadPool();
        Set <Callable<String>> callables = new HashSet <Callable<String>> ();

        callables.add(new Callable<String>()
        {
            @Override
            public String call() throws Exception
            {
                return "This is where I make the call to web service A, and put its results here";
            }
        });

        callables.add(new Callable<String>()
        {
            @Override
            public String call() throws Exception
            {
                return "This is where I make the call to web service B, and put its results here";
            }
        });

        callables.add(new Callable<String>()
        {
            @Override
            public String call() throws Exception
            {
                return "This is where I make the call to web service C, and put its results here";
            }
        });

        try
        {
            List<Future<String>> futures = service.invokeAll(callables);
            for (Future<String> future : futures)
            {
                System.out.println (future.get());
            }
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        catch (ExecutionException e)
        {
            e.printStackTrace();
        }
    }
}

推荐