hadoop之HDFS:通过Java API访问HDFS

2023-05-16

HDFS是一个分布式文件系统,可以通过Java API接口对HDFS进行操作,下面记录实现Java API的过程和出现的一些问题及解决方案

环境搭建

导入jar包

#common包中的jar文件导入
hadoop-2.8.1\share\hadoop\common\lib\*.jar
hadoop-2.8.1\share\hadoop\common\hadoop-common-2.8.1.jar

#客户端需要配置,里面有hdfs封装的client,不导入会报错
hadoop-2.8.1\share\hadoop\hdfs\hadoop-hdfs-client-2.8.1.jar

导入jar包,上面是需要导入的jar所在目录,关于common\lib\中jar,可能有部分jar包不用导入,查了很多资料没有说明,现在都导入。

配置 log4j日志

log4j.rootLogger=WARN, stdout, R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
# Pattern to output the caller's file name and line number.
#log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
# Print the date in ISO 8601 format
log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=example.log
log4j.appender.R.MaxFileSize=100KB
# Keep one backup file
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%p %t %c - %m%n
# Print only messages of level WARN or above in the package com.foo.
log4j.logger.com.foo=WARN

放在src目录下

访问HDFS Java代码

package com.yodosmart.hadoop;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 * 利用HDFS java API操作文件
 * 
 */
public class HdfsDAO {
	// 修改成自己的HDFS主机地址
	private static final String HDFS = "hdfs://192.168.3.39:9000";
	private static final String utf8 = "UTF-8";
	private static final String gbk = "GBK";

	/**
	 * 两个构造器
	 * 
	 * @param conf
	 */
	public HdfsDAO(Configuration conf) {
		this(HDFS, conf);
	}

	public HdfsDAO(String hdfs, Configuration conf) {
		this.hdfsPath = hdfs;
		this.conf = conf;
	}

	private String hdfsPath;
	private Configuration conf;

	/**
	 * 测试方法入口
	 */
	public static void main(String[] args) throws IOException {
		Configuration conf = config();
		HdfsDAO hdfs = new HdfsDAO(conf);
		// hdfs.mkdirs("/dir1");
		// hdfs.rm("/123");
		// hdfs.rename("/dir", "/out");
		// hdfs.createFile("/test", "测试");
		// hdfs.copyFile("D:/agera.png", "/agera.png");
		// convert("D:/111.txt",gbk,"D:/123.txt",utf8);
		// hdfs.download("/agera.png", "D:/agera1.png");
		// hdfs.cat("/hello1");
		// hdfs.ls("/");
	}

	public static Configuration config() {
		Configuration conf = new Configuration();
		return conf;
	}

	/**
	 * 创建目录
	 * 
	 * @param folder
	 *            hdfs路径
	 * @throws IOException
	 */
	public void mkdirs(String folder) throws IOException {
		Path path = new Path(folder);
		FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
		if (!fs.exists(path)) {
			fs.mkdirs(path);
			System.out.println("Create: " + folder);
		}
		fs.close();
	}

	/**
	 * 删除文件或目录
	 * 
	 * @param folder
	 *            hdfs路径
	 * @throws IOException
	 */
	public void rm(String folder) throws IOException {
		Path path = new Path(folder);
		FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
		fs.deleteOnExit(path);
		System.out.println("Delete: " + folder);
		fs.close();
	}

	/**
	 * 重命名文件
	 * 
	 * @param src
	 *            hdfs路径原文件名
	 * @param dst
	 *            hdfs路径新文件名
	 * @throws IOException
	 */
	public void rename(String src, String dst) throws IOException {
		Path name1 = new Path(src);
		Path name2 = new Path(dst);
		FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
		fs.rename(name1, name2);
		System.out.println("Rename: from " + src + " to " + dst);
		fs.close();
	}

	/**
	 * 遍历文件 hdfs路径
	 * 
	 * @param folder
	 * @throws IOException
	 */
	public void ls(String folder) throws IOException {
		Path path = new Path(folder);
		FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
		FileStatus[] list = fs.listStatus(path);
		System.out.println("ls: " + folder);
		System.out
				.println("==========================================================");
		for (int i = 0; i < list.length; i++) {
			System.out.println("name:" + list[i].getPath() + ", folder: "
					+ list[i].isDir() + ", size:" + list[i].getLen() + "\n");
		}
		System.out
				.println("==========================================================");
		fs.close();
	}

