hadoop的DFSOutputStream

2023-11-07

当我们用命令:

hadoop fs -copyFromLocal localfile hdfs://...

将本地文件复制到HDFS时,其背后的复制过程是怎样的?本地文件通过什么方式传输到datanode上的呢?

这里面很显然的是:
1、文件在多个电脑之间进行了传输(至少有2台电脑:本地电脑和一个datanode节点)。
2、如果文件超过一个block的大小(默认是64M),那么将一个文件分割成多个block是在哪里发生的?

带着这些疑问,我们来解读一下源代码。

一、找到“幕后英雄”

通过简单的跟踪,就会发现这一功能是由FileSystem类的copyFromLocalFile方法完成的。

继续跟踪,会发现拷贝其实是在2个文件系统中进行的,下面是FileUtil类的一段代码:

  /** Copy files between FileSystems. */
  public static boolean copy(FileSystem srcFS, Path src, 
                             FileSystem dstFS, Path dst, 
                             boolean deleteSource,
                             boolean overwrite,
                             Configuration conf) throws IOException {
      ... // 为了突出重要代码,这里省略了部分代码
      InputStream in=null;
      OutputStream out = null;
      try {
        in = srcFS.open(src);
        out = dstFS.create(dst, overwrite);
        IOUtils.copyBytes(in, out, conf, true);
      } catch (IOException e) {
        IOUtils.closeStream(out);
        IOUtils.closeStream(in);
        throw e;
      }
      ... 
  }

copyFromLocalFile的实质是将文件从LocalFileSystem复制到DistributedFileSystem

从上面的代码可以看出,复制的关键是:
1、获得本地文件系统的输入流(用来读取本地文件)
2、获得HDFS的输出流(用来向HDFS写入数据)
3、从第一流读取数据,写入第二个流。

从LocalFileSystem获取输入流很简单。问题是,如何获取DistributedFileSystem的输出流呢?

继续读代码,会发现:
DistributedFileSystem借助了DFSClient类,来实现客户端与HDFS之间文件的传输任务。

在DFSClient类中,创建流的是这一句:
OutputStream result = new DFSOutputStream(src, masked,
        overwrite, replication, blockSize, progress, buffersize,
        conf.getInt("io.bytes.per.checksum", 512));

所以,所有的奥秘就应该在类DFSOutputStream中了。


二、解读DFSOutputStream

DFSOutputStream负责将数据传输到HDFS中,既然数据是在本地读取的,又要保存在另外一台机器(datanode)上,所以这里面一定会涉及到Socket。

通过阅读DFSOutputStream源码,果然发现了DFSOutputStream对底层的socket通信进行的包装的细节,先说说DFSOutputStream中的几个变量:
	private Socket s;	// 与datanode之间建立的socket连接
	private DataOutputStream blockStream;	// socket的输出流(client->datanode),用于将数据传输给datanode
	private DataInputStream blockReplyStream; // socket的输入流(datanode->client),用户收到datanode的确认包

除了socket和流以外,DFSOutputStream还有2个队列和2个线程:
    private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
    // dataQueue是数据队列,用于保存等待发送给datanode的数据包
    private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
    // ackQueue是确认队列,保存还没有被datanode确认接收的数据包
    ...
    private DataStreamer streamer = new DataStreamer();;
    // streamer线程,不停的从dataQueue中取出数据包,发送给datanode
    private ResponseProcessor response = null;
    // response线程,用于接收从datanode返回的反馈信息

所以,在向DFSOutputStream中,写入数据(通常是byte数组)的时候,实际的传输过程是:
1、byte[]被封装成64KB的Packet,然后扔进dataQueue中
2、DataStreamer线程不断的从dataQueue中取出Packet,通过socket发送给datanode(向blockStream写数据)
    发送前,将当前的Packet从dataQueue中移除,并addLast进ackQueue
3、ResponseProcessor线程从blockReplyStream中读出从datanode的反馈信息
       反馈信息很简单,就是一个seqno,再加上每个datanode返回的标志(成功标志为DataTransferProtocol.OP_STATUS_SUCCESS)
       通过判断seqno(序列号,每个Packet有一个序列号),判断datanode是否接收到正确的包。
       只有收到反馈包中的seqno与ackQueue.getFirst()的包seqno相同时,说明正确。否则可能出现了丢包的情况。

如果一切OK,则从ackQueue中移出:ackQueue.removeFirst(); 说明这个Packet被datanode成功接收了。


