在并发编程中,比较经典的编程例子就是生产者和消费者模型。下面就是一个例子来诠释一下什么是生产者和消费者以及他们的特点和注意点。
1、先定义一个数据对象,
public class Data {
private String id;
private String name;
public Data(String id,String name){
this.id = id;
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Data [id=" + id + ", name=" + name + "]";
}
}
2.定义一个生产者,实现Runnable接口。
public class Provider implements Runnable{
private BlockingQueue<Data> queue;
private volatile boolean isRunning = true;
private static AtomicInteger count = new AtomicInteger();
private static Random r = new Random();
public Provider(BlockingQueue queue){
this.queue = queue;
}
@Override
public void run() {
while(isRunning){
try {
Thread.sleep(r.nextInt(1000));
int id = count.incrementAndGet();
Data data = new Data(Integer.toString(id),"数据"+id);
System.out.println("当前线程:"+ Thread.currentThread().getName() + ",获取了数据,id为:"+ id+ ",进行装载到公共缓冲区中。。。");
if(!this.queue.offer(data,2,TimeUnit.SECONDS)){
System.out.print("提交缓冲区数据失败");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print("aaa");
}
}
public void stop(){
this.isRunning = false;
}
}
这里有几个注意点,一个就是对共享缓冲区的选择,作为生产者–消费者模型而言,共享缓冲区一定要具备阻塞的能力。所以这边选择的是阻塞队列。还有一个就是在并发编程的时候,如果需要使用类似i++这种id自增长的功能,需要使用Atomic包下的并发类。因为这些类是采用CAS设计的,不会产生并发问题。
3.消费者
public class Consumer implements Runnable {
private BlockingQueue<Data> queue;
public Consumer(BlockingQueue queu){
this.queue = queu;
}
private static Random r = new Random();
@Override
public void run() {
while(true){
try{
Data data = this.queue.take();
Thread.sleep(r.nextInt(1000));
System.out.print("当前消费线程"+Thread.currentThread().getName() +",消费成功,消费id为"+data.getId());
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
}
消费者主要就是从阻塞队列中获取数据,如果队列中没有元素,则会释放CPU,然后等待。(注意这里使用的是take而不是poll,不同点在于take在没有元素的时候会释放CPU,而poll则是直接返回null)。
main函数:
public class Main {
public static void main(String[] args){
BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
Provider p1 = new Provider(queue);
Provider p2 = new Provider(queue);
Provider p3 = new Provider(queue);
Consumer c1 = new Consumer(queue);
Consumer c2 = new Consumer(queue);
Consumer c3 = new Consumer(queue);
ExecutorService cachepool = Executors.newCachedThreadPool();
cachepool.execute(p1);
cachepool.execute(p2);
cachepool.execute(p3);
cachepool.execute(c1);
cachepool.execute(c2);
cachepool.execute(c3);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
p1.stop();
p2.stop();
p3.stop();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)