	/**
	 * 创建文件
	 * 
	 * @param file
	 *            hdfs路径文件名
	 * @param content
	 *            文件内容
	 * @throws IOException
	 */
	public void createFile(String file, String content) throws IOException {
		FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
		byte[] buff = content.getBytes();
		FSDataOutputStream os = null;
		try {
			os = fs.create(new Path(file));
			os.write(buff, 0, buff.length);
			System.out.println("Create: " + file);
		} finally {
			if (os != null)
				os.close();
		}
		fs.close();
	}

	/**
	 * 拷贝文件到HDFS
	 * 
	 * @param local
	 *            本地路径文件名
	 * @param remote
	 *            hdfs路径文件名
	 * @throws IOException
	 */
	public void copyFile(String local, String remote) throws IOException {
		// convert(local, gbk, local, utf8);
		FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
		FSDataOutputStream out = fs.create(new Path(remote));
		FileInputStream in = new FileInputStream(local);
		IOUtils.copyBytes(in, out, 2048, true);
		// fs.copyFromLocalFile(new Path(local), new Path(remote));
		System.out.println("copy from: " + local + " to " + remote);
		fs.close();
	}

	/**
	 * 转换编码
	 * 
	 * @param oldFile
	 *            原文件
	 * @param oldCharset
	 *            原文件编码
	 * @param newFlie
	 *            新文件
	 * @param newCharset
	 *            新文件编码
	 */
	public static void convert(String oldFile, String oldCharset,
			String newFlie, String newCharset) {
		BufferedReader bin;
		FileOutputStream fos;
		StringBuffer content = new StringBuffer();
		try {
			bin = new BufferedReader(new InputStreamReader(new FileInputStream(
					oldFile), oldCharset));
			String line = null;
			while ((line = bin.readLine()) != null) {
				content.append(line);
				content.append(System.getProperty("line.separator"));
			}
			bin.close();

			File dir = new File(newFlie.substring(0, newFlie.lastIndexOf("/")));
			if (!dir.exists()) {
				dir.mkdirs();
			}
			fos = new FileOutputStream(newFlie);
			Writer out = new OutputStreamWriter(fos, newCharset);
			out.write(content.toString());
			out.close();
			fos.close();
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 从HDFS中下载文件到本地中
	 * 
	 * @param remote
	 *            hdfs文件路径
	 * @param local
	 *            本地路径
	 * @throws IOException
	 */
	public void download(String remote, String local) throws IOException {
		Path path = new Path(remote);
		FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
		fs.copyToLocalFile(path, new Path(local));
		System.out.println("download: from" + remote + " to " + local);
		fs.close();
	}

	/**
	 * 查看文件中的内容
	 * 
	 * @param remoteFile
	 *            文件路径
	 * @return
	 * @throws IOException
	 */
	public String cat(String remoteFile) throws IOException {
		Path path = new Path(remoteFile);
		FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
		FSDataInputStream fsdis = null;
		System.out.println("cat: " + remoteFile);
		OutputStream baos = new ByteArrayOutputStream();
		String str = null;
		String trans = null;
		try {
			fsdis = fs.open(path);
			IOUtils.copyBytes(fsdis, baos, 4096, false);
			str = baos.toString();
			// 转码
			// trans = new String(str.getBytes(), 0, str.getBytes().length,
			// utf8);
		} finally {
			IOUtils.closeStream(fsdis);
			fs.close();
		}
		System.out.println(str);
		return str;
	}
}

常见错误

java.net.MalformedURLException: unknown protocol: hdfs

默认的情况下,URL只支持http协议的,所以需要设定开启HDFS协议

//设定开启HDFS协议
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());

Unable to load native-hadoop library for your platform

win系统下调用的hadoop客户端,导致不能使用本地的hadoop类,需要导入hadoop-hdfs-client-2.8.1.jar,这样就可以解决问题了

ConnectException: 拒绝连接

修改 core-site.xml文件和etc/hosts文件,修改后,stop-all.sh、start-all.sh重启生效
1、修改core-site.xml文件,将fs.defaultFS 修改为ip地址就可以了(基于IP访问)
2、将/etc/hosts中添加 DHCP获取或者自己设置的IP地址 到localhost主机名的映射(基于域名,主机名称访问)

參考 hadoop之HDFS:CentOS安装和部署HDFS中的Hadoop的伪分布式环境搭建部分

**注意:**无论做哪一个步,都需要开启9000端口,或者关闭防火墙

HDFS客户端的权限错误:Permission denied

1、在hdfs的配置文件中,将dfs.permissions修改为False
2、执行这样的操作 hadoop fs -chmod 777 /user/hadoop
我是用了第一种hdfs-site.xml中添加:

