master和reduce之间共享数据

2023-11-30

我需要使用所有reduce 任务的结果执行聚合。基本上,reduce 任务会找到总和、计数以及一个值。我需要将所有总和和计数相加并找到最终平均值。

我尝试使用conf.setInt在减少。但是当我尝试从主函数访问它时它失败了

class Main {

public static class MyReducer 
extends Reducer<Text, Text,Text,IntWritable> {

    public void reduce(Text key, Iterable<Text> values, 
            Context context
            ) throws IOException, InterruptedException {
        int i = 0;
        int fd = 0, fc = 0;
        fd = context.getConfiguration().getInt("fd", -1);
        fc = context.getConfiguration().getInt("fc", -1);
        //when I check the value of fd, fc here they are fine. fc fd is shared across all reduce tasks and the updated value is seen by all reduce task. Only main function doesnt seem to have access to it.
    }
}

public static void main(String[] args) throws Exception{
    Configuration conf = new Configuration();
    conf.setInt("fc", 5);

    Job job = new Job(conf, "Flight Data");
    job.setJarByClass(FlightData.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(MyReducer.class);

    job.setPartitionerClass(FirstPartitioner.class);
    job.setGroupingComparatorClass(GroupComparator.class);
    job.setSortComparatorClass(KeyComparator.class);


    job.setNumReduceTasks(10);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);


    flightCount = job.getConfiguration().getInt("fc", -1);
    flightDelay = job.getConfiguration().getInt("fd", -1);
    //here when I access fc, fd, I get back 5 & 5
    System.out.println("Final " + flightCount +" " + flightDelay+ " " + flightDelay/flightCount);
}

覆盖run()使用新的映射器和减速器org.apache.hadoop.mapreduceAPI。在这些方法中,您可以从每个映射器或缩减器发出累积的总和/计数。

此外,您还需要将减速器计数限制为 1,以便获得多个映射器生成的所有总和的全局总和。

请参阅下面的代码以更清楚地了解:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AggregationExample extends Configured implements Tool {

    /**
     * This is Mapper.
     * 
     */
    public static class MapJob extends Mapper<LongWritable, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();
        private double sum;

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            try {
                // say that you need to sum up the value part
                sum+= Double.valueOf(value);
        }

        @Override
        public void run(Context context) throws IOException, InterruptedException {

            setup(context);
            while (context.nextKeyValue()) {
                map(context.getCurrentKey(), context.getCurrentValue(), context);
            }

            // emit out the sum per mapper
            outputKey.set(sum);
            context.write(outputKey, outputValue);// Notice that the outputValue is empty
            cleanup(context);

        }
    }

    /**
     * This is Reducer.
     * 
     */
    public static class ReduceJob extends Reducer<Text, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();
        private double sum;

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
                InterruptedException {


            // summation of values from each mapper
            sum += Double.valueOf(key.toString());

        }

        @Override
        public void run(Context context) throws IOException, InterruptedException {

            setup(context);
            while (context.nextKey()) {
                reduce(context.getCurrentKey(), context.getValues(), context);
            }

            // emit out the global sums
            outputKey.set(sum);
            context.write(outputKey, outputValue);
            cleanup(context);
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        try {
            Configuration conf = getConf();

            // output key and value separator is empty as in final output only
            // key is emitted and value is empty
            conf.set("mapred.textoutputformat.separator", "");

            // Configuring mapred to have just one reducer as we need to find
            // single sum values from all the inputs
            conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
            conf.setInt("mapred.reduce.tasks", 1);

            Job job = new Job(conf);

            job.setJarByClass(AggregationExample.class);
            job.setJobName("Aggregation Example");

            job.setMapperClass(MapJob.class);
            job.setReducerClass(ReduceJob.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job, args[0]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            boolean success = job.waitForCompletion(true);
            return success ? 0 : 1;
        } catch (Exception e) {
            e.printStackTrace();
            return 1;
        }

    }

    public static void main(String[] args) throws Exception {

        if (args.length < 2) {
            System.out
                    .println("Usage: AggregationExample <comma sparated list of input directories> <output dir>");
            System.exit(-1);
        }

        int result = ToolRunner.run(new AggregationExample(), args);
        System.exit(result);
    }

}

您可以很好地将这种方法映射到您的问题。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

master和reduce之间共享数据 的相关文章

