How do I parse PDF files in a MapReduce program?

2023-12-03

I want to parse PDF files in my Hadoop 2.2.0 program, and I found this. Following what it says, I have these three classes so far:

  1. PDFWordCount: the main class containing the map and reduce functions. (It is just like the native Hadoop word-count sample, except that instead of TextInputFormat I use my PDFInputFormat class; a minimal driver sketch follows this list.)
  2. PDFRecordReader extends RecordReader<LongWritable, Text>: this is where the main work is done. In particular, I have put my initialize function here for further explanation:

    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        System.out.println("initialize");
        System.out.println(genericSplit.toString());
        FileSplit split = (FileSplit) genericSplit;
        System.out.println("filesplit conversion has been done");
        final Path file = split.getPath();
        Configuration conf = context.getConfiguration();
        conf.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
        FileSystem fs = file.getFileSystem(conf);
        System.out.println("fs has been opened");
        start = split.getStart();
        end = start + split.getLength();
        System.out.println("going to open split");
        FSDataInputStream filein = fs.open(split.getPath());
        System.out.println("going to load pdf");
        PDDocument pd = PDDocument.load(filein);
        System.out.println("pdf has been loaded");
        PDFTextStripper stripper = new PDFTextStripper();
        in = new LineReader(new ByteArrayInputStream(
                stripper.getText(pd).getBytes("UTF-8")));
        start = 0;
        this.pos = start;
        System.out.println("init has finished");
    }
    

    (You can see my System.out.printlns for debugging. This method fails at converting genericSplit to FileSplit. The last thing I see in the console is:

    hdfs://localhost:9000/in:0+9396432
    

    which is genericSplit.toString(): the split's path, start offset, and length.)

  3. PDFInputFormat extends FileInputFormat<LongWritable, Text>: this just creates a new PDFRecordReader in its createRecordReader method.
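
A minimal driver sketch along those lines, written against the Hadoop 2.x API (PdfMapper and PdfReducer are placeholder names, not the actual classes from the post; everything except the setInputFormatClass call matches the stock WordCount driver):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class PDFWordCount {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "pdf word count");
            job.setJarByClass(PDFWordCount.class);
            // PdfMapper and PdfReducer stand in for the real map/reduce classes
            job.setMapperClass(PdfMapper.class);
            job.setReducerClass(PdfReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // The only change from the stock WordCount driver:
            job.setInputFormatClass(PDFInputFormat.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }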

I want to know: what is my mistake?

Do I need extra classes or something else?
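
One quick way to check that PDFBox itself can read the file, independent of Hadoop, is a standalone extraction test. This is only a sketch, assuming PDFBox 1.8.x (where PDFTextStripper lives in org.apache.pdfbox.util); PdfExtractTest is a hypothetical name:

    // Standalone PDFBox smoke test (assuming PDFBox 1.8.x), useful for
    // verifying text extraction before debugging it inside a MapReduce job.
    import java.io.File;

    import org.apache.pdfbox.pdmodel.PDDocument;
    import org.apache.pdfbox.util.PDFTextStripper;

    public class PdfExtractTest {
        public static void main(String[] args) throws Exception {
            PDDocument doc = PDDocument.load(new File(args[0]));
            try {
                String text = new PDFTextStripper().getText(doc);
                System.out.println(text);
            } finally {
                doc.close();
            }
        }
    }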


Reading PDFs is not that difficult; you need to extend FileInputFormat as well as RecordReader. The FileInputFormat should not be able to split PDF files, since they are binary files.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class PDFInputFormat extends FileInputFormat<Text, Text> {

  @Override
  public RecordReader<Text, Text> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException {
    return new PDFLineRecordReader();
  }

  // Do not allow to ever split PDF files, even if larger than HDFS block size
  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    return false;
  }

}

The RecordReader then performs the reading itself (I use PDFBox to read the PDFs).

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.util.PDFTextStripper; // PDFBox 1.8.x; in 2.x this is org.apache.pdfbox.text.PDFTextStripper

public class PDFLineRecordReader extends RecordReader<Text, Text> {

    private Text key = new Text();
    private Text value = new Text();
    private int currentLine = 0;
    private List<String> lines = null;

    private PDDocument doc = null;
    private PDFTextStripper textStripper = null;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {

        FileSplit fileSplit = (FileSplit) split;
        final Path file = fileSplit.getPath();

        Configuration conf = context.getConfiguration();
        FileSystem fs = file.getFileSystem(conf);
        FSDataInputStream filein = fs.open(fileSplit.getPath());

        if (filein != null) {

            doc = PDDocument.load(filein);

            // Could the PDF be read?
            if (doc != null) {
                textStripper = new PDFTextStripper();
                String text = textStripper.getText(doc);

                lines = Arrays.asList(text.split(System.lineSeparator()));
                currentLine = 0;
            }
        }
    }

    // Returning false ends the reading process
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        if (key == null) {
            key = new Text();
        }

        if (value == null) {
            value = new Text();
        }

        // Guard against a PDF that could not be loaded (lines stays null)
        if (lines != null && currentLine < lines.size()) {
            String line = lines.get(currentLine);

            key.set(line);
            value.set("");
            currentLine++;

            return true;
        } else {
            // All lines read? -> end
            key = null;
            value = null;
            return false;
        }
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Fraction of lines emitted so far (0 if the PDF yielded no text)
        if (lines == null || lines.isEmpty()) {
            return 0.0f;
        }
        return (float) currentLine / lines.size();
    }

    @Override
    public void close() throws IOException {
        // When done, close the document
        if (doc != null) {
            doc.close();
        }
    }
}
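
Because this RecordReader emits each line of PDF text as the key (with an empty value), a mapper that consumes it must be typed Mapper<Text, Text, ...>. A minimal sketch of such a mapper for a word-count use case follows; PdfLineMapper is a placeholder name, not part of the original answer:

// A hypothetical mapper consuming PDFLineRecordReader output: the input
// key holds a line of PDF text and the input value is empty. It simply
// tokenizes the line for a word count.
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PdfLineMapper extends Mapper<Text, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // The line of text arrives as the key, not the value
        StringTokenizer tokenizer = new StringTokenizer(key.toString());
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, ONE);
        }
    }
}

It would be wired in with job.setInputFormatClass(PDFInputFormat.class), just as in the driver sketch in the question above.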

Hope this helps!
