HDFS的Java接口

至少有两种方法:

1、java.net.URL
把解析 " hdfs://localhost:9000:/user/liheyuan/xxx " 这种HDFS URL的解析器加到java.net.URL中,然后用传统Java的URL相关接口搞定。

import java.io.InputStream;
import java.net.URL;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

public class HDFS1 {

	// Is a must and only called once each JVM
	static {
		URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
	}

	public static void main(String[] args) {
		InputStream in = null;
		try {
			in = new URL("hdfs://localhost:9000/user/liheyuan/test.txt").openStream();
			IOUtils.copyBytes(in, System.out, 1024);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			IOUtils.closeStream(in);
		}
	}

}

2、HDFS专有的API访问

org.apache.hadoop.fs.Path HDFS里所有文件的路径。

org.apache.hadoop.fs.FileSystem HDFS文件系统主类,通过FileSystem.get(Configuration)获取,不能直接new。

步骤:
(1) 获取FileSytstem,
FileSystem fs = FileSystem.get("hdfs://ip:port",new Configuration());
(2) 读操作
FSDataInputStream in = fs.open("hdfs://ip:port/path/to/file");
然后这个in就是和其他InputStream一样处理啦!

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HDFS2 {

	public static void main(String[] args) throws Exception {
		// The URI set the file you want from HDFS
		String hdfs_uri = "hdfs://localhost:9000";
		String file_uri = "hdfs://localhost:9000/user/liheyuan/test.txt";
		FileSystem fs = FileSystem.get(URI.create(hdfs_uri),
				new Configuration());
		FSDataInputStream in = null;
		try {
			in = fs.open(new Path(file_uri));
			IOUtils.copyBytes(in, System.out, 1024);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			IOUtils.closeStream(in);
		}

	}
}

3、关于读数据接口:FSDataInputStream
它除了支持传统InputStream的接口特性外,还支持:
(1)任意绝对位置的seek
(2)从任意pos开始读取x字节数据

4、写数据接口:FSDataOutputStream
与input类似,获得这个对象是伴随着fs.create()而产生的。
然后按照使用OutputStream的方法使用即可。
然而,FSDataOutputStream不支持seek() !!
考虑到类似GFS的块存储架构,这也是在情理之中的。

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HDFSCopy {

	public static void main(String[] args) throws Exception {
		if (args.length != 2) {
			System.err.println("Usage:");
			System.err.println("java HDFSCopy local_path hdfs_path");
			System.exit(-1);
		}

		// File on local and hdfs
		String local_path = args[0];
		String hdfs_path = args[1];
		String hdfs = "hdfs://localhost:9000";

		// Open Local Input
		InputStream in = new BufferedInputStream(
				new FileInputStream(local_path));
		// Open remote Output
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(hdfs), conf);
		FSDataOutputStream out = fs.create(new Path(hdfs_path));
		// Copy
		IOUtils.copyBytes(in, out, conf);
	}
}

5、创建目录
也是用FileSystem的接口:
fs.mkdirs()

6、删除文件/目录
fs.delete(Path path,boolean recursive)
第一个参数为要删除的路径。
第二个参数为是否递归删除,即rm -r。

6、其他API

(1) 获取文件属性
FileStatus stat = fs.getFileStatus(new Path("xxx"));
stat.getLen()/getOwner()....

(2) 列出目录文件
fs.FileStatus[] stats = fs.listStatus(Path f)
然后对stats遍历

(3) 列出匹配文件名的文件
方法1: 使用fs.globStatus
FileStatus[] stats = fs.globStatus(new Path("pattern"))
其中,pattern类似于正则,支持* [a-z] [^a-z] ?等

方法2:使用PathFilter,用于listStatus的第二个参数上

public interface PathFilter {
boolean accept(Path path);
}

7、FSDataOutputStream写块的时候,只有写满了一个块,其内容才能被Reader读取。如果要强制刷新,需要out.sync()函数。out.close()时会自动调用sync()。

8、显然,频繁调用sync()会影响性能,因此,在性能和正确性之间需要做出一定的权衡。

9、distcp可用于并行I/O操作。
比如,用于两个HDFS集群之间的复制。
可以在distcp后加参数:
-update 如果文件已经存在,只更新最新的
-overwrite 如果文件已经存在,直接覆盖

hadoop distcp hdfs://namenode1/foo hdfs://namenode2/bar

8、实际上,distcp是用特殊的Map/Reduce任务实现的:只有Map,没有Reduce。

9、使用distcp时,一定要注意保持两个HDFS集群的版本一样!否则可能因为RPC格式不同而失败!

10、Hadoop的Archieves,可以将许多小文件包装成一个大文件(Archieve),便于Hadoop的操作和处理。这样处理的时候直接处理这个大文件,而无需遍历N个小文件。
三个参数是这样的:
(1)Archieves文件
(2)小文件的文件夹
(3)Archieves目的目录

#将小文件/my/files打包为files.har,放置于/my下
hadoop archive -archiveName files.har /my/files /my
#列一下har文件
% hadoop fs -ls /my/files.har
0 2009-04-09 19:13 /my/files
0 2009-04-09 19:13 /my/files.har
165 2009-04-09 19:13 /my/files.har/_index
23 2009-04-09 19:13 /my/files.har/_masterindex
2 2009-04-09 19:13 /my/files.har/part-0

可以看到,har文件下又_index、_masterindex和part-0,part文件就是小文建concat连接起来的。

11、Archieves文件的缺点
(1)需要与小文建同等的目录
(2)Archieves文件一旦生成,就是只读的。
(3)Map/Reduce时,处理效率仍会比较低。

Leave a Reply

Your email address will not be published. Required fields are marked *