Hadoop3.x 之 MapReduce 框架原理(月薪过万 第九章下)

2023-10-27

一、MapTask工作机制

流程图源码
在这里插入图片描述

(1)Read 阶段:MapTask 通过 InputFormat 获得的 RecordReader,从输入 InputSplit 中解析出一个个 key/value。

(2)Map 阶段:该节点主要是将解析出的 key/value 交给用户编写 map()函数处理,并产生一系列新的 key/value。

(3)Collect 收集阶段:在用户编写 map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的 key/value 分区(调用Partitioner),并写入一个环形内存缓冲区中。

(4)Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

溢写阶段详情:
步骤 1:
利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition 进行排序,然后按照 key 进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照 key 有序。
步骤 2:
按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件 output/spillN.out(N 表示当前溢写次数)中。如果用户设置了 Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
步骤 3:
将分区数据的元信息写到内存索引数据结构 SpillRecord 中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过 1MB,则将内存索引写到文件 output/spillN.out.index 中。

(5)Merge 阶段:当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
当所有数据处理完后,MapTask 会将所有临时文件合并成一个大文件,并保存到文件output/file.out 中,同时生成相应的索引文件 output/file.out.index。

在进行文件合并过程中,MapTask 以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并 mapreduce.task.io.sort.factor(默认 10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

让每个 MapTask 最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销

二、ReduceTask 工作机制

在这里插入图片描述

1、Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

2、Sort 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照 MapReduce 语义,用户编写 reduce()函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可。

3、Reduce 阶段:reduce()函数将计算结果写到 HDFS 上。

三、ReduceTask 并行度决定机制

回顾:MapTask 并行度由切片个数决定,切片个数由输入文件和切片规则决定。
思考:ReduceTask 并行度由谁决定?

  1. 设置 ReduceTask 并行度(个数)
    ReduceTask 的并行度同样影响整个 Job 的执行并发度和执行效率,但与 MapTask 的并发数由切片数决定不同,ReduceTask 数量的决定是可以直接手动设置:
    // 默认值是 1,手动设置为 4
    job.setNumReduceTasks(4);

  2. 实验:测试 ReduceTask 多少合适
    (1)实验环境:1 个 Master 节点,16 个 Slave 节点:CPU:8GHZ,内存: 2G
    (2)实验结论:
    在这里插入图片描述

  3. 注意事项

1、ReduceTask=0,表示没有Reduce阶段,输出文件个数和Map个数一致。

2、ReduceTask默认值就是1,所以输出文件个数为一个。

3、如果数据分布不均匀,就有可能在Reduce阶段产生数据倾斜

4、ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个ReduceTask。

5、具体多少个ReduceTask,需要根据集群性能而定。

6、如果分区数不是1,但是ReduceTask为1,是否执行分区过程。答案是:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1。不大于1肯定不执行。

四、MapTask 源码解析流程

在这里插入图片描述

五、ReduceTask 源码解析流程

=================== ReduceTask ===================

    if (isMapOrReduce()) //reduceTask324 行,提前打断点
    initialize() // reduceTask333 行,进入
    init(shuffleContext); // reduceTask375 行,走到这需要先给下面的打断点
 totalMaps = job.getNumMapTasks(); // ShuffleSchedulerImpl 第 120 行,提前打断点
         merger = createMergeManager(context); //合并方法,Shuffle 第 80 行 // MergeManagerImpl 第 232 235 行,提前打断点
         this.inMemoryMerger = createInMemoryMerger(); //内存合并
         this.onDiskMerger = new OnDiskMerger(this); //磁盘合并
         rIter = shuffleConsumerPlugin.run();
         eventFetcher.start(); //开始抓取数据,Shuffle 第 107 行,提前打断点
         eventFetcher.shutDown(); //抓取结束,Shuffle 第 141 行,提前打断点
         copyPhase.complete(); //copy 阶段完成,Shuffle 第 151 行
         taskStatus.setPhase(TaskStatus.Phase.SORT); //开始排序阶段,Shuffle 第 152 行
         sortPhase.complete(); //排序阶段完成,即将进入 reduce 阶段 reduceTask382 行
         reduce(); //reduce 阶段调用的就是我们自定义的 reduce 方法,会被调用多次
         cleanup(context); //reduce 完成之前,会最后调用一次 Reducer 里面的 cleanup 方法

六、Reduce Join

1)原理

  1. Map 端的主要工作:为来自不同表或文件的 key/value 对,打标签以区别不同来源的记录。然后用连接字段作为 key,其余部分和新加的标志作为 value,最后进行输出。

  2. Reduce 端的主要工作:在 Reduce 端以连接字段作为 key 的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在 Map 阶段已经打标志)分开,最后进行合并就 ok 了