 <property>
      <name>dfs.permissions</name>
      <value>false</value>
 </property>

Hadoop bin directory does not exist和Did not find winutils.exe

Windows下eclipse开发hadoop程序会报错,原因是因为hadoop没有发布winutils.exe造成的,需要编译发布出来;在环境变量中配置 HADOOP_HOME。

  1. 下载 hadoop-common-2.8.1-bin 地址:hadoop-common-2.8.1-bin下载地址

  2. 用户变量新建:
    用户变量

    注意:图片是2.2.0版的,上面的资源是2.8.1,需要改变量值,我起 初用的2.2.0版的,后来直接把2.2.0的bin覆盖了,所以没改

  3. 环境变量添加:

环境变量

参考资料:
1 http://blog.csdn.net/xiaoshunzi111/article/details/52062640
2 http://blog.csdn.net/yelllowcong/article/details/77409604

欢迎关注我的公众号,持续分析优质技术文章
欢迎关注我的公众号

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

hadoop之HDFS:通过Java API访问HDFS 的相关文章

  • 我需要在 Spring 中检查每个控制器中的有效会话吗? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 假设在 Spring Mvc 的 Web 应用程序中 我们是否需要检查每个控制器或 jsps 中的有效会话 我该如何解决 MVC 中的
  • Java程序中的数组奇怪的行为[重复]

    这个问题在这里已经有答案了 我遇到了这个 Java 程序及其以意想不到的方式运行 以下程序计算 int 数组中元素对之间的差异 import java util public class SetTest public static void
  • ExceptionConverter:java.io.IOException:文档没有页面。我正在使用 iText

    当我执行下面的代码时 File f new File c sample pdf PdfWriter getInstance document new FileOutputStream f document open System out p
  • 如何查找 Android 设备中的所有文件并将它们放入列表中?

    我正在寻求帮助来列出 Android 外部存储设备中的所有文件 我想查找所有文件夹 包括主文件夹的子文件夹 有办法吗 我已经做了一个基本的工作 但我仍然没有得到想要的结果 这不起作用 这是我的代码 File files array file
  • IntelliJ IDEA 创建的 JAR 文件无法运行

    我在 IntelliJ 中编写了一个跨越几个类的程序 当我在 IDE 中测试它时它运行良好 但是 每当我按照教程将项目制作成 jar 可执行文件时 它就不会运行 双击 out 文件夹中的文件时 该文件不会运行 并显示 无法启动 Java J
  • java中删除字符串中的特殊字符?

    如何删除字符串中除 之外的特殊字符 现在我用 replaceAll w s 它删除了所有特殊字符 但我想保留 谁能告诉我我该怎么办 Use replaceAll w s 我所做的是将下划线和连字符添加到正则表达式中 我添加了一个 连字符之前
  • 序列化对象以进行单元测试

    假设在单元测试中我需要一个对象 其中所有 50 个字段都设置了一些值 我不想手动设置所有这些字段 因为这需要时间而且很烦人 不知何故 我需要获得一个实例 其中所有字段都由一些非空值初始化 我有一个想法 如果我要调试一些代码 在某个时候我会得
  • 归并排序中的递归:两次递归调用

    private void mergesort int low int high line 1 if low lt high line 2 int middle low high 2 line 3 mergesort low middle l
  • 检查 protobuf 消息 - 如何按名称获取字段值?

