并发编程-生产者消费者模式Java代码实现
生产者消费者模式
-
生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据。
-
消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据。
消费队列可以用来平衡生产和消费的线程资源。
代码如下:
- 使用双向链表和Synchronized锁来实现消息队列。
- 使用Excutors中的创建线程池的方法模拟生产者和消费者线程。
其他可见代码中的注释。
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class MessageTest {
public static void main(String[] args) {
ExecutorService producer = Executors.newFixedThreadPool(3, new ThreadFactory() {
private AtomicInteger t = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "生产者-" + t.getAndIncrement());
}
});
ExecutorService consumer = Executors.newFixedThreadPool(2, new ThreadFactory() {
private AtomicInteger t = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "消费者-" + t.getAndIncrement());
}
});
MessageQueue<Message> queue = new MessageQueue<>(2);
for (int i = 0; i < 5; i++) {
int id = i;
producer.submit(()->{
queue.put(new Message(id, "value : " + id));
});
}
for (;;) {
consumer.submit(()->{
Message take = queue.take();
});
}
}
}
class MessageQueue<T>{
private LinkedList<T> list = new LinkedList<>();
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
}
public T take(){
synchronized (list){
String t_name = Thread.currentThread().getName();
while(list.isEmpty()){
System.out.println("["+t_name+"]"+" 队列为空,消费者线程等待...");
try {
list.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
T message = list.removeFirst();
System.out.println("["+t_name+"]"+" 消费消息: " + message.toString());
list.notifyAll();
return message;
}
}
public void put(T message){
synchronized (list){
String t_name = Thread.currentThread().getName();
while(list.size()==capacity){
System.out.println("["+t_name+"]"+" 队列已满,生产者线程等待...");
try {
list.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
list.addLast(message);
System.out.println("["+t_name+"]"+" 生产消息: " + message.toString());
list.notifyAll();
}
}
}
class Message{
private int id;
private String value;
public Message(int id, String value) {
this.id = id;
this.value = value;
}
public int getId() {
return id;
}
public String getValue() {
return value;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", value='" + value + '\'' +
'}';
}
}
运行结果:
可以看到首先消费者线程消费,但是队列为空,则线程阻塞等待,之后生产者-1和生产者-3生产消息,但是因为队列容量为2,当生产者-2想要继续放入消息时,被阻塞。之后消费者-2进行消费消息之后,生产者-2被唤醒又可以将消息放入到消息队列中去…
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)