hadoop和spark读取GBK编码乱码

2023-11-16

转自:http://www.cnblogs.com/teagnes/p/6112019.html

首先来看一下为什么会出现这个问题, 下面是一个最简单的spark的wordcount程序,sc.textFile(filePath)方法从文本文件创建RDD,传入文件路径filePath,查看textFile方法, 可以看到,实际上调用了TextInputformat类来解析文本文件,熟悉hadoop的一定知道,mapreudce默认的解析文件文件的类就是TextInputformat,并返回了K V键值对

1
2
3
4
5
6
7
8
9
10
11
object  Wordcount {
   def  main(args :  Array[String]) {
     val  filePath  =  "" ;
     val  conf  =  new  SparkConf()
       .setAppName( "WordCountApp" )
     val  sc  =  new  SparkContext(conf)
     val  line  =  sc.textFile(filePath)
     line.flatMap( _ .split( " " )).map(( _ 1 )).reduceByKey( _ + _ ).collect.foreach(println)
     sc.stop
   }
}

 

1
2
3
4
5
6
7
def  textFile(
      path :  String,
      minPartitions :  Int  =  defaultMinPartitions) :  RDD[String]  =  withScope {
      assertNotStopped()
      hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair  = > pair. _ 2 .toString).setName(path)
  }

   继续看TextInputFormat源码,TextInputFormat有两个作用。

   一是对输入文件分片,mapreduce会为每一个分片都起动一个map任务来处理,分片的任务由TextInputFormat的父类FileInputFormat完成,这里就不做深究了, TextInputFormat中只有读取数据的方法。

        二是从分片的数据,生成k v键值对也就是Recordreader ,createRecordReader方法不断的生成Recordreader对像并交给map端去处理 ,下面的代码中在delimiter.getBytes(Charsets.UTF_8)设置了字符集,很可惜这里并不是读取文件时使用的,而是指定了redcord的分割符,默认情况下是每一行生成一个record,一般情况下我们不需要使用到这个参数,只有在设置多行作为一个record输入的时候才会用到,可以通过设置参数“textinputformat.record.delimiter”来设置,那我们是不是可以在代码中指定我们的读取文件的字符集呢?

package org.apache.hadoop.mapreduce.lib.input;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import com.google.common.base.Charsets;