  • 如何在有或没有 Pig 的情况下使用 Cassandra 的 Map Reduce?

    有人可以解释 MapReduce 如何与 Cassandra 6 配合使用吗 我已经阅读了字数统计示例 但我不太明白 Cassandra 端与 客户端 端发生的情况 https svn apache org repos asf cassan
  • CouchDB 视图:MapReduce 中可以接受多少处理?

    我一直在尝试使用 CouchDB 进行 MapReduce 一些示例显示了映射归约函数中可能存在的一些繁重逻辑 在一种特殊情况下 他们在映射内执行 for 循环 在发出您选择的文档之前 MapReduce 是否会在每个可能的文档上运行 如果
  • Cloudera 5.1下作业在LocalJobRunner中保持运行

    需要一些快速帮助 我们的作业在 MapR 下运行良好 但是当我们在 Cloudera 5 1 上启动相同的作业时 它继续以本地模式运行 我确信这是某种配置问题 它是哪个配置设置 14 08 22 12 16 58 INFO mapreduc
  • 在spark中设置textinputformat.record.delimiter

    在 Spark 中 可以设置一些 hadoop 配置设置 例如 System setProperty spark hadoop dfs replication 1 这有效 复制因子设置为 1 假设是这种情况 我认为这种模式 在常规 hado
  • 如何用hadoop实现自连接/叉积?

    对成对的项目进行评估是常见的任务 示例 重复数据删除 协同过滤 相似项目等 这基本上是具有相同数据源的自连接或叉积 要进行自连接 您可以遵循 减少端连接 模式 映射器将连接 外键作为键发出 将记录作为值发出 因此 假设我们想要对以下数据的
  • couchdb 视图使用另一个视图?

    我对 couchdb 中的视图有疑问 目前 我有许多视图 例如 view A view B view Z 对于每个视图 它们包含相同范围的键但具有不同的值 IE view A key key 1 value 10 key key 2 val
  • 流数据和 Hadoop? (不是 Hadoop 流)

    我想使用 MapReduce 方法分析连续的数据流 通过 HTTP 访问 因此我一直在研究 Apache Hadoop 不幸的是 Hadoop 似乎希望以固定大小的输入文件开始作业 而不是在新数据到达时将其传递给消费者 事实确实如此 还是我
  • 仅使用一个映射器的 Hadoop gzip 输入文件[重复]

    这个问题在这里已经有答案了 可能的重复 为什么 hadoop 不能分割一个大文本文件 然后使用 gzip 压缩分割的内容 https stackoverflow com questions 6511255 why cant hadoop s
  • Hadoop 性能

    我安装了hadoop 1 0 0并尝试了字数统计示例 单节点集群 完成时间为 2 分 48 秒 然后我尝试了标准的 Linux 字数统计程序 该程序在同一组 180 kB 数据 上运行只需 10 毫秒 是我做错了什么 还是 Hadoop 非
  • MapReduce 排序和洗牌如何工作?

    我正在使用 yelps MRJob 库来实现映射缩减功能 我知道 MapReduce 有一个内部排序和洗牌算法 它根据键对值进行排序 所以如果我在地图阶段后得到以下结果 1 24 4 25 3 26 我知道排序和洗牌阶段将产生以下输出 1
  • 使用 Hadoop MapReduce 的计算语言学项目构想

    我需要做一个关于计算语言学课程的项目 是否有任何有趣的 语言 问题 其数据密集程度足以使用 Hadoop MapReduce 来解决 解决方案或算法应尝试分析并提供 语言 领域的一些见解 但是它应该适用于大型数据集 以便我可以使用 hado
  • 如何在hadoop/map reduce中创建固定行数的输出文件?

    假设我们有 N 个具有不同行数的输入文件 我们需要生成输出文件 使得每个输出文件恰好有 K 行 最后一个输出文件可以有 是否可以使用单个 MR 作业来完成此操作 我们应该打开文件以便在reducer中显式写入 输出中的记录应该被打乱 tha
  • mongodb 聚合随机化(shuffle)结果