2)Reduce Join案例实操

需求①

在这里插入图片描述

需求分析 ②

通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联

在这里插入图片描述

3)操作代码

TableBean

package org.example.reducejoin;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @ClassName TableBean
 * @Author 小坏
 * @Date 2021/7/29、16:56
 * @Version 1.0
 * <p>
 * Writable 序列化
 */
public class TableBean implements Writable {

    /**
     * order表
     * id   pid     amount
     * <p>
     * pd表
     * pid      pname
     */

    private String id; // 订单id
    private String pid; // 产品id
    private int amount; // 产品数量
    private String pname; // 产品名称
    private String flag; // 表的标记

    public TableBean() {
    }


    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    /**
     * 序列化 的顺序必须和反序列化一致
     *
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id); //string 使用writeUTF
        out.writeUTF(pid); //string 使用writeUTF
        out.writeInt(amount); //string 使用writeUTF
        out.writeUTF(pname); //string 使用writeUTF
        out.writeUTF(flag); //string 使用writeUTF
    }

    /**
     * 反序列化
     *
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readUTF();
        this.pid = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.flag = in.readUTF();
    }


    @Override
    public String toString() {
        return id + "\t" + pname + "\t" + amount;
    }
}

TableMapper

package org.example.reducejoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * @ClassName TableMapper
 * @Author 小坏
 * @Date 2021/7/29、17:09
 * @Version 1.0
 * <p>
 * LongWritable: 偏移量,
 * Text:一行的内容
 */
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    private String fileName;

    private Text outK = new Text();
    private TableBean outV = new TableBean();

    /**
     * 初始化的时候想获取到文件的名称
     *
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //初始化 order pd 只调用一次获取一个名称就够了
        FileSplit inputSplit = (FileSplit) context.getInputSplit(); //得到一个切片信息
        //ctrl+alt +f 提取全局
        fileName = inputSplit.getPath().getName(); //获取文件名称
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString(); //获取一行

        //判断是哪个文件
        if (fileName.contains("order")) {
            //处理订单表
            String[] split = line.split("\t");

            /**
             *  order
             * 1001	01	1
             * 1002	02	2
             * 1003	03	3
             * 1004	01	4
             * 1005	02	5
             * 1006	03	6
             */

            //封装K V
            outK.set(split[1]);
            outV.setId(split[0]);
            outV.setPid(split[1]);
            outV.setAmount(Integer.parseInt(split[2]));
            outV.setPname("");
            outV.setFlag("order");

        } else {
            //处理商品表

            /**
             *  bd 表
             *  01	小米
             *  02	华为
             *  03	格力
             */
            String[] split = line.split("\t");
            outK.set(split[0]);
            outV.setId("");
            outV.setPid(split[0]);
            outV.setAmount(0);
            outV.setPname(split[1]);
            outV.setFlag("pd");
        }

        //写出
        context.write(outK, outV);

    }
}

TableReducer

package org.example.reducejoin;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