/** An {@link InputFormat} for plain text files.  Files are broken into lines.
 * Either linefeed or carriage-return are used to signal end of line.  Keys are
 * the position in the file, and values are the line of text.. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> 
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    return new LineRecordReader(recordDelimiterBytes);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    final CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }

}

  

  继续看LineRecordReader类,查看其中的nextKeyValue方法,该方法是具体生成k v记录时候使用的,这里有两个很意思的点,需要注意。

  一是skipUtfByteOrderMark()方法,该方法处理了当文件是有bom的utf-8格式的时候,读取程序自动跳过bom,有待具体测试一下

  二是如果我们读到的行跨块了怎么处理?因为hdfs是按文件的大小来切分文件的,难免一行数据被切分到两个块中去了,这里有相应的处理的逻辑,这里就不再详细说明了

 1  public boolean nextKeyValue() throws IOException {
 2     if (key == null) {
 3       key = new LongWritable();
 4     }
 5     key.set(pos);
 6     if (value == null) {
 7       value = new Text();
 8     }
 9     int newSize = 0;
10     // We always read one extra line, which lies outside the upper
11     // 具体读取记录的方法split limit i.e. (end - 1)
12     while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
13       if (pos == 0) {
14         newSize = skipUtfByteOrderMark();
15       } else {
16         newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
17         pos += newSize;
18       }
19 
20       if ((newSize == 0) || (newSize < maxLineLength)) {
21         break;
22       }
23 
24       // line too long. try again
25       LOG.info("Skipped line of size " + newSize + " at pos " + 
26                (pos - newSize));
27     }
28     if (newSize == 0) {
29       key = null;
30       value = null;
31       return false;
32     } else {
33       return true;
34     }
35   }

 这里的value就是在map端获得的value,看它是怎么被赋值的,可以看到是从输入流中读取数据,这里有两种读取的方法,默认readDefaultLine的读取一行和通过自定义readCustomLine的分隔符的跨行

 public int readLine(Text str, int maxLineLength,
                      int maxBytesToConsume) throws IOException {
    if (this.recordDelimiterBytes != null) {
      return readCustomLine(str, maxLineLength, maxBytesToConsume);
    } else {
      return readDefaultLine(str, maxLineLength, maxBytesToConsume);
    }
  }

默认的方式读取文件并没有用到自定义的分割符,而value获取到的还是输入流中的字节码,所以value的获得的依旧是文件的字节码,并没有做过处理,那么我们是不是可以在map端获取到的字节码按照“GBK”的方式来解码读取呢?经过测试之后发现的确是可以正常读取的

private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
  throws IOException {
    /* We're reading data from in, but the head of the stream may be
     * already buffered in buffer, so we have several cases:
     * 1. No newline characters are in the buffer, so we need to copy
     *    everything and read another buffer from the stream.
     * 2. An unambiguously terminated line is in buffer, so we just
     *    copy to str.
     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
     *    in CR.  In this case we copy everything up to CR to str, but
     *    we also need to see what follows CR: if it's LF, then we
     *    need consume LF as well, so next call to readLine will read
     *    from after that.
     * We use a flag prevCharCR to signal if previous character was CR
     * and, if it happens to be at the end of the buffer, delay
     * consuming it until we have a chance to look at the char that
     * follows.
     */
    str.clear();
    int txtLength = 0; //tracks str.getLength(), as an optimization
    int newlineLength = 0; //length of terminating newline
    boolean prevCharCR = false; //true of prev char was CR
    long bytesConsumed = 0;
    do {
      int startPosn = bufferPosn; //starting from where we left off the last time
      if (bufferPosn >= bufferLength) {
        startPosn = bufferPosn = 0;
        if (prevCharCR) {
          ++bytesConsumed; //account for CR from previous read
        }
        bufferLength = fillBuffer(in, buffer, prevCharCR);
        if (bufferLength <= 0) {
          break; // EOF
        }
      }
      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
        if (buffer[bufferPosn] == LF) {
          newlineLength = (prevCharCR) ? 2 : 1;
          ++bufferPosn; // at next invocation proceed from following byte
          break;
        }
        if (prevCharCR) { //CR + notLF, we are at notLF
          newlineLength = 1;
          break;
        }
        prevCharCR = (buffer[bufferPosn] == CR);
      }
      int readLength = bufferPosn - startPosn;
      if (prevCharCR && newlineLength == 0) {
        --readLength; //CR at the end of the buffer
      }
      bytesConsumed += readLength;
      int appendLength = readLength - newlineLength;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      if (appendLength > 0) {
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

    if (bytesConsumed > Integer.MAX_VALUE) {
      throw new IOException("Too many bytes before newline: " + bytesConsumed);
    }
    return (int)bytesConsumed;
  }

 

解决方法:

spark读取GBK编码文件

将value的字节码按照GBK的方式读取变成字符串,运行之后能够正常显示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
object  GBKtoUTF 8  {
 
   def  main(args :  Array[String]) :  Unit  =  {
     val  conf  =  new  SparkConf()
       .setAppName( " GBK TO UTF8" )
       .setMaster( "local" )
 
     val  sc  =  new  SparkContext(conf)
 
     val  rdd  =  sc.hadoopFile( "F:\\data\\score.txt" , classOf[TextInputFormat], classOf[LongWritable], classOf[Text],  1 )
       .map(p  = new  String(p. _ 2 .getBytes,  0 , p. _ 2 .getLength,  "GBK" ))
       .flatMap(s  = > s.split( "," ))
       .map(x  = > (x,  1 ))
       .reduceByKey( _  _ )
       .collect
       .foreach(println)
   }
 
 
}

 