    我正在浏览一堆 mongo 文档 但找不到洗牌或随机化结果内容的可能性 有没有 特别是对于聚合框架本身来说 实际上并没有任何本地方法 因为还没有可用的运算符来执行诸如生成随机数之类的操作 因此 无论您可能投射一个字段进行排序的任何匹配 都不
  • 从 Eclipse 在 AWS-EMR 上运行 MapReduce 作业

    我在 Eclipse 中有 WordCount MapReduce 示例 我将其导出到 Jar 然后将其复制到 S3 然后我在 AWS EMR 上运行它 成功地 然后 我读到了这篇文章 http docs aws amazon com El
  • 在 Amazon EMR 上使用 java 中的 hbase 时遇到问题

    因此 我尝试使用作为 MapReduce 步骤启动的自定义 jar 来查询 Amazon ec2 上的 hbase 集群 我的 jar 在地图函数内 我这样调用 Hbase public void map Text key BytesWri
  • mongodb - 检索数组子集

    看似简单的任务对我来说是一个挑战 我有以下 mongodb 结构 services TCP80 data status 1 delay 3 87 ts 1308056460 status 1 delay 2 83 ts 1308058080
  • RavenDB:为什么我会在此多重映射/归约索引中获得字段空值?

    受到 Ayende 文章的启发https ayende com blog 89089 ravendb multi maps reduce indexes https ayende com blog 89089 ravendb multi m
  • 适用于 Hadoop 的 DynamoDB 输入格式

    我必须使用 Hadoop mapreduce 处理保留在 Amazon Dynamodb 中的一些数据 我在互联网上搜索 Dynamo DB 的 Hadoop InputFormat 但找不到它 我对 Dynamo DB 不熟悉 所以我猜测
  • Sqoop - 绑定到 YARN 队列

    因此 使用 MapReduce v2 您可以使用绑定到某些 YARN 队列来管理资源和优先级 基本上通过使用 hadoop jar xyz jar D mapreduce job queuename QUEUE1 input output
  • MongoDB - 使用聚合框架或 MapReduce 来匹配文档中的字符串数组(配置文件匹配)

    我正在构建一个可以比作约会应用程序的应用程序 我有一些结构如下的文档 db profiles find pretty id 1 firstName John lastName Smith fieldValues favouriteColou

随机推荐

  • Python lambda函数下划线冒号语法解释?

    在以下 Python 脚本中 其中 aDict 是字典 0 在 lambda 函数中起什么作用 sorted aDict items key lambda 0 让我们把它分开 1 假设你有一个字典 di di one 1 two 2 thr
  • 如何判断会话是否处于活动状态? [复制]

    这个问题在这里已经有答案了 根据请求 您可以通过几种不同的方式来判断会话是否已启动 例如 isSessionActive session id Or isSessionActive defined SID 然而 如果您启动一个会话然后关闭它
  • Xamarin WKWebView 和 Cookie

    我有一个 Xamarin Forms 应用程序 它使用 cookie 来跟踪登录状态并同时使用 HTTPRequest 和 Webview 因此两者都需要共享 cookie 对于 UIWebView 这些 cookie 是共享的 无需我进行
  • cpuid命令显示信息的问题

    的信息llc缓存显示使用cpuid command在Linux上是 cache 3 cache type unified cache 3 cache level 0x3 3 self initializing cache level tru
  • Android NDK 中缺少 std::wstring 支持的解决方案?

    我有一个游戏 它在数千个地方使用 std wstring 作为其基本字符串类型 并使用 wchar t 及其函数进行操作 wcsicmp wcslen vsprintf 等 问题是 R5c 撰写本文时的最新 ndk 不支持 wstring
  • 值和引用类型混淆

    我在每个关于这个主题的网站上都读过 并且在过去几天里阅读了 Jon Skeets 的文章参考文献和值 and 参数传递 我了解这两种类型的概念 它们代表什么以及值和参考参数的区别是什么是另一个规范 我了解它的工作原理以及如何使用它 但我不明
  • Android 清单将支持 Galaxy S4、HTC One

