public class BlockingQueueTest { private static AtomicInteger count = new AtomicInteger(0); private static AtomicInteger countCreate = new AtomicInteger(0); public static void main(String args[]) { //定义一个阻塞队列,存放文件信息 BlockingQueue fileQueue = new ArrayBlockingQueue(5); String path = "F:\\Song"; File root = new File(path); //生产者线程去遍历文件,放入队列 FileCrawler fileCrawler = new FileCrawler(fileQueue, root); //消费者线程去遍历队列,取出文件 Indexer indexer = new Indexer(fileQueue); //开启几个生产者线程开始遍历文件 for (File file : root.listFiles()) { new Thread(new FileCrawler(fileQueue, file)).start(); } //开启7个消费者者线程开始取出文件 for (int i = 0; i < 7; i++) { new Thread(new Indexer(fileQueue)).start(); } try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } System.out.println("生产者生产:" + countCreate.get()); System.out.println("消费者取到:" + count.get()); } static class FileCrawler implements Runnable { private final BlockingQueue fileQueue; private final File root; FileCrawler(BlockingQueue fileQueue, File root) { this.fileQueue = fileQueue; this.root = root; } public void run() { try { System.out.println("生产者开始生产:" + fileQueue.size()); crawl(root); } catch (InterruptedException e) { e.printStackTrace(); } } private void crawl(File root) throws InterruptedException { File[] files = root.listFiles(); if (files != null) { for (File file : files) { if (file.isDirectory()) { crawl(file); } else { fileQueue.put(file); //put countCreate.incrementAndGet(); } } } } } static class Indexer implements Runnable { private final BlockingQueue fileQueue; Indexer(BlockingQueue fileQueue) { this.fileQueue = fileQueue; } public void run() { while (true) { try { System.out.println("消费者开始消费:" + fileQueue.size()); File file = (File) fileQueue.take(); //take count.incrementAndGet(); System.out.println(file.getName()); } catch (InterruptedException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } } } } |