我正在使用 Hadoop 0.20.2(无法更改),并且我想向我的输入路径添加一个过滤器。数据如下:
/path1/test_a1
/path1/test_a2
/path1/train_a1
/path1/train_a2
我只想处理所有文件train在他们中。
查看 FileInputFormat 类建议使用:
FileInputFormat.setInputPathFilter(Job job, Class<? extends PathFilter> filter)
这就是我的问题开始的地方,因为路径过滤器 http://hadoop.apache.org/docs/r0.20.2/api/org/apache/hadoop/fs/PathFilter.html是一个接口 - 当然,我可以扩展该接口,但我仍然没有实现。因此,我实现了该接口:
class TrainFilter implements PathFilter
{
boolean accept(Path path)
{
return path.toString().contains("train");
}
}
当我使用 TrainFilter 作为 PathFilter 时,代码会编译,但是当我运行它时,我会收到异常,因为输入路径搞砸了。在不设置过滤器的情况下,我的代码会运行 /path1 下面的所有文件,但是,在设置过滤器时,它会抛出错误:
InvalidInputException: Input path does not exist hdfs://localhost:9000/path1
这是我在驱动程序代码中设置的方法:
job.setMapperClass(....class);
job.setInputFormatClass(....class);
job.setMapOutputKeyClass(...class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPathFilter(job, TrainFilter.class);
FileInputFormat.addInputPath(job, new Path("/path1/"));
FileOutputFormat.setOutputPath(job, new Path("/path2/"));
job.waitForCompletion(true);
对我在这里做错了什么有什么建议吗?
编辑:我发现了问题。对 PathFilter 的第一次调用始终是目录本身(/path1),并且由于它不包含(“train”),因此目录本身无效,因此引发异常。这让我想到了另一个问题:如何测试任意路径是否是目录?据我所知,我需要对文件系统的引用,它不是 PathFilter 的默认参数之一。