    我似乎无法找到一种方法来验证 protobuf 消息中字段的值 而无需显式调用其 getter 我看到周围的例子使用Descriptors FieldDescriptor实例到达消息映射内部 但它们要么基于迭代器 要么由字段号驱动 一旦我有
  • 将 Long 转换为 DateTime 从 C# 日期到 Java 日期

    我一直尝试用Java读取二进制文件 而二进制文件是用C 编写的 其中一些数据包含日期时间数据 当 DateTime 数据写入文件 以二进制形式 时 它使用DateTime ToBinary on C 为了读取 DateTime 数据 它将首
  • Java直接内存:在自定义类中使用sun.misc.Cleaner

    在 Java 中 NIO 直接缓冲区分配的内存通过以下方式释放 sun misc Cleaner实例 一些比对象终结更有效的特殊幻像引用 这种清洁器机制是否仅针对直接缓冲区子类硬编码在 JVM 中 或者是否也可以在自定义组件中使用清洁器 例
  • Keycloak - 自定义 SPI 未出现在列表中

    我为我的 keycloak 服务器制作了一个自定义 SPI 现在我必须在管理控制台上配置它 我将 SPI 添加为模块 并手动安装 因此我将其放在 module package name main 中 并包含 module xml 我还将其放
  • Android JNI C 简单追加函数

    我想制作一个简单的函数 返回两个字符串的值 基本上 java public native String getAppendedString String name c jstring Java com example hellojni He
  • Java - 不要用 bufferedwriter 覆盖

    我有一个程序可以将人员添加到数组列表中 我想做的是将这些人也添加到文本文件中 但程序会覆盖第一行 因此这些人会被删除 如何告诉编译器在下一个空闲行写入 import java io import java util import javax
  • 如何配置eclipse以保持这种代码格式?

    以下代码来自 playframework 2 0 的示例 Display the dashboard public static Result index return ok dashboard render Project findInv
  • 查看Jasper报告执行的SQL

    运行 Jasper 报表 其中 SQL 嵌入到报表文件 jrxml 中 时 是否可以看到执行的 SQL 理想情况下 我还想查看替换每个 P 占位符的值 Cheers Don JasperReports 使用 Jakarta Commons
  • 休眠以持久保存日期

    有没有办法告诉 Hibernate java util Date 应该持久保存 我需要这个来解决 MySQL 中缺少的毫秒分辨率问题 您能想到这种方法有什么缺点吗 您可以自己创建字段long 或者使用自定义的UserType 实施后User
  • java迭代器内部是如何工作的? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我有一个员工列表 List
  • com.jcraft.jsch.JSchException:身份验证失败

    当我从本地磁盘上传文件到远程服务器时 出现这样的异常 com jcraft jsch JSchException Auth fail at org apache tools ant taskdefs optional ssh Scp exe
  • java8 Collectors.toMap() 限制?

    我正在尝试使用java8Collectors toMap on a Stream of ZipEntry 这可能不是最好的想法 因为在处理过程中可能会发生异常 但我想这应该是可能的 我现在收到一个我不明白的编译错误 我猜是类型推理引擎 这是

随机推荐

  • ImportError: DLL load failed while importing Qsci

    pyqt5报错 xff1b ImportError span class token operator span DLL load failed span class token keyword while span importing Q
  • 关于编译QGIS3.22.12配置的python库

    注意 xff1a 安装了多个python环境的很容易编译出错 xff1b 其中3 22需要配置python3 9的环境 在编译QGIS过程中的报错 报错1 xff1a No module named span class token cha
  • Qt中设置删除提示框

    Qt中设置删除提示框 QMessageBox StandardButton sButton 61 QMessageBox question NULL QObject tr 34 提示 34 QObject tr 34 该数据是否删除 34
  • 你真的对图像格式了解么?

    图像是人类视觉的基础 xff0c 是自然景物的客观反映 xff0c 是人类认识世界和人类本身的重要源泉 图 是物体反射或透射光的分布 xff0c 像 是人的视觉系统所接受的图在人脑中所形成的印象或认识 xff0c 照片 绘画 剪贴画 地图
  • 配置Qt中pro、pri文件

