Hadoop:Reducer 将 Mapper 输出写入输出文件

2024-04-06

我遇到了一个非常非常奇怪的问题。减速器确实可以工作,但是如果我检查输出文件,我只找到了映射器的输出。 当我尝试调试时,在将映射器的输出值类型从 Longwritable 更改为 Text 后,我​​发现字数示例存在相同的问题

    package org.myorg;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class WordCount extends Configured implements Tool {

   public static class Map
       extends Mapper<LongWritable, Text, Text, Text> {
     private final static IntWritable one = new IntWritable(1);
     private Text word = new Text();

     public void map(LongWritable key, Text wtf, Context context)
         throws IOException, InterruptedException {
       String line = wtf.toString();
       StringTokenizer tokenizer = new StringTokenizer(line);
       while (tokenizer.hasMoreTokens()) {
         word.set(tokenizer.nextToken());
         context.write(word, new Text("frommapper"));
       }
     }
   }

   public static class Reduce
       extends Reducer<Text, Text, Text, Text> {
     public void reduce(Text key, Text wtfs,
         Context context) throws IOException, InterruptedException {
/*
       int sum = 0;
       for (IntWritable val : wtfs) {
         sum += val.get();
       }
       context.write(key, new IntWritable(sum));*/
    context.write(key,new Text("can't output"));
     }
   }

   public int run(String [] args) throws Exception {
     Job job = new Job(getConf());
     job.setJarByClass(WordCount.class);
     job.setJobName("wordcount");


     job.setOutputKeyClass(Text.class);
     job.setMapOutputValueClass(Text.class);
       job.setOutputValueClass(Text.class);
     job.setMapperClass(Map.class);
     //job.setCombinerClass(Reduce.class);
     job.setReducerClass(Reduce.class);

     job.setInputFormatClass(TextInputFormat.class);
     job.setOutputFormatClass(TextOutputFormat.class);

     FileInputFormat.setInputPaths(job, new Path(args[0]));
     FileOutputFormat.setOutputPath(job, new Path(args[1]));

     boolean success = job.waitForCompletion(true);
     return success ? 0 : 1;
         }

   public static void main(String[] args) throws Exception {
     int ret = ToolRunner.run(new WordCount(), args);
     System.exit(ret);
   }
}

这是结果

JobClient:     Combine output records=0
12/06/13 17:37:46 INFO mapred.JobClient:     Map input records=7
12/06/13 17:37:46 INFO mapred.JobClient:     Reduce shuffle bytes=116
12/06/13 17:37:46 INFO mapred.JobClient:     Reduce output records=7
12/06/13 17:37:46 INFO mapred.JobClient:     Spilled Records=14
12/06/13 17:37:46 INFO mapred.JobClient:     Map output bytes=96
12/06/13 17:37:46 INFO mapred.JobClient:     Combine input records=0
12/06/13 17:37:46 INFO mapred.JobClient:     Map output records=7
12/06/13 17:37:46 INFO mapred.JobClient:     Reduce input records=7

然后我在输出文件中发现了奇怪的结果。无论是否更改reduce输出值的类型,将map的输出值类型和reducer的输入键类型更改为Text后都会出现此问题。我也被迫改变 job.setOutputValue(Text.class)

a   frommapper
a   frommapper
a   frommapper
gg  frommapper
h   frommapper
sss frommapper
sss frommapper

Help!


您的reduce 函数参数应如下所示:

public void reduce(Text key, Iterable <Text> wtfs,
     Context context) throws IOException, InterruptedException {

按照定义参数的方式,reduce 操作不会获取值列表,因此它只输出从 map 函数获取的任何输入,因为

sum+ = val.get()

每次都是从 0 到 1 因为每个<key, value>表格中的配对<word, one>单独到减速机。

另外,映射器函数通常不会写入输出文件(我从未听说过它,但我不知道这是否可能)。在通常情况下,总是由reducer写入输出文件。 Mapper 输出是由 Hadoop 透明处理的中间数据。因此,如果您在输出文件中看到某些内容,那一定是减速器输出,而不是映射器输出。如果您想验证这一点,您可以转到您运行的作业的日志,并分别检查每个映射器和减速器中发生的情况。

希望这能为您清除一些事情。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Hadoop:Reducer 将 Mapper 输出写入输出文件 的相关文章

  • 两个相等的组合键不会到达同一个减速器

    我正在使用 MapReduce 框架用 Java 制作 Hadoop 应用程序 我仅使用文本键和值进行输入和输出 在减少最终输出之前 我使用组合器进行额外的计算步骤 但我有一个问题 钥匙没有进入同一个减速器 我在组合器中创建并添加键 值对
  • Hadoop:间隔和 JOIN

    我很新Hadoop我目前正在尝试加入两个数据源 其中关键是interval 说 开始日期 结束日期 例如 input1 20091001 20091002 A 20091011 20091104 B 20080111 20091103 C
  • 大师必须从纱线、火花开始

    当我想要运行 SparkPi 示例时 我收到此错误 beyhan beyhan spark 1 2 0 bin hadoop2 4 home beyhan spark 1 2 0 bin hadoop2 4 bin spark submit
  • hadoop map reduce 中的错误处理

    根据文档 有几种方法可以在 MapReduce 中执行错误处理 以下是一些 A 使用枚举的自定义计数器 每个失败记录的增量 b 记录错误并稍后分析 计数器给出失败记录的数量 然而 为了获取失败记录的标识符 可能是其唯一键 以及发生异常的详细
  • 删除 Pig 输出中的括号和逗号

    目前我的输出如下 130 1 131 1 132 1 133 1 137 1 138 2 139 1 140 1 142 2 143 1 我想要这样 130 1 131 1 132 1 我的代码如下 A LOAD user links sm
  • java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.createDirectoryWithMode0

    我无法解决这个异常 我已经阅读了 hadoop 文档以及我能找到的所有相关的 stackoverflow 问题 我的 fileSystem mkdirs 抛出 Exception in thread main java lang Unsat
  • 如何在linux中的hdfs超级组中添加用户? [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我正在研究 hdfs 然后我发现某些内容没有为超级用户执行权限检查 如果我的 linux 用户是 sandy 并且我想将 sandy 添加
  • 根据javascript中的条件在数组中插入具有相同键值的多个对象

    例如 我有一个具有多个值的对象 let obj a day1 b c day3 aa 10 bb 11 cc 12 let data let item for let i in obj if i a data title obj a dat
  • 如何获取hive中的数据库用户名和密码

    正在编写jdbc程序来连接hive数据库 我希望在连接 url 中提供用户名和密码 我不知道如何使用 hive QL 获取用户名和密码 有人可以帮我吗 Exception in thread main java sql SQLNonTran
  • MapReduce 排序和洗牌如何工作?

    我正在使用 yelps MRJob 库来实现映射缩减功能 我知道 MapReduce 有一个内部排序和洗牌算法 它根据键对值进行排序 所以如果我在地图阶段后得到以下结果 1 24 4 25 3 26 我知道排序和洗牌阶段将产生以下输出 1
  • 在hbase中创建表

    我是 hbase 和 hadoop 的新手 无论如何 我已经成功建立了一个由3台机器组成的hadoop集群 现在我需要一些帮助来建立数据库 我有一个表 评论 包含字段 user id comments 对评论的评论 可以多个 和状态字段相同
  • 在 Zookeeper 中创建路径的最有效方法,其中路径的根元素可能存在也可能不存在?

    想象一条路径 root child1 child2 child3 想象一下 在动物园管理员中 可能存在其中的一部分 比如 root child1 Zookeeper 中没有等效的 mkdir p 此外 如果任何一个操作失败 ZooKeepe
  • 是否可以直接从文件加载镶木地板表?

    如果我有一个二进制数据文件 可以转换为 csv 格式 有什么方法可以直接从中加载镶木地板表吗 许多教程显示将 csv 文件加载到文本表 然后从文本表加载到镶木地板表 从效率的角度来看 是否可以像我已有的那样直接从二进制文件加载镶木地板表 理
  • HDFS容量:如何阅读“dfsadmin报告”

    我使用的是 Hadoop 2 6 0 当我运行 hdfs dfsadmin report 时 我得到类似这样的信息 简化 Configured Capacity 3 TB Present Capacity 400GB DFS Remaini
  • Spark 2.0 弃用了“DirectParquetOutputCommitter”,没有它如何生活?

    最近 我们从 HDFS 上的 EMR gt S3 上的 EMR 启用了一致视图的 EMRFS 迁移 我们意识到 Spark SaveAsTable 镶木地板格式 写入 S3 的速度比 HDFS 慢约 4 倍 但我们发现使用 DirectPa
  • Hive如何存储数据,什么是SerDe?

    当查询表时 SerDe 将将文件中的字节中的一行数据反序列化为 Hive 内部使用的对象来操作该行数据 执行 INSERT 或 CTAS 时 请参阅第 441 页上的 导入数据 表的 SerDe 将将 Hive 的一行数据的内部表示序列化为
  • 使用 python 从 HDFS 获取文件名列表

    这里是 Hadoop 菜鸟 我搜索了一些有关 hadoop 和 python 入门的教程 但没有取得太大成功 我还不需要使用映射器和缩减器进行任何工作 但这更多是一个访问问题 作为Hadoop集群的一部分 HDFS 上有一堆 dat 文件
  • 无法使用 PDI 步骤连接到 HDFS

    我已经配置成功了Hadoop 2 4 in an Ubuntu 14 04 虚拟机 from a 视窗8系统 Hadoop 安装工作绝对正常 而且我还可以从 Windows 浏览器查看 Namenode 附图如下 所以 我的主机名是 ubu
  • 一个目录下可以有两个oozieworkflow.xml文件吗?

    一个目录下可以有两个oozieworkflow xml文件吗 如果是这样 我如何指示 oozie runner 运行哪一个 您可以有两个工作流程文件 只需为它们指定唯一的名称 然后您可以通过设置oozie wf application pa
  • 为什么组合器输入记录的数量比映射的输出数量多?

    Combiner 在 Mapper 之后 Reducer 之前运行 它将接收给定节点上的 Mapper 实例发出的所有数据作为输入 然后它将输出发送到Reducers 因此组合器输入的记录应小于映射输出的记录 12 08 29 13 38

随机推荐