/**
 * @ClassName TableReducer
 * @Author 小坏
 * @Date 2021/7/29、18:32
 * @Version 1.0
 * <p>
 * reducer k v就是 map输出的k v
 * <p>
 * 最终打印控制台 打印 TableBean的toString方法、V不要设置为NullWritable
 * TableBean, NullWritable
 */
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {

        ArrayList<TableBean> orderBeans = new ArrayList<>();
        TableBean pdBean = new TableBean();

        //hadoop循环添加对象存在问题、循环一个覆盖一个地址、hadoop框架里面改这个迭代器、
        //解决:创建个对象循环一次赋值一个再存
        for (TableBean value : values) {

            /**
             * id相同 下面可以直接设置名称
             * 01 1001 1 order
             * 01 1004 4 order
             * 01 小米 pd
             */
            if ("order".equals(value.getFlag())) {
                //订单表
                TableBean tableBean = new TableBean();
                try {
                    BeanUtils.copyProperties(tableBean, value);
                    System.out.println("tableBean........." + tableBean);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
                orderBeans.add(tableBean);
            } else {
                /**
                 * pid 产品名称
                 * 01 小米
                 * 02 华为
                 * 03 格力
                 */
                //商品表、只是一行、过来直接copy过去就可以了
                try {
                    BeanUtils.copyProperties(pdBean, value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        //循环遍历orderBeans ,赋值pdname
        for (TableBean orderBean : orderBeans) {
            System.out.println(orderBean);
            orderBean.setPname(pdBean.getPname());
            context.write(orderBean, NullWritable.get());
        }

    }
}

TableDriver

package org.example.reducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @ClassName TableDriver
 * @Author 小坏
 * @Date 2021/7/29、18:57
 * @Version 1.0
 */
public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\input\\inputtable"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output2"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

4) 测试

运行程序查看结果
1004 小米 4
1001 小米 1
1005 华为 5
1002 华为 2
1006 格力 6
1003 格力 3

5) 总结

缺点:这种方式中,合并的操作是在 Reduce 阶段完成,Reduce 端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在 Reduce 阶段极易产生数据倾斜。

解决方案:Map 端实现数据合并。

七、Map Join

1、使用场景

Map Join 适用于一张表十分小、一张表很大的场景。

综合六的实例他两个做了对比、一个在Reduce 阶段处理、一个在Map阶段处理

2、优点

思考:在 Reduce 端处理过多的表,非常容易产生数据倾斜。怎么办?
在 Map 端缓存多张表,提前处理业务逻辑,这样增加 Map 端业务,减少 Reduce 端数据的压力,尽可能的减少数据倾斜。

3、具体办法:采用 DistributedCache

① 在 Mapper 的 setup 阶段,将文件读取到缓存集合中。

② 在 Driver 驱动类中加载缓存。

//缓存普通文件到 Task 运行节点。
job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
//如果是集群运行,需要设置 HDFS 路径
job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));

4、Map Join 案例实操

① 、需求

在这里插入图片描述

② 需求分析

MapJoin 适用于关联表中有小表的情形。

Map端表合并案例分析(Distributedcache)

在这里插入图片描述

5、代码实例

MapJoinMapper

package org.example.mapjoin;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

/**
 * @ClassName MapJoinMapper
 * @Author 小坏
 * @Date 2021/8/1、11:34
 * @Version 1.0
 */
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private HashMap<String, String> map = new HashMap<>();
    private Text outK = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //获取缓存文件,并把文件内容封装到集合 pd.txt
        URI[] cacheFiles = context.getCacheFiles();

        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));

        //从流中读数据
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
        String line;
        //获取一行、判断这一行是否为空
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            String[] fields = line.split("\t");
            map.put(fields[0], fields[1]);
        }
        //关闭流
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //处理 order.txt
        String line = value.toString();
        String[] fields = line.split("\t");

        //获取订单id, 和订单数量
        //获取pid
        String pname = map.get(fields[1]);

        outK.set(fields[0] + "\t" + pname + "\t" + fields[2]);
        context.write(outK, NullWritable.get());
    }
}

MapJoinDriver

核心就是设置缓存数据、没redure 阶段

