使用队列的生产者/使用者线程

我想创建某种线程应用程序。但我不确定在两者之间实现队列的最佳方法是什么。Producer/Consumer

因此,我有两个想法(这两个想法都可能完全错误)。我想知道哪个会更好,如果它们都很糟糕,那么实现队列的最佳方法是什么。我关注的主要是我在这些示例中对队列的实现。我正在扩展一个 Queue 类,它是一个内部类,并且是线程安全的。下面是两个示例,每个示例有 4 个类。

主类-

public class SomeApp
{
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        consumer = new Consumer();
        producer = new Producer();
    }
} 

消费类-

public class Consumer implements Runnable
{
    public Consumer()
    {
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = QueueHandler.dequeue();
            //do some stuff with the object
        }
    }
}

生产者类-

public class Producer implements Runnable
{
    public Producer()
    {
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {
        while(true)
        {
            //add to the queue some sort of unique object
            QueueHandler.enqueue(new Object());
        }
    }
}

队列类-

public class QueueHandler
{
    //This Queue class is a thread safe (written in house) class
    public static Queue<Object> readQ = new Queue<Object>(100);

    public static void enqueue(Object object)
    {
        //do some stuff
        readQ.add(object);
    }

    public static Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

主类-

public class SomeApp
{
    Queue<Object> readQ;
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        readQ = new Queue<Object>(100);
        consumer = new Consumer(readQ);
        producer = new Producer(readQ);
    }
} 

消费类-

public class Consumer implements Runnable
{
    Queue<Object> queue;

    public Consumer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = queue.dequeue();
            //do some stuff with the object
        }
    }
}

生产者类-

public class Producer implements Runnable
{
    Queue<Object> queue;

    public Producer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {

        while(true)
        {
            //add to the queue some sort of unique object
            queue.enqueue(new Object());
        }
    }
}

队列类-

//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{    
    public QueueHandler(int size)
    {
        super(size); //All I'm thinking about now is McDonalds.
    }

    public void enqueue(Object object)
    {
        //do some stuff
        readQ.add();
    }

    public Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

然后走!


答案 1

Java 5+拥有这种事情所需的所有工具。您将需要:

  1. 将所有生产者放在一个执行器服务中;
  2. 把你所有的消费者放在另一个ExecutorService;
  3. 如有必要,请使用 BlockingQueue 在两者之间进行通信。

我说(3)的“如果有必要”,因为根据我的经验,这是一个不必要的步骤。您所要做的就是将新任务提交到使用者执行器服务。所以:

final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
  producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

所以直接提交到.producersconsumers


答案 2

好的,正如其他人所指出的,最好的办法是使用包。我强烈推荐“Java并发实践”。这是一本很棒的书,几乎涵盖了你需要知道的一切。java.util.concurrent

至于你的特定实现,正如我在注释中指出的那样,不要从构造函数启动线程 - 它可能是不安全的。

撇开这一点不谈,第二种实现似乎更好。您不希望将队列放在静态字段中。你可能只是无缘无故地失去了灵活性。

如果你想继续你自己的实现(我猜是为了学习目的?),至少提供一个方法。您应该构造对象(可以实例化该对象),然后调用以启动线程。start()Threadstart()

编辑:有自己的队列,所以这可能会令人困惑。这里有一些东西可以帮助你开始。ExecutorService

public class Main {
    public static void main(String[] args) {
        //The numbers are just silly tune parameters. Refer to the API.
        //The important thing is, we are passing a bounded queue.
        ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100));

        //No need to bound the queue for this executor.
        //Use utility method instead of the complicated Constructor.
        ExecutorService producer = Executors.newSingleThreadExecutor();

        Runnable produce = new Produce(consumer);
        producer.submit(produce);   
    }
}

class Produce implements Runnable {
    private final ExecutorService consumer;

    public Produce(ExecutorService consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        Pancake cake = Pan.cook();
        Runnable consume = new Consume(cake);
        consumer.submit(consume);
    }
}

class Consume implements Runnable {
    private final Pancake cake;

    public Consume(Pancake cake){
        this.cake = cake;
    }

    @Override
    public void run() {
        cake.eat();
    }
}

进一步编辑:对于生产者,而不是,你可以做这样的事情:while(true)

@Override
public void run(){
    while(!Thread.currentThread().isInterrupted()){
        //do stuff
    }
}

这样,您可以通过调用 来关闭执行程序。如果您使用 ,它不会关闭。.shutdownNow()while(true)

还要注意,仍然容易受到攻击(即一个人将停止处理)ProducerRuntimeExceptionsRuntimeException


推荐