hadoop读取GBK编码文件

public void map(LongWritable key, Text value, Context context) {
        try {

            String line;
            line = new String(value.getBytes(), 0, value.getLength(), "GBK");//使用GBK解析字节码 ,转成String
            logger.info("gbkstr " + line);
            
            //不要使用toStirng方法来获取字符串
            //line = value.toString();    
            //logger.info("str " + line);
            
              String[] item = line.split(",");
            for (String str : item) {
                outkey = new Text(str);
                context.write(outkey, outvalue);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

 


本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

hadoop和spark读取GBK编码乱码 的相关文章

  • 海致大数据京信_5G行情再度点燃,6大细分领域龙头股曝光!(名单)

    今年的的主线行情依然是科技股 而科技当以看5G和华为为主 5G板块已经走了三周的震荡 空头明显有衰竭之像 5G产业链的细分行业龙头 基站天线 拥有5G大规模天线技术的核心厂家 以及天线和射频一体化趋势的行业变革对天线厂家的影响 000063
  • PowerDesigner15的PDM中表图显示NAME和CODE

    在POWERDESIGNER15里 设计PDM中的表默认只显示字段的解析名和数据类型 如果需要显示NAME的同时也显示CODE 按下面的方法操作设置 在空白处右键 选Display Preferences 在弹出对话框中的左边树选择TABL
  • bitmap 位图 头结构

    转自 http www cnblogs com lzlsky archive 2012 08 16 2641698 html 位图BITMAPINFOHEADER 与BITMAPFILEHEADER 先来看BITMAPINFOHEADER
  • 使用QGIS软件对面数据顶点编辑实现数据的微改动

    在平时的项目中 会出现网上下载的数据与实际不符 有一些小小的偏差 这时候需要对一些区域数据进行微调 这就可以利用QGIS的顶点工具进行操作 目录 1 第一步 数据加载 2 第二步 图层编辑 3 第三步 顶点工具 4 第四步 具体操作 1 第
  • js中的分割截取方法

    在我们平时写项目以及做算法题时 经常会遇到截取字符串 截取数组 然后我发现每次用的方法都不一样 但是又长得很像 很容易搞不清楚 所以这篇博客就浅浅地来说一说这个问题 我们常用的分割方法主要有split splice slice substr
  • 【蒸汽冷凝器型号和PI控制】具有PID控制的蒸汽冷凝器的动力学模型(Matlab&Simulink)

    欢迎来到本博客 博主优势 博客内容尽量做到思维缜密 逻辑清晰 为了方便读者 座右铭 行百里者 半于九十 本文目录如下 目录 1 概述 2 运行结果 3 参考文献 4 Matlab代码 Simulink及文章 1 概述 摘要 建立了蒸汽冷凝器
  • 【Oracle】使用DataGrip连接Oracle数据库

    前言 因为PLSQL Developer试用期过期了 于是使用DataGrip连接Oracle进行学习使用 连接 1 创建连接 2 配置连接 2 1 Name 随便改 改一个合适的名字即可 2 2 Host Port SID Driver
  • C++ - 使用sort函数实现自定义排序

    1 背景 给你一些学生的资料数据 单个学生的资料数据包括如下内容 class student public int m age int m sex string m name public student int age int sex s
  • 算法题5

    题目 给定一个单词 你需要判断单词的大写使用是否正确 我们定义 在以下情况时 单词的大写用法是正确的 全部字母都是大写 比如 USA 单词中所有字母都不是大写 比如 leetcode 如果单词不只含有一个字母 只有首字母大写 比如 Goog
  • 最高成绩的输出(结构体)

    题目描述 从键盘输入若干个学生的信息 每个学生信息包括学号 姓名 3门课的成绩 计算每个学生的总分 输出总分最高的学生的信息 输入 首先输入一个整数n 1 lt n lt 100 表示学生人数 然后输入n行 每行包含一个学生的信息 学号 1
  • C++基础:

    什么是多态 哑巴了吧 你知道你倒是说呀 所谓多态也就是一个接口的多种实现方式 多态包括 虚函数 纯虚函数 覆盖 模板 重载与多态没有关系 虚函数 虚函数是带有virtual关键字的函数 定义一个函数为虚函数 定义他为虚函数是为了允许用基类的
  • deeplearning.ai课程作业:Course 1 Week 2

    deeplearning ai课程作业 Course 1 Week 2 原始作业在GitHub上下载 本文仅作为本人学习过程的记录 含答案 不喜勿看 全部自己跑过 保证可行 Part 1 Python Basics with Numpy o
  • input的复选框

  • redis 查看所有的key方式介绍

    本文主要介绍了redis 查看所有的key方式 具有很好的参考价值 希望对大家有所帮助 一起跟随微点阅读小编过来看看吧 可以使用KEYS 命令 1 KEYS pattern 例如 列出所有的key 1 redis gt keys 列出匹配的
  • SpringBoot漏洞大全

    原文出处 SpringBoot漏洞 qq com 前段时间做渗透 发现了一个很眼熟的页面 长这个样子 页面log是 去世界最大的同性交友网github com搜了一下 发现了一个十分详细的文章 存在大量接口信息泄露 成功交差 我打的网站有h
  • 【Python】Windows 11下更改python默认的pip install包安装路径

    Windows 11下更改python默认的pip install包安装路径 看到CSDN和知乎上有很多文章写如何修改pip包的默认安装路径 看了一遍基本都不管用 经过一定时间的摸爬滚打 终于找到了如何修改pip install默认安装路径
  • pychram 安装大三方库总是提示pip版本不匹配

    1 查看pip版本号 terminal终端执行pip list查看当前pip版本号 file settings project pychramproject python interpreter目录下查看搜索pip最新版本号 2 在文件夹地
  • This file isn‘t in your working directory. Teammates you share this request with won‘t be able to us

    postman上传图片文件问题 解决方案 进入设置 file gt settings 上传的文件必须在设置的工作区中 不然会报错 选择body file

随机推荐

  • python入门--Vscode创建python项目

    在VS Code中创建Python项目可以通过以下步骤实现 1 打开VS Code 2 点击左侧的 资源管理器 图标 3 选择一个文件夹 右键点击新建文件夹 命名为你的项目名称 4 打开终端 使用以下命令创建虚拟环境 python m ve
  • 杂项知识

    挂载 img 文件 mount t proc o loop initrd 2 6 23 1 42 fc8 img mnt img mount t debugfs o loop initrd 2 6 23 1 42 fc8 img mnt i
  • CCF-CSP真题《202305-2 矩阵运算》思路+python,c++满分题解

    想查看其他题的真题及题解的同学可以前往查看 CCF CSP真题附题解大全 试题编号 202305 2 试题名称 矩阵运算 时间限制 5 0s 内存限制 512 0MB 问题描述 题目背景 Softmax Q KTd V 是 Transfor
  • 基于openwrt平台搭建局域网技术验证之二

    1 测试目的 验证l2tp服务器模式的可行性 提供vpn l2tp模式的服务器功能 供客户端连接访问内网 2 参考资料 参考连接1 https www jianshu com p ccf8f2cca70e 参考连接2 https openw
  • win10小课堂:如何彻底关闭windows defender

    win10小课堂 如何彻底关闭windows defender Windows10系统中自带了windows defender杀毒软件 但是不少用户对它的评价褒贬不一 其一是扫描的频率太高 占用大量CPU 其二是有些文件 不经过任何提示就直
  • 解决 Command "python setup.py egg_info" failed with error code 1 问题

    解决 Command python setup py egg info failed with error code 1 问题 参考 pip install unroll python setup py egg info failed wi
  • 第七章软件静态测试

    7 1静态测试 静态测试 静态测试是指不运行被测程序本身 通过分析或检查源程序的语法 结构 过程 接口等来检查程序的正确性 其被测对象是各种与软件相关的有必要进行测试的产物 是对需求规格说明书 软件设计说明书 源程序做结构分析 流程图分析
  • Flex 布局

    一 Flex布局 Flex 是Flexible Box的缩写 意思是弹性布局 用来为盒模型布局提供最大的灵活性 任何一个容器都可以指定为flex布局 box display flex 行内元素也可以使用flex布局 box display
  • TCP报文段结构

    TCP报文段结构 源端口号和目的端口号 含义从名字就能看出来 序号和确认号 这二个字段被 TCP 发送方和接收方用来实现可靠数据传输服务 每个字段都是32比特 接收窗口 该字段用于流量控制 大小为16比特 首部长度 该字段指示了以 32 比
  • 12串口通信的定义-2

    1 设备状态信号线 数据装置准备好 DSR 高电平有效 数据终端准备好 DTR 高电平有效 2 请求发送 RTS 当数据终端设备 DTE 要发 允许发送 CTS 是对请求发送信号 RTS 的 3 接收控制线 载波检测 DCD 当数据通信设备
  • ultraiso 下载+破解+Linux U盘启动制作

    1 到官网下载ultraiso https cn ultraiso net xiazai html 2 将该软件安装到windows上 打开输入注册码进行破解 用户名 Guanjiu 注册码 A06C 83A7 701D 6CFC 3 破解
  • 处理雪花算法等造成的精度丢失问题

    前端js精度丢失因为number处理的是16位 雪花算法是19位 在前后端交互的时候就会造成精度损失 方法一 如果是专门针对某一个Id的话 JsonSerialize using ToStringSerializer class 注解可以实
  • c++数据结构第六周(图),深搜、广搜(stl版)

    本方法皆用vector进行邻接表模拟 7 1 图的先深搜索 作者 唐艳琴 单位 中国人民解放军陆军工程大学 输出无向图的给定起点的先深序列 输入格式 输入第一行给出三个正整数 分别表示无向图的节点数N 1
  • 秒杀系统中常见问题及解决方案

    秒杀中的常见问题的解决 1 解决超卖的问题 1 Redis预减库存 有一个下单请求过来时预减库存 若减完后的redis库存小于0说明已经卖完 此时直接返回客户端已经卖完 后续使用内存标记 减少Redis访问 若预减库存成功 则异步下单 请求
  • DateFormat setLenient

    SimpleDateFormat df new SimpleDateFormat MMddyyyy With lenient parsing the parser may use heuristics to interpret inputs
  • 惠普服务器关机自动重启,HP笔记本关机自动重启的解决办法

    部分型号的HP笔记本会在点击关机按钮的时候出现自动重启的现象 如 DV2803 V3608TX 这个是网卡的Wake On Lan功能而引起的 也就是网络唤醒功能 方法一 解决办法是在开机启动时按F10 进入bios设置界面 选择 系统设定
  • 为什么现在不看好 CV 方向了呢?

    来源 https www zhihu com question 383486199 编辑 深度学习与计算机视觉 声明 仅做学术分享 侵删 作者 匿名用户https www zhihu com question 383486199 answe
  • Android--图片轮播(banner)

    推荐第三方框架banner 地址 https github com youth5201314 banner 使用步骤 Step 1 依赖banner Gradle dependencies compile com youth banner
  • 进程控制-进程终止(exit、_exit)

    知道了进程怎么创建 接下来就来看看怎么终止一个进程终止函数exit 和 exit 函数 头文件 声明 exit stdlib h void exit int status exit unistd h void exit int status
  • hadoop和spark读取GBK编码乱码

    转自 http www cnblogs com teagnes p 6112019 html 首先来看一下为什么会出现这个问题 下面是一个最简单的spark的wordcount程序 sc textFile filePath 方法从文本文件创