从Hadoop URL 读取数据
使用java.net.URL对象文件打开数据流,从hadoop文件系统中读取文件。【注意:不支持通过URL方式进行写操作】
让java程序能识别Hadoop中的hdfs URL方案还需要做一些工作:
通过 FsUrlStreamHandlerFactory 实例调用 java.net.URL对象的 setURLStreamHandlerFactory()方法
每个Java虚拟机只能调用一次这个方法,因此通常在静态方法中使用。
代码如下:
package com.dragon.hadoop.hdfs;
import java.io.InputStream;
import java.net.URL;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;
public class FromURLHadoop {
static{
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
public static void main(String[] args) {
InputStream io=null;
try{
io =new URL("hdfs://master:9000/zijian.txt").openStream();
IOUtils.copyBytes(io, System.out,4000,false);
}catch(Exception e){
e.printStackTrace();
}finally{
IOUtils.closeStream(io);
}
}
}
HDFS的Java-API操作
一、了解一下Configuration类
-
org.apache.hadoop.conf.Configuration类的作用
1)Resources:资源定义与加载
资源默认按顺序加载:core-default.xml --> core-site.xml --> 代码段
资源的描述通过name/value键值对定义
2)Final Parameters:不变参数
配置参数可以声明为final。一旦资源声明了一个值final,以后加载的资源就不能更改该值
3)Variable Expansion:参数表达式
值字符串首先处理变量展开
-
Configuration用法:如果访问hdfs,在java API中需调用以下几种方式,来获取Resources
a.通过conf.set(“fs.defaultFS”, “hdfs://master:9000”);【“属性”,“属性值”】
b.通过conf.addResource(“core”); 将core文件【你的集群中core-site.xml文件】添加至classpath路径下
c.默认将core-site.xml文件添加至classpath路径下,直接拖放到你的src包下
-
默认情况下,先加载core-default.xml,然后加载core-site.xml,在静态代码块中执行
static{
ClassLoader cL = Thread.currentThread().getContextClassLoader();
if (cL == null) {
cL = Configuration.class.getClassLoader();
}
if(cL.getResource("hadoop-site.xml")!=null) {
LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
"Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
+ "mapred-site.xml and hdfs-site.xml to override properties of " +
"core-default.xml, mapred-default.xml and hdfs-default.xml " +
"respectively");
}
addDefaultResource("core-default.xml");
addDefaultResource("core-site.xml");
}
二、FileSystem
- FileSystem:一个相当通用的文件系统的抽象基类,相当于一个连接。它可以作为分布式文件系统实现,也可以作为反映本地连接磁盘的“本地”文件系统实现。
- 一般分为两类:HDFS:【org.apache.hadoop.hdfs.DistributedFileSystem】和Local【org.apache.hadoop.fs.LocalFileSystem】
- FileSyastem的使用:
1)首先new一个Configuration,如果configuration中没有设置相关的参数,那么会取集群的配置文件,获取里面的配置信息。
2)然后直接调用FileSystem.get()方法得到一个FileSystem对象 - FileSystem中有很多方法,如exists、delete、mkdir、create等等一些常用的文件操作方法。
通过FileSystem API对HDFS进行读操作
public static void readDatafromHDFS(){
Configuration conf=new Configuration();
System.out.println(conf.get("fs.defaultFS"));
try {
FileSystem fs =FileSystem.get(conf);
Path file=new Path("hdfs://master:9000/zijian.txt");
FSDataInputStream fsinput=fs.open(file);
IOUtils.copyBytes(fsinput, System.out,4096,false);
fsinput.close();
fs.close();
} catch (IOException e) {
e.printStackTrace();
}
}
通过HDFS API创建文件夹
public static void mkdirByAPI(){
Configuration conf =new Configuration();
try {
DistributedFileSystem fs=(DistributedFileSystem) FileSystem.get(conf);
fs.mkdirs(new Path("/fromapi"));
fs.close();
} catch (IOException e) {
e.printStackTrace();
}
}
获取文件的信息【FileStatus对象】
FileStatus:用来描述文件的属性,FileStatus中有一系列的方法,可以得到文件的信息。
public static void getFileStatus(){
Configuration conf=new Configuration();
try {
DistributedFileSystem dfs =(DistributedFileSystem) FileSystem.get(conf);
FileStatus fst =dfs.getFileStatus(new Path("hdfs://master:9000/zijian.txt"));
System.out.println("getAccessTime:"+fst.getAccessTime());
System.out.println("getBlockSize:"+fst.getBlockSize());
System.out.println("getGroup:"+fst.getGroup());
System.out.println("getLen:"+fst.getLen());
System.out.println("getReplication:"+fst.getReplication());
} catch (IOException e) {
e.printStackTrace();
}
}
public static void getFileStatusArray(){
Configuration conf =new Configuration();
try {
DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
FileStatus[] fs_arr=dfs.listStatus(new Path("hdfs://master:9000/"));
Path[] path_arr=FileUtil.stat2Paths(fs_arr);
for (Path path : path_arr) {
System.out.println(path);
}
} catch (IOException e) {
e.printStackTrace();
}
}
通过HDFS API进行写操作+动态设置相关参数
public static void writeByAPI(){
Configuration conf=new Configuration();
try {
DistributedFileSystem dfs =(DistributedFileSystem) FileSystem.get(conf);
Path file =new Path("hdfs://master:9000/zijian.txt");
FSDataOutputStream fsos=dfs.create(file);
fsos.writeBytes("hello world,the best for you");
fsos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void writeByAPIBlocksize(){
Configuration conf=new Configuration();
conf.set("dfs.namenode.fs-limits.min-block-size","1");
conf.set("dfs.bytes-per-checksum", "1");
try {
DistributedFileSystem dfs =(DistributedFileSystem) FileSystem.get(conf);
Path file =new Path("hdfs://master:9000/hello");
FSDataOutputStream fsos=dfs.create(file,true,4096,(short)2,3);
fsos.writeBytes("hensi the world");
fsos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
通过HDFS API进行append追加操作
public static void appendByAPI(){
Configuration conf =new Configuration();
conf.set("dfs.bytes-per-checksum", "1");
conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
try {
DistributedFileSystem dfs = (DistributedFileSystem)FileSystem.get(conf);
Path file=new Path("hdfs://master:9000/hello");
FSDataOutputStream fsos=dfs.append(file);
fsos.writeBytes("hello world");
fsos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
获取文件的块位置信息
FileStatus对象和Blocklocation[]对象
getFileBlockLocations(FileStatus源数据,偏移量的起始位置,字节长度)
public static void getBlockLocation(){
Configuration conf =new Configuration();
try {
DistributedFileSystem dfs =(DistributedFileSystem) FileSystem.get(conf);
FileStatus fst=dfs.getFileStatus(new Path("hdfs://master:9000/hello"));
BlockLocation[] bl_arr=dfs.getFileBlockLocations(fst, 4, 2);
for (BlockLocation blockLocation : bl_arr) {
String[] hosts=blockLocation.getHosts();
for(String host:hosts){
System.out.println(host);
}
String[] names=blockLocation.getNames();
for (String name : names) {
System.out.println(name);
}
String[] topaths=blockLocation.getTopologyPaths();
for (String topath : topaths) {
System.out.println(topath);
}
}
} catch (IOException e) {
e.printStackTrace();
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)