我想在 Java 8 中尝试 ForkJoinPool,所以我编写了一个小程序来搜索给定目录中名称包含特定关键字的所有文件。
Program:
public class DirectoryService {
public static void main(String[] args) {
FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
List<String> files = pool.invoke(task);
pool.shutdown();
System.out.println("Total no of files with hello" + files.size());
}
}
class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
private String path;
public FileSearchRecursiveTask(String path) {
this.path = path;
}
@Override
protected List<String> compute() {
File mainDirectory = new File(path);
List<String> filetedFileList = new ArrayList<>();
List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
if(mainDirectory.isDirectory()) {
System.out.println(Thread.currentThread() + " - Directory is " + mainDirectory.getName());
if(mainDirectory.canRead()) {
File[] fileList = mainDirectory.listFiles();
for(File file : fileList) {
System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
if(file.isDirectory()) {
FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
recursiveTasks.add(task);
task.fork();
} else {
if (file.getName().contains("hello")) {
System.out.println(file.getName());
filetedFileList.add(file.getName());
}
}
}
}
for(FileSearchRecursiveTask task : recursiveTasks) {
filetedFileList.addAll(task.join());
}
}
return filetedFileList;
}
}
当目录没有太多子目录和文件时,该程序可以正常工作,但如果目录很大,则会抛出 OutOfMemoryError 。
我的理解是,最大线程数(包括补偿线程)是有限的,那么为什么会出现这个错误呢?我的程序中是否缺少任何内容?
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at java.util.concurrent.ForkJoinPool.createWorker(ForkJoinPool.java:1486)
at java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:2020)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2057)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.tryRemoveAndExec(ForkJoinPool.java:1107)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2046)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
你不应该分出超出所有认知的新任务。基本上,只要另一个工作线程有机会接受分叉的作业并在本地进行评估,您就应该进行分叉。然后,一旦你分叉了一个任务,就不要调用join()
就在之后。虽然底层框架将启动补偿线程以确保您的作业将继续进行,而不是仅仅让所有线程阻塞等待子任务,但这将创建大量可能超出系统能力的线程。
这是您的代码的修订版本:
public class DirectoryService {
public static void main(String[] args) {
FileSearchRecursiveTask task = new FileSearchRecursiveTask(new File("./DIR"));
List<String> files = task.invoke();
System.out.println("Total no of files with hello " + files.size());
}
}
class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
private static final int TARGET_SURPLUS = 3;
private File path;
public FileSearchRecursiveTask(File file) {
this.path = file;
}
@Override
protected List<String> compute() {
File directory = path;
if(directory.isDirectory() && directory.canRead()) {
System.out.println(Thread.currentThread() + " - Directory is " + directory.getName());
return scan(directory);
}
return Collections.emptyList();
}
private List<String> scan(File directory)
{
File[] fileList = directory.listFiles();
if(fileList == null || fileList.length == 0) return Collections.emptyList();
List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
List<String> filteredFileList = new ArrayList<>();
for(File file: fileList) {
System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
if(file.isDirectory())
{
if(getSurplusQueuedTaskCount() < TARGET_SURPLUS)
{
FileSearchRecursiveTask task = new FileSearchRecursiveTask(file);
recursiveTasks.add(task);
task.fork();
}
else filteredFileList.addAll(scan(file));
}
else if(file.getName().contains("hello")) {
filteredFileList.add(file.getAbsolutePath());
}
}
for(int ix = recursiveTasks.size() - 1; ix >= 0; ix--) {
FileSearchRecursiveTask task = recursiveTasks.get(ix);
if(task.tryUnfork()) task.complete(scan(task.path));
}
for(FileSearchRecursiveTask task: recursiveTasks) {
filteredFileList.addAll(task.join());
}
return filteredFileList;
}
}
进行处理的方法已被分解为接收目录作为参数的方法,因此我们可以在本地将其用于不一定与某个目录关联的任意目录。FileSearchRecursiveTask
实例。
然后,该方法使用getSurplusQueuedTaskCount() https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinTask.html#getSurplusQueuedTaskCount--确定尚未被其他工作线程拾取的本地排队任务的数量。确保有一些有助于工作平衡。但如果这个数字超过阈值,处理将在本地完成,而不会分叉更多作业。
本地处理后,它迭代任务并使用tryUnfork() https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinTask.html#tryUnfork--识别未被其他工作线程窃取的作业并在本地处理它们。向后迭代,从最年轻的工作开始,增加了找到一些工作的机会。
只是事后,它join()
包含现在已完成或当前由另一个工作线程处理的所有子作业。
请注意,我更改了启动代码以使用默认池。这使用“CPU 核心数”减去一个工作线程,加上启动线程,即main
本例中的线程。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)