我正在尝试生产者-消费者问题的多个生产者-多个消费者用例。
我使用 BlockingQueue 在多个生产者/消费者之间共享公共队列。
下面是我的代码。
Producer
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
private BlockingQueue inputQueue;
private static volatile int i = 0;
private volatile boolean isRunning = true;
public Producer(BlockingQueue q){
this.inputQueue=q;
}
public synchronized void run() {
//produce messages
for(i=0; i<10; i++)
{
try {
inputQueue.put(new Integer(i));
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Produced "+i);
}
finish();
}
public void finish() {
//you can also clear here if you wanted
isRunning = false;
}
}
Consumer
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable {
private BlockingQueue inputQueue;
private volatile boolean isRunning = true;
private final Integer POISON_PILL = new Integer(-1);
Consumer(BlockingQueue queue) {
this.inputQueue = queue;
}
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(!inputQueue.isEmpty()) {
try {
Integer queueElement = (Integer) inputQueue.take();
System.out.println("Consumed : " + queueElement.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println("Queue ");
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
//you can also clear here if you wanted
isRunning = false;
inputQueue.add(POISON_PILL);
}
}
测试班
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerService {
public static void main(String[] args) {
//Creating BlockingQueue of size 10
BlockingQueue queue = new ArrayBlockingQueue(10);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
//starting producer to produce messages in queue
new Thread(producer).start();
//starting producer to produce messages in queue
new Thread(producer).start();
//starting consumer to consume messages from queue
new Thread(consumer).start();
//starting consumer to consume messages from queue
new Thread(consumer).start();
System.out.println("Producer and Consumer has been started");
}
}
当我运行以下代码时,我没有看到正确的输出。
我在这里做错了什么吗?
您的代码中有相当多没有意义。我建议您坐下来弄清楚代码为何存在以及它在做什么。
如果您删除了isFinshed
旗帜,什么都不会改变。
如果您删除了使用synchronized
在生产者中,您将拥有并发生产者。将仅在同步块中访问的字段设置为易失性没有任何好处。
如果生产者是并发的,那么共享循环计数器是没有意义的。
通常,生产者会发送毒丸,而消费者不会消费该毒丸。例如如果您有两个消费者,一个可能会添加药丸,另一个可能会服用它。你的消费者忽视了毒丸,就像他们忽视了isFinished
flag.
您不想仅仅因为队列暂时为空就停止消费者。否则,它将看不到生产者生成的所有消息,甚至可能看不到任何消息。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)