在线程之间来回传递数据的一种简单方法是使用接口的实现BlockingQueue<E> http://download.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html,位于包中java.util.concurrent http://download.oracle.com/javase/6/docs/api/java/util/concurrent/package-summary.html.
该接口具有以不同行为向集合添加元素的方法:
-
add(E)
: 如果可能的话添加,否则抛出异常
-
boolean offer(E)
: 如果元素已添加则返回 true,否则返回 false
-
boolean offer(E, long, TimeUnit)
:尝试添加元素,等待指定的时间
-
put(E)
:阻塞调用线程,直到添加元素为止
它还定义了具有类似行为的元素检索方法:
-
take()
: 阻塞直到有可用元素
-
poll(long, TimeUnit)
: 检索一个元素或返回 null
我最常使用的实现是:ArrayBlockingQueue http://download.oracle.com/javase/6/docs/api/java/util/concurrent/ArrayBlockingQueue.html, LinkedBlockingQueue http://download.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html and SynchronousQueue http://download.oracle.com/javase/6/docs/api/java/util/concurrent/SynchronousQueue.html.
第一个,ArrayBlockingQueue
,具有固定大小,由传递给其构造函数的参数定义。
第二,LinkedBlockingQueue
,大小不受限制。它总是接受任何元素,即offer
将立即返回true,add
永远不会抛出异常。
第三个,对我来说也是最有趣的一个,SynchronousQueue
,实际上是一个管道。您可以将其视为大小为 0 的队列。它永远不会保留元素:只有当有其他线程尝试从中检索元素时,该队列才会接受元素。相反,如果有另一个线程尝试推送某个元素,则检索操作只会返回该元素。
为了履行homework的要求专门使用信号量进行同步,您可以从我给您提供的有关 SynchronousQueue 的描述中获得启发,并编写一些非常相似的内容:
class Pipe<E> {
private E e;
private final Semaphore read = new Semaphore(0);
private final Semaphore write = new Semaphore(1);
public final void put(final E e) {
write.acquire();
this.e = e;
read.release();
}
public final E take() {
read.acquire();
E e = this.e;
write.release();
return e;
}
}
请注意,此类呈现的行为与我所描述的 SynchronousQueue 类似。
一旦方法put(E)
被调用时它会获取写入信号量,该信号量将保留为空,以便对同一方法的另一个调用将在其第一行阻塞。然后,此方法存储对正在传递的对象的引用,并释放读取的信号量。此版本将使任何线程都可以调用take()
方法继续进行。
第一步take()
然后,方法自然是获取读取信号量,以禁止任何其他线程同时检索该元素。检索元素并将其保存在局部变量中后(练习:如果删除那行 E e = this.e 会发生什么?),该方法释放写信号量,使得该方法put(E)
可以由任何线程再次调用,并返回已保存在局部变量中的内容。
作为一个重要的评论,请注意对正在传递的对象的引用保存在私人领域,以及方法take()
and put(E)
都是final。这是极其重要的,但也常常被忽视。如果这些方法不是最终的(或更糟糕的是,该字段不是私有的),继承类将能够改变take()
and put(E)
违反合同。
最后,您可以避免在中声明局部变量的需要take()
方法通过使用try {} finally {}
如下:
class Pipe<E> {
// ...
public final E take() {
try {
read.acquire();
return e;
} finally {
write.release();
}
}
}
在这里,这个例子的目的只是为了展示try/finally
缺乏经验的开发人员不会注意到这一点。显然,在这种情况下,没有真正的收益。
哦该死,我已经帮你完成大部分作业了。为了报应——并且为了测试你对信号量的了解——,为什么不实现 BlockingQueue 合约定义的一些其他方法呢?例如,您可以实现一个offer(E)
方法和一个take(E, long, TimeUnit)
!
祝你好运。