首先,为了防止那些不喜欢读到我已读完的人将问题标记为重复生产者-消费者日志服务以不可靠的方式关闭 https://stackoverflow.com/questions/31626772/producer-consumer-logging-service-with-unreliable-way-to-shutdown问题。但它并没有完全回答问题,而且答案与书本内容相矛盾。
书中提供了以下代码:
public class LogWriter {
private final BlockingQueue<String> queue;
private final LoggerThread logger;
private static final int CAPACITY = 1000;
public LogWriter(Writer writer) {
this.queue = new LinkedBlockingQueue<String>(CAPACITY);
this.logger = new LoggerThread(writer);
}
public void start() {
logger.start();
}
public void log(String msg) throws InterruptedException {
queue.put(msg);
}
private class LoggerThread extends Thread {
private final PrintWriter writer;
public LoggerThread(Writer writer) {
this.writer = new PrintWriter(writer, true); // autoflush
}
public void run() {
try {
while (true)
writer.println(queue.take());
} catch (InterruptedException ignored) {
} finally {
writer.close();
}
}
}
}
现在我们应该了解如何停止这个过程。我们应该停止记录,但不应该跳过已经提交的消息。
作者研究方法:
public void log(String msg) throws InterruptedException {
if(!shutdownRequested)
queue.put(msg);
else
throw new IllegalArgumentException("logger is shut down");
}
并像这样评论它:
关闭 LogWriter 的另一种方法是设置
“关闭请求”标志以防止进一步的消息被发送
已提交,如清单 7.14 所示。然后消费者就可以排空
收到已请求关闭通知后的队列,
写出任何待处理的消息并解除对任何被阻止的生产者的阻止
在日志中。然而,这种方法存在竞争条件,使得它
不可靠。 log 的实现是一个 check-then-act 序列:
生产者可以观察到该服务尚未关闭
但在关闭后仍然对消息进行排队,同样存在以下风险
生产者可能会在日志中被阻止,并且永远不会被解除阻止。
有一些技巧可以减少这种情况的可能性(例如
消费者在声明队列耗尽之前等待几秒钟),但是
这些并没有改变根本问题,只是改变了可能性
这会导致失败。
这句话对我来说足够难了。
我明白那个
if(!shutdownRequested)
queue.put(msg);
不是原子的,消息可以在关闭后添加到队列中。是的,它不是很准确,但我没有看到问题。队列将被耗尽,当队列为空时,我们可以停止 LoggerThread。尤其我不明白为什么生产者会被屏蔽.
作者没有提供完整的代码,因此我无法理解所有细节。我相信社区大多数人都读过这本书,并且这个例子有详细的解释。
请用完整的代码示例进行解释。
首先要理解的是,当请求关闭时,生产者需要停止接受任何更多请求,而消费者(LoggerThread
在这种情况下)需要排空队列。您在问题中提供的代码仅说明了故事的一方面;生产者拒绝任何进一步的请求shutdownRequested
is true
。在这个例子之后,作者接着说:
然后,消费者可以在收到通知后清空队列
已请求关闭,写出所有待处理的消息并
解除对日志中被阻止的所有生产者的阻止
首先也是最重要的,queue.take
in LoggerThread
如您的问题所示,将无限阻止队列中可用的新消息;但是,如果我们想关闭LoggerThread
(优雅地),我们需要确保关闭代码LoggerThread
有机会执行时shutdownRequested
是真实的而不是无限地被阻止queue.take
.
当作者说消费者可以drain队列,他的意思是LogWritter
可以检查shutdownRequested
如果为 true,则可以调用非阻塞drainTo https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html#drainTo(java.util.Collection)方法在单独的集合中耗尽队列的当前内容,而不是调用queue.take
(或者调用类似的非阻塞方法)。或者,如果shutdownRequested
是假的,LogWriter
可以继续打电话queue.take
照常。
这种方法的真正问题在于log
方法(由生产者调用)被实现。由于它不是原子的,因此多个线程可能会错过以下设置shutdownRequested
为真。如果错过此更新的线程数大于CAPACITY
of the queue
。让我们看一下log
再次方法。 (为了解释添加了大括号):
public void log(String msg) throws InterruptedException {
if(!shutdownRequested) {//A. 1001 threads see shutdownRequested as false and pass the if condition.
//B. At this point, shutdownRequested is set to true by client code
//C. Meanwhile, the LoggerThread which is the consumer sees that shutdownRequested is true and calls
//queue.drainTo to drain all existing messages in the queue instead of `queue.take`.
//D. Producers insert the new message into the queue.
queue.put(msg);//Step E
} else
throw new IllegalArgumentException("logger is shut down");
}
}
如图所示Step E,多个生产者线程可以调用put
而LoggerThread
完成排空队列并退出 w。之前应该不会有任何问题1000th线程调用put
。真正的问题是当1001th线程调用put
。队列容量只有1000,就会阻塞LoggerThread
可能不再存在或订阅queue.take
method.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)