我遇到一种情况,不同的线程填充一个队列(生产者),并且一个消费者从该队列中检索元素。我的问题是,当从队列中检索这些元素之一时,某些元素会丢失(丢失信号?)。生产者代码是:
class Producer implements Runnable {
private Consumer consumer;
Producer(Consumer consumer) { this.consumer = consumer; }
@Override
public void run() {
consumer.send("message");
}
}
它们是通过以下方式创建和运行的:
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 20; i++) {
executor.execute(new Producer(consumer));
}
消费者代码是:
class Consumer implements Runnable {
private Queue<String> queue = new ConcurrentLinkedQueue<String>();
void send(String message) {
synchronized (queue) {
queue.add(message);
System.out.println("SIZE: " + queue.size());
queue.notify();
}
}
@Override
public void run() {
int counter = 0;
synchronized (queue) {
while(true) {
try {
System.out.println("SLEEP");
queue.wait(10);
} catch (InterruptedException e) {
Thread.interrupted();
}
System.out.println(counter);
if (!queue.isEmpty()) {
queue.poll();
counter++;
}
}
}
}
}
运行代码时,有时会添加 20 个元素并检索 20 个元素,但在其他情况下检索到的元素少于 20 个。知道如何解决这个问题吗?
我建议您使用 BlockingQueue 而不是队列。 LinkedBlockingDeque 可能是您的不错选择。
你的代码看起来像这样:
void send(String message) {
synchronized (queue) {
queue.put(message);
System.out.println("SIZE: " + queue.size());
}
}
然后你需要
queue.take()
在你的消费者线程上
这个想法是 .take() 将阻塞,直到队列中有可用的项目,然后准确返回one(我认为这就是您的实施遇到问题的地方:轮询时缺少通知)。 .put() 负责为您处理所有通知。无需等待/通知。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)