package org.example.mapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * @ClassName MapJoinDriver
 * @Author 小坏
 * @Date 2021/8/1、11:32
 * @Version 1.0
 */
public class MapJoinDriver {
    public static void main(String[] args) throws IOException,
            URISyntaxException, ClassNotFoundException, InterruptedException {
        // 1 获取 job 信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 设置加载 jar 包路径
        job.setJarByClass(MapJoinDriver.class);
        // 3 关联 mapper
        job.setMapperClass(MapJoinMapper.class);
        // 4 设置 Map 输出 KV 类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 5 设置最终输出 KV 类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // 加载缓存数据
        job.addCacheFile(new URI("file:///D:/input/tablecache/pd.txt"));
        // Map 端 Join 的逻辑不需要 Reduce 阶段,设置 reduceTask 数量为 0
        job.setNumReduceTasks(0);
        // 6 设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\\input\\inputtable2"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output777"));
        // 7 提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

八、数据清洗(ETL)

1、数据清洗解释

“ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL 一词较常用在数据仓库,但其对象并不限于数据仓库

在运行核心业务 MapReduce 程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行 Mapper 程序,不需要运行 Reduce 程序。

2、需求

在这里插入图片描述

3、需求分析

需要在 Map 阶段对输入的数据根据规则进行过滤清洗。

4、实现代码

编写 WebLogMapper 类

package org.example.etl;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @ClassName WebLogMapper
 * @Author 小坏
 * @Date 2021/8/2、10:48
 * @Version 1.0
 */
public class WebLogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //1获取一行
        String line = value.toString();

        //2ETL
        boolean result = parseLog(line, context);
        if (!result) {
            return;
        }

        //3写出
        context.write(value, NullWritable.get());

    }

    private boolean parseLog(String line, Context context) {
        //58.215.204.118 - - [18/Sep/2013:06:51:41 +0000] "-" 400 0 "-" "-"
        //切割
        String[] split = line.split(" ");

        //判断日志长度是否大于十一
        if (split.length > 11) {
            return true;
        } else {
            return false;
        }
    }
}

WebLogDriver

package org.example.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.example.outputformat.LogDriver;

/**
 * @ClassName WebLogDriver
 * 数据清洗ETL
 * @Author 小坏
 * @Date 2021/8/2、10:58
 * @Version 1.0
 */
public class WebLogDriver {