    002helloWorld pro span class token macro property span class token directive hash span span class token directive keywor
  • 构建一个字符串类

    文章目录 1 构建字符串数据 2 创建那些函数给外界调用 1 构造函数以及设计构造函数初值 2 类内带有指针 考虑3个特殊重要函数 3 考虑辅助函数 3 定义具体函数 1 设计构造函数内容
  • PostgreSQL查询

    PostgreSQL 数据库连接 QT xff1a 建立到数据库的连接 QSqlDatabase db 61 QSqlDatabase addDatabase 34 QPSQL 34 db setHostName 34 localhost
  • Conda install 报错:An HTTP error occurred when trying to retrieve this URL. HTTP errors are often...

    1 问题描述 xff1a 准备在Anaconda prompt执行以下命令 xff1a conda install c stellargraph stellargraph 报错 xff1a An HTTP error occurred wh
  • svn原理----revert,回滚

    一 子命令Svn revert 取消所有的本地编辑 下面我们来看一下子命令Svn revert例子 xff1a 丢弃对一个文件的修改 xff1a Svn revert foo c Reverted foo c 如果你希望恢复一整个目录的文件
  • Qt 自定义控件提升,头文件找不到的问题

    Qt 自定义控件提升 xff0c 头文件找不到的问题 在附加包含目录添加 xff1a
  • 分析int(*p)[4] = a

    面试题 xff1a 二级指针 include lt iostream gt int main int a 3 4 61 0 1 2 3 4 5 6 7 8 9 10 11 int p 4 61 a std cout lt lt p 43 1
  • af::convolve在CUDA中局限性

    使用在Cuda出现访问冲突问题 xff08 opengcl正常 xff09 xff1a af convolve I I kernel 报错 xff1a 0x00007FFC6443ADAC af dll 处 位于 XXXX exe 中 引发
  • 2016

    2016 最近 xff0c 许多朋友兴起总结2016了 xff0c 看得我心痒 xff0c 心热 我自己不禁也总结起来了 别人的总结要么是 2016XXXX 要么是 2016OOOO 我苦思2秒 xff0c 却想不起一个标题来 xff0c
  • gdb反汇编disassemble

    GDB Command Reference disassemble command gdb反汇编可用disassemble disass命令 用法如下 xff1a disassemble disassemble Function 指定要反汇
  • S.M.A.R.T. 参数(smartctl)计算硬盘精确健康值

    参考 xff1a Acronis Drive Monitor Disk Health Calculation 文章目录 1 背景2 smartctl a dev sda3 计算健康值3 1 关键参数3 1 1 公式说明3 2 2 计算举例
  • python脚本——通过telnet连接设备

    文章目录 一 说明二 代码三 用法总结 一 说明 通过telnetlib库 xff0c telnet到设备上并做一些测试 包括重启设备 等待重启完成 其它测试操作等 二 代码 span class token comment usr bin
  • lspci 命令详解及常用命令

    文章目录 一 说明二 参数说明三 用法举例 一 说明 lspci是查看设备上pcie设备信息的命令 该命令的不同参数配合 xff0c 在查看pcie设备和定位pcie问题时很有用 包括查看pcie设备中断号 查看配置空间内容 修改配置空间寄
  • 中断模式和polling模式 && 硬件中断和软件中断

    文章目录 一 总结在前二 中断2 1 硬件中断与软件中断2 1 1 对比2 1 2 硬件中断2 1 3 软件中断 三 polling 一 总结在前 S NOInterruptPolling1中断模式下 xff0c 设备通知CPU有业务需要被
  • dma_alloc_coherent 申请内存用法和问题总结

    文章目录 1 dma alloc coherent用法2 问题3 解决方法方法一 xff0c 走CMA空间配置3 1 内核配置 96 96 CONFIG CMA 96 96 3 2 修改cma起始地址3 3 设置cma空间 xff08 大小
  • hadoop之HDFS:通过Java API访问HDFS

    HDFS是一个分布式文件系统 xff0c 可以通过Java API接口对HDFS进行操作 xff0c 下面记录实现Java API的过程和出现的一些问题及解决方案 环境搭建 导入jar包 common包中的jar文件导入 hadoop 2