三、datanode端是怎么接收数据的?

上面分析的代码都位于客户端,那么datanode端的代码又如何呢?

答案是:

在DataNode端,有一个Daemon线程:dataXceiverServer,它有一个用于数据传输的ServerSocket一直开在那里。

每当有client连接到datanode时,datanode会new一个DataXceiver

DataXceiver负责数据的传输工作。

如果是写操作(client->datanode),则调用writeBlock方法:
	case DataTransferProtocol.OP_WRITE_BLOCK:
        	writeBlock( in );

writeBlock方法负责:将数据写入本地磁盘,并负责将数据传输给其他datanode,保证数据的拷贝数目(可以通过dfs.replication设置)。

具体负责数据接收的是这一行:

	blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
                                 mirrorAddr, null, targets.length);

	几个参数的含义:
		      DataOutputStream mirrOut, // output to next datanode
						// 下一个datanode的输出流
		      DataInputStream mirrIn,   // input from next datanode
						// 下一个datanode的输入流
		      DataOutputStream replyOut,  // output to previous datanode
						// 数据来源节点(可能是最初的client)的输出流
						// 用来发送反馈通知包
		      String mirrAddr, BlockTransferThrottler throttlerArg,
		      int numTargets) throws IOException {

在receiveBlock方法中,循环接收数据:
   
  /* 
       * Receive until packet length is zero.
       */
      while (receivePacket() > 0) {}

在receivePacket方法中:
	不断地从输入流中读取Packet数据:
		int payloadLen = readNextPacket();

	并将数据传输至下一个datanode节点:
		mirrorOut.write(buf.array(), buf.position(), buf.remaining());
		mirrorOut.flush();


	写入磁盘:
		out.write(pktBuf, dataOff, len);



四、如果一个文件超过1个block大小,怎么重定向到新的datanode的?在哪里分割的(file分割成blocks)?

答案是:在DFSOutputStream类的writeChunk方法里。

line 3043:
	if (bytesCurBlock == blockSize) {  // 问题是:它们能正好相等吗?万一bytesCurBlock > blockSize了怎么办?
            currentPacket.lastPacketInBlock = true;
            bytesCurBlock = 0;
            lastFlushOffset = -1;
	}
再往下几行:
int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
        computePacketChunkSize(psize, bytesPerChecksum);

就是说,在new每个新的Packet之前,都会重新计算一下新的Packet的大小,
以保证新的Packet大小不会超过Block的剩余大小
如果block还有不到一个Packet的大小(比如还剩3kb的空间),则最后一个Packet的大小就是:
blockSize-bytesCurBlock,也就是3kb


line 2285:
              // get new block from namenode.
              if (blockStream == null) {
                LOG.debug("Allocating new block");
                nodes = nextBlockOutputStream(src); 
                this.setName("DataStreamer for file " + src +
                             " block " + block);
                response = new ResponseProcessor(nodes);
                response.start();
              }


在DataStreamer中,如果遇到one.lastPacketInBlock==true,则将blockStream设为null,之后会重新写入新的block。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

hadoop的DFSOutputStream 的相关文章

随机推荐

  • Git:Unable to negotiate with错误

    今天更新了一下Git版本 发现推送代码的时候提示 Unable to negotiate with xx xx xx xx port 29418 no matching host key type found Their offer ssh
  • JS 如何将数组对象中多个特定值取出形成新数组

    有如下数据 list mainId 581 errMsg null sort null status null nowFarmId null nowPigstyId null insideEarCode MT5687G pigEntityC
  • Linux操作系统原理与应用实验 实验三 实验四 问题总结

    本文是在进行Linux实验三和实验四所遇到的问题或学到的小知识进行总结 以调通程序为内容导向 问题的解决方法或许不难 也都能搜到 但是如果笔者自己总结下来能够节省很多人去搜索解决方法的繁琐步骤 节省大家的时间 实验三 文件操作算法 新版本带
  • gdb调试动态链接库

    转自 http www cnblogs com ybgame archive 2012 03 23 2414078 html 在 Linux 可以用 gdb 来调试应用程序 当然前提是用 gcc 编译程序时要加上 g 参数 我这篇文章里将讨
  • ML学习笔记(二)----交叉验证、偏差和方差分析

    什么是交叉验证 交叉验证是一种模型验证技术 可用于评估统计分析 模型 结果在其它独立数据集上的泛化能力 它主要用于预测 我们可以用它来评估预测模型在实践中的准确度 交叉验证的目标是定义一个数据集 以便于在训练阶段 例如 验证数据集 中测试模
  • JAVA中类的加载过程

    摘自 http soft chinabyte com database 312 12643812 shtml http www cnblogs com yshb archive 2012 11 05 2756194 html 类的生命周期
  • 0.96寸oled显示坏苹果(badapple)

    前言 俗话说 有屏幕的地方就会有badapple 下面带来使用0 96寸OLED屏幕显示badapple的教程 1 获取视频 首先从网上下载badapple的视频 下载地址 badapple 2 抓取视频图片 使用OLED播放视频的思想就是
  • 微信小程序开发课程学习总结(关于电影的内容)

    微信小程序开发课程学习总结 关于电影的内容 最终效果 课程学习网址 让我们开始第一步 一堆页面简介 app js 文件 app json文件 app wxss文件 好了正式开始第一个页面 新闻详情页面 电影模块 关于Template 模板
  • Asp.net可输入下拉框服务器控件 C#版

    备注 改自Ryan Liu dpliu cbdsystem com cn vb net using System using System Collections using System ComponentModel using Syst
  • 错误码:events.js:183 throw er; // Unhandled ‘error‘ event—解决方案

    显示内容 events js 291 throw er Unhandled error event Error listen EADDRINUSE address already in use 80 at Server setupListe
  • watch监听(普通和深度监听)

    普通 data user 定义要监听的对象 watch 普通监听值有变化就打印 newVal oldVal新旧值参数 user newVal oldVal console log user this user 深度监听 监听对象里面的数组或
  • 架构师日记-深入理解软件设计模式

    作者 京东零售 刘慧卿 一 设计模式与编程语言 1 1 什么是设计模式 设计模式 Design pattern 由软件开发人员在软件开发中面临常见问题的解决方案 是经过长时间的试验积累总结出来的 它使设计更加灵活和优雅 复用性更好 从实用的
  • vue2里设置input光标位置

    人狠话不多 直接上业务需求 垃圾需求 凑合看 我的业务是在企微应用里 图片识别 然后点客户姓名 手机号 输入框 识别的结果可以点击回填到输入框内 这里思考回填的情况 1 可能是 直接输入 然后点下面识别的字回填 输入框是空的 直接点一个字拼
  • 【小甲鱼C语言】课后笔记第一章第一节——打印(printf)

    目录 1 打印 就是 输出 的意思 2 使用 GCC 编译程序 gcc 源代码 o 可执行文件 3 printf 是格式化输出函数 a 函数概要 b 函数原型 c 参数分析 d 返回值 e 演示 4 转义字符 5 反斜杠的奥义 6 课后习题
  • Qt自定义标题栏-移动窗口

    前情提要 众所周知 一个最简单的窗口也是有标题栏的 Windows默认提供的标题栏上有 图标 窗口标题 Min Max Close按钮 但是 这未免太过局限 高自由度的自定义是极客 Geek 精神不可或缺的一部分 如果你想在标题栏上增加 减
  • spring.jpa.hibernate.ddl-auto的配置

    spring jpa hibernate ddl auto 可以显式设置 spring jpa hibernate ddl auto 标准的Hibernate属性值有 none validate update create create d
  • mysql之 mysql 5.6不停机双主一从搭建(活跃双主一从基于日志点复制)

    环境说明 版本 version 5 6 25 log 主1库ip 10 219 24 25主2库ip 10 219 24 22从1库ip 10 219 24 26os 版本 centos 6 7已安装热备软件 xtrabackup 防火墙已
  • A template class for binding C++ to Lua

    A template class for binding C to Lua 标签 classc bindingconstructorluafunction 2006 09 09 15 50 1397人阅读 评论 0 收藏 举报 目录 htt
  • OpenMMLab-AI实战营第二期-人体关键点检测与MMPose

    人体关键点检测与MMPose 课程链接 https www bilibili com video BV1kk4y1L7Xb 这个课程的大致内容是介绍如何从给定的二维影像中恢复出人体的姿态 2D或者3D 大纲如下所示 基本上可以认为流程是 先
  • hadoop的DFSOutputStream

    当我们用命令 hadoop fs copyFromLocal localfile hdfs 将本地文件复制到HDFS时 其背后的复制过程是怎样的 本地文件通过什么方式传输到datanode上的呢 这里面很显然的是 1 文件在多个电脑之间进行