    public static void main(String[] args) throws Exception {
        // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args = new String[]{"D:/input/inputlog", "D:/hadoop/output11111111"};
         // 1 获取 job 信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
          // 2 加载 jar 包
        job.setJarByClass(WebLogDriver.class);
          // 3 关联 map
        job.setMapperClass(WebLogMapper.class);
        // 4 设置最终输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
         // 设置 reducetask 个数为 0
        job.setNumReduceTasks(0);
        // 5 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 6 提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Hadoop3.x 之 MapReduce 框架原理(月薪过万 第九章下) 的相关文章

  • 使用 Hadoop MapReduce 的计算语言学项目构想

    我需要做一个关于计算语言学课程的项目 是否有任何有趣的 语言 问题 其数据密集程度足以使用 Hadoop MapReduce 来解决 解决方案或算法应尝试分析并提供 语言 领域的一些见解 但是它应该适用于大型数据集 以便我可以使用 hado
  • Amazon MapReduce 日志分析最佳实践

    我正在解析 Apache Nginx Darwin 视频流服务器 生成的访问日志 并按日期 引用者 用户代理聚合每个交付文件的统计信息 每小时都会生成大量日志 而且这个数字在不久的将来可能会急剧增加 因此通过 Amazon Elastic
  • 远程执行hadoop作业时出现异常

    我正在尝试在远程 hadoop 集群上执行 Hadoop 作业 下面是我的代码 Configuration conf new Configuration conf set fs default name hdfs server 9000 c
  • 使用 python 从 HDFS 获取文件名列表

    这里是 Hadoop 菜鸟 我搜索了一些有关 hadoop 和 python 入门的教程 但没有取得太大成功 我还不需要使用映射器和缩减器进行任何工作 但这更多是一个访问问题 作为Hadoop集群的一部分 HDFS 上有一堆 dat 文件
  • 无法使用 PDI 步骤连接到 HDFS

    我已经配置成功了Hadoop 2 4 in an Ubuntu 14 04 虚拟机 from a 视窗8系统 Hadoop 安装工作绝对正常 而且我还可以从 Windows 浏览器查看 Namenode 附图如下 所以 我的主机名是 ubu
  • 将 hadoop fs 路径转换为 ​​EMR 上的 hdfs:// 路径

    我想知道如何将数据从 EMR 集群的 HDFS 文件系统移动到 S3 存储桶 我认识到我可以直接在 Spark 中写入 S3 但原则上 之后执行它也应该很简单 到目前为止 我还没有发现在实践中这是正确的 AWS 文档建议s3 dist cp
  • java.io.IOException:无法获取 LocationBlock 的块长度

    我正在使用 HDP 2 1 对于集群 我遇到了以下异常 并且 MapReduce 作业因此失败 实际上 我们定期使用 Flume 版本的数据创建表 1 4 我检查了映射器尝试读取的数据文件 但我找不到任何内容 2014 11 28 00 0
  • HDP 3.1.0.0-78 升级后无法使用 ResourceManager UI 终止 YARN 应用程序

    我最近将 HDP 从 2 6 5 升级到 3 1 0 它运行 YARN 3 1 0 并且我无法再使用旧的 8088 cluster apps 或新的 8088 从 YARN ResourceManager UI 终止应用程序 ui2 ind
  • 在 Hive 中分解一行 XML 数据

    我们将 XML 数据作为名为 XML 的单个字符串列加载到 Hadoop 中 我们正在尝试检索数据级别 并将其标准化或分解为单行进行处理 你知道 就像表格一样 已经尝试过分解功能 但没有得到我们想要的 示例 XML
  • 使用 Hadoop 映射两个数据集

    假设我有两个键值数据集 数据集A和B 我们称它们为数据集A和B 我想用 B 组的数据更新 A 组中的所有数据 其中两者在键上匹配 因为我要处理如此大量的数据 所以我使用 Hadoop 进行 MapReduce 我担心的是 为了在 A 和 B
  • Couchbase/hadoop 连接器:sqoop 作业失败“找到接口 org.apache.hadoop.mapreduce.TaskAttemptContext,但需要类”

    我的配置 CouchBase服务器2 0 Sqoop 1 4 2 针对hadoop版本2 0 0编译 堆栈Hadoop CDH4 1 2 我想使用 CouchBase Hadoop 连接器 http www couchbase com de
  • hadoop2.2.0追加文件发生AlreadyBeingCreatedException

    我遇到了一个关于hadoop2 2 0追加操作的问题 我通过 HDFS java API 将一些字节附加到 hdfs 文件 首先 如果在附加操作之前文件不存在 我将创建目标文件 代码如下 String fileUri hdfs hadoop
  • 这个 Java 语法是什么意思? [复制]

    这个问题在这里已经有答案了 可能的重复 java中的是什么意思 https stackoverflow com questions 12649572 what does the type in java mean 在下面的代码中 Itera
  • Hive“添加分区”并发

    我们有一个外部 Hive 表 用于处理原始日志文件数据 这些文件每小时一次 并按日期和源主机名分区 目前 我们正在使用简单的 python 脚本导入文件 这些脚本每小时触发几次 该脚本根据需要在 HDFS 上创建子文件夹 从临时本地存储复制
  • MiniDFSCluster UnsatisfiedLinkError org.apache.hadoop.io.nativeio.NativeIO$Windows.access0

    做时 new MiniDFSCluster Builder config build 我得到这个异常 java lang UnsatisfiedLinkError org apache hadoop io nativeio NativeIO
  • 纱线上的火花,连接到资源管理器 /0.0.0.0:8032

    我正在我的开发机器 Mac 上编写 Spark 程序 hadoop的版本是2 6 spark的版本是1 6 2 hadoop集群有3个节点 当然都在linux机器上 我在idea IDE中以spark独立模式运行spark程序 它运行成功
  • 将数据从 oracle 移动到 HDFS,处理并从 HDFS 移动到 Teradata

    我的要求是 将数据从 Oracle 移至 HDFS 处理HDFS上的数据 将处理后的数据移至 Teradata 还需要每 15 分钟执行一次整个处理 源数据量可能接近50GB 处理后的数据也可能相同 在网上搜索了很多之后 我发现 PRARO
  • 适用于 Hadoop 的 DynamoDB 输入格式

    我必须使用 Hadoop mapreduce 处理保留在 Amazon Dynamodb 中的一些数据 我在互联网上搜索 Dynamo DB 的 Hadoop InputFormat 但找不到它 我对 Dynamo DB 不熟悉 所以我猜测
  • 猪的组连接等效吗?

    试图在 Pig 上完成这个任务 寻找 MySQL 的 group concat 等效项 例如 在我的表中 我有以下内容 3fields userid clickcount pagenumber 155 2 12 155 3 133 155
  • hive - 在值范围之间将一行拆分为多行

    我在下面有一张表 想按从开始列到结束列的范围拆分行 即 id 和 value 应该对开始和结束之间的每个值重复 包括两者 id value start end 1 5 1 4 2 8 5 9 所需输出 id value current

随机推荐

  • Iot开发

    OTA升级 protobuf 物联网协议 Infiuxdb TCP协议 传输层通信协议 socket 对协议实现了一层抽象与封装 通用是用IP地址 端口号 描述通信双方 server服务的提供方 client服务的使用方 http协议 应用
  • Git 6. 版本切换(重点)

    使用命令 git reset hard 版本号 查看一下当前的版本日志 如图 工作区中的hello py文件内容如下 版本2 Git版本切换版底层移动的其实是指针 首先我们来开一下保存指正的文件 在工作区的的 git文件夹下打开HEAD文件
  • linux里面查看llvm的版本,linux llvm安装

    1 1 Numba的约5分钟指南 Numba是Python的即时编译器 它最适用于使用NumPy数组和函数以及循环的代码 使用Numba的最常用方法是通过其装饰器集合 可以应用于您的函数来指示Numba编译它们 当调用Numba修饰函数时
  • 【算法笔记】PAT B1001

    题目 卡拉兹 Callatz 猜想 对任何一个正整数 n 如果它是偶数 那么把它砍掉一半 如果它是奇数 那么把 3n 1 砍掉一半 这样一直反复砍下去 最后一定在某一步得到 n 1 卡拉兹在 1950 年的世界数学家大会上公布了这个猜想 传
  • 日志服务器修改web登录密码,centos7通过Web查看日志服务器

    实验需要 两台虚拟机 一台服务器 一台主机 服务器的IP地址为192 168 142 100 主机的地址为192 168 142 200 实验步骤 配置rsyslog 服务 配置lamp 安装loganalyzer 具体操作 1 给服务器配
  • 使用Nuget管理第三方库吧

    前言 用visual studio开发的童鞋们应该都有这样的感受 经常在copy别人的项目后 出现找不到xxx h 找不到xxx lib 找不到balabala 甚至还有 模块计算机类型 x86 与目标计算机类型 x64 冲突 模块计算机类
  • Nacos 配置变更http异步通知集群其他服务,并不是长链接

    Nacos配置中心更改变更配置后 异步http通知其他集群更新配置 同时当客户端 ClientWorker 心跳的时候检查配置是否改变checkListenerMd5 改变了就发布监听事件RefreshEvent执行监听器RefreshEv
  • excel单元格下拉选项怎么设置_EXCEL表格里设置好下拉选项,让老板更加青睐

    使用Excel录入数据的时候 如果需要录入大量相同数据 我们通常可以使用下拉列表来限定输入的数据 这样录入数据就很少发生错误了而且还能提高效率 下面来跟小编学习一下 如何在EXCEL中设置下拉选项吧 一 打开EXCEL软件后 我们进去EXC
  • t检验.医学统计实例详解

    t检验是医学统计学中常用的一种假设检验方法 用于比较两个样本均值是否有显著差异 它可以帮助医学研究者确定一个治疗方法或药物是否显著地改善了患者的症状或生理指标 在医学研究中 t检验常被用来 比较两个独立样本的均值 例如 比较一个治疗组和一个
  • proto 编译命令

    proto 编译命令 protoc I python out src test test proto
  • 深入理解Linux网络技术内幕——网络设备初始化

    概述 内核的初始化过程过程中 与网络相关的工作如下所示 内核引导时执行 start kernel start kernel结束之前会调用rest init rest init初始化 内核线程init 在Linux3 12中为kernel i
  • C语言第四章第2节用if语句实现选择结构学习导案

    课 题 4 2 用if语句实现选择结构 课时安排 2课时 课 型 新授 学 习目标 掌握if语句 if else语句 if else if else语句的一般形式 掌握if语句 if else语句 if else if else语句的执行过
  • 微信小程序云开发源码(垃圾分类源码)

    目录 微信小程序云开发源码 垃圾分类源码 小程序云数据库介绍 小程序界面 可搜索名称 大众垃圾分类 小程序体验 微信小程序源码 源码地址 https pan baidu com s 1U19Suzs3nZnMt5OHNGUahQ 提取码 z
  • Mysql表关系 连接查询

    表关联查询 文章目录 表关联查询 内连接 左连接 右连接 如果多个表存在一定关联关系 可以多表在一起进行查询操作 其实表的关联整理与外键约束之间并没有必然联系 但是基于外键约束设计的具有关联性的表往往会更多使用关联查询查找数据 简单多表查询
  • C++ default constructor 讨论

    豆瓣是个好地方 可以找到很多好书 最近翻到了Lippman的inside the c object model 今天看了关于默然构造函数部分 对这个东西有了新的理解 又找出c standard对照着看了看 首先看c standard 12
  • webview加载完成监听

    最近由于产品需要 一个页面上部分是一个WebView 下面是一些文字介绍 但是在赋值时 HTML网页加载会消耗一定时间 在其加载过程中 文字已经展示出来 给用户的体验很不好 所以我就想在webview加载成功结束后再给文字赋值 于是在网上搜
  • 云服务器Docker安装ElasticSearch却启动不了怎么办?

    下载镜像 docker pull elasticsearch 启动容器 docker run d name es p 9200 9200 p 9300 9300 e discovery type single node elasticsea
  • 用WinDbg断点调试FFmpeg

    本文主要讲解 WinDbg 调试器的使用 WinDbg 在 Windows 里面的地位 就跟 GDB 在 Linux 的地位一样 可以通过 微软的官方网站 下载 安装 WinDbg WinDbg 是比较轻量级的调试工具 在一些场景下比较实用
  • 联想服务器esxi虚拟化,企业服务器管理必备——VMware ESXI虚拟化服务器搭建

    现在企业虚拟化服务器使用越来越多 这是一篇VMware虚拟化服务器搭建教程 让我们开始吧 制作U盘安装ESXI镜像 1 使用Linux系统制作U盘启动镜像 UltraISO制作的镜像不能用 必须依赖Linux系统 安装镜像制作工具 yum
  • Hadoop3.x 之 MapReduce 框架原理(月薪过万 第九章下)

    Hadoop3 x 之 MapReduce 框架原理 一 MapTask工作机制 二 ReduceTask 工作机制 三 ReduceTask 并行度决定机制 四 MapTask 源码解析流程 五 ReduceTask 源码解析流程 六 R