    我正在尝试创建 Android 清单谷歌游戏将显示三星 Galaxy S4 HTC One 等 因为当前的设备未在兼容设备列表中列出 我确实搜索了 Stack Overflow 但这里的所有建议都没有帮助我 以下是我用于我们的应用程序的清单
  • AVAudioPlayer Swift 3 不播放声音[重复]

    这个问题在这里已经有答案了 我将 AVFoundation framework 添加到我的项目中 在我的项目导航器中 我添加了文件 Horn mp3 这是一个 1 秒的声音 当按下按钮 带有喇叭图像 时 应该播放声音 标签也应该更改其文本
  • 使用相对路径复制文件

    我想将某种类型的所有文件从某个子目录及其相对路径从该子目录复制到另一个目录 并且相对路径保持不变 例如 源子目录 c temp sourcedirectory 源文件 c temp sourcedirectory tonymontana f
  • “尝试附加自动命名数据库”错误

    我正在 Windows XP SP2 Professional 上使用 C Visual Studio 2010 制作桌面应用程序 如果我直接从发布的文件夹运行该应用程序而不运行安装程序或从调试文件夹运行它或只是调试该应用程序 它不会给出此
  • 使用 json 数据异步填充 AngularJS ngTable

    我正在尝试构建一个 AngularJS 应用程序 它输出一个用 json 填充的 HTML 表 该 表的 HTML 位于这个问题的底部 我在用着application json我从服务器检索的数据 当我做一个简单的curl http myu
  • 创建ipa以分发给客户端

    我想知道当我们可以轻松地压缩构建的应用程序并分发压缩的 app 文件时 创建 ipa 并将其分发给客户端进行测试有什么用 创建有什么好处 ipa 是不是创造了ipa不需要移动配置文件 请澄清我的疑问 我会告诉你我的经验的一个优点 当您向客户
  • 如何自定义 JSONSchema 的错误消息?

    有没有办法根据给定的条件提供自定义错误消息 我在用着https github com networknt json schema validator 版本1 0 43 这是我的 JSON 架构 id https configurations
  • 异常过滤器之外的异常处理?

    使用 Asp net WebApi RC 如何捕获未捕获的错误异常过滤器 or Application Error 在全局 asax 中 有了这两个条件 似乎还有一类例外情况尚未涵盖 例如 ApiControllerActionSelect
  • 如何将 1970 年以来的秒数转换为 C++ 中的 DateTime?

    如何将 1970 年以来的秒数转换为 C 中的 DateTime 我得到的时间格式如下 1296575549 573352 冒号的左侧部分以秒为单位 右侧部分以微秒为单位 请帮忙 Thanks Syd 尝试使用 gmtime 参见http
  • hcmap 与 R Shiny 中的本地文件 javascript

    我正在shiny ui和服务器 上开发一个应用程序 我想用Highchart包的hcmap函数实现一个地图 但是 在实现地图时 我发现函数 hcmap 与站点存在依赖关系 https code highcharts com mapdata
  • 动态方法调度

    网上有很多关于动态调度的信息 我感觉自己像个胆小鬼 因为我无法实现它 请帮我 这就是我正在尝试做的事情 ClassA public void createReq public String postReq ClassB Test publi
  • 将动态参数传递给 SQL Server 2008 中的存储过程

    我有这个过程执行另一个由参数及其参数传递的过程datefrom and dateto CREATE procedure dbo execute proc procs varchar 200 pdatefrom date pdateto da
  • 启用 Google API OAuth 范围

    对于使用 Google Apps 脚本创建的应用程序之一 会在我的应用程序中自动添加一些范围 如下所示 https www googleapis com auth drive https www googleapis com auth sc
  • master和reduce之间共享数据

    我需要使用所有reduce 任务的结果执行聚合 基本上 reduce 任务会找到总和 计数以及一个值 我需要将所有总和和计数相加并找到最终平均值 我尝试使用conf setInt在减少 但是当我尝试从主函数访问它时它失败了 class Ma