至少有两种方法:
1、java.net.URL
把解析 " hdfs://localhost:9000:/user/liheyuan/xxx " 这种HDFS URL的解析器加到java.net.URL中,然后用传统Java的URL相关接口搞定。
import java.io.InputStream; import java.net.URL; import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; import org.apache.hadoop.io.IOUtils; public class HDFS1 { // Is a must and only called once each JVM static { URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); } public static void main(String[] args) { InputStream in = null; try { in = new URL("hdfs://localhost:9000/user/liheyuan/test.txt").openStream(); IOUtils.copyBytes(in, System.out, 1024); } catch (Exception e) { e.printStackTrace(); } finally { IOUtils.closeStream(in); } } }
2、HDFS专有的API访问
org.apache.hadoop.fs.Path HDFS里所有文件的路径。
org.apache.hadoop.fs.FileSystem HDFS文件系统主类,通过FileSystem.get(Configuration)获取,不能直接new。
步骤:
(1) 获取FileSytstem,
FileSystem fs = FileSystem.get("hdfs://ip:port",new Configuration());
(2) 读操作
FSDataInputStream in = fs.open("hdfs://ip:port/path/to/file");
然后这个in就是和其他InputStream一样处理啦!
import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; public class HDFS2 { public static void main(String[] args) throws Exception { // The URI set the file you want from HDFS String hdfs_uri = "hdfs://localhost:9000"; String file_uri = "hdfs://localhost:9000/user/liheyuan/test.txt"; FileSystem fs = FileSystem.get(URI.create(hdfs_uri), new Configuration()); FSDataInputStream in = null; try { in = fs.open(new Path(file_uri)); IOUtils.copyBytes(in, System.out, 1024); } catch (Exception e) { e.printStackTrace(); } finally { IOUtils.closeStream(in); } } }
3、关于读数据接口:FSDataInputStream
它除了支持传统InputStream的接口特性外,还支持:
(1)任意绝对位置的seek
(2)从任意pos开始读取x字节数据
4、写数据接口:FSDataOutputStream
与input类似,获得这个对象是伴随着fs.create()而产生的。
然后按照使用OutputStream的方法使用即可。
然而,FSDataOutputStream不支持seek() !!
考虑到类似GFS的块存储架构,这也是在情理之中的。
import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.InputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; public class HDFSCopy { public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage:"); System.err.println("java HDFSCopy local_path hdfs_path"); System.exit(-1); } // File on local and hdfs String local_path = args[0]; String hdfs_path = args[1]; String hdfs = "hdfs://localhost:9000"; // Open Local Input InputStream in = new BufferedInputStream( new FileInputStream(local_path)); // Open remote Output Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(hdfs), conf); FSDataOutputStream out = fs.create(new Path(hdfs_path)); // Copy IOUtils.copyBytes(in, out, conf); } }
5、创建目录
也是用FileSystem的接口:
fs.mkdirs()
6、删除文件/目录
fs.delete(Path path,boolean recursive)
第一个参数为要删除的路径。
第二个参数为是否递归删除,即rm -r。
6、其他API
(1) 获取文件属性
FileStatus stat = fs.getFileStatus(new Path("xxx"));
stat.getLen()/getOwner()....
(2) 列出目录文件
fs.FileStatus[] stats = fs.listStatus(Path f)
然后对stats遍历
(3) 列出匹配文件名的文件
方法1: 使用fs.globStatus
FileStatus[] stats = fs.globStatus(new Path("pattern"))
其中,pattern类似于正则,支持* [a-z] [^a-z] ?等
方法2:使用PathFilter,用于listStatus的第二个参数上
public interface PathFilter { boolean accept(Path path); }
7、FSDataOutputStream写块的时候,只有写满了一个块,其内容才能被Reader读取。如果要强制刷新,需要out.sync()函数。out.close()时会自动调用sync()。
8、显然,频繁调用sync()会影响性能,因此,在性能和正确性之间需要做出一定的权衡。
9、distcp可用于并行I/O操作。
比如,用于两个HDFS集群之间的复制。
可以在distcp后加参数:
-update 如果文件已经存在,只更新最新的
-overwrite 如果文件已经存在,直接覆盖
hadoop distcp hdfs://namenode1/foo hdfs://namenode2/bar
8、实际上,distcp是用特殊的Map/Reduce任务实现的:只有Map,没有Reduce。
9、使用distcp时,一定要注意保持两个HDFS集群的版本一样!否则可能因为RPC格式不同而失败!
10、Hadoop的Archieves,可以将许多小文件包装成一个大文件(Archieve),便于Hadoop的操作和处理。这样处理的时候直接处理这个大文件,而无需遍历N个小文件。
三个参数是这样的:
(1)Archieves文件
(2)小文件的文件夹
(3)Archieves目的目录
#将小文件/my/files打包为files.har,放置于/my下 hadoop archive -archiveName files.har /my/files /my #列一下har文件 % hadoop fs -ls /my/files.har 0 2009-04-09 19:13 /my/files 0 2009-04-09 19:13 /my/files.har 165 2009-04-09 19:13 /my/files.har/_index 23 2009-04-09 19:13 /my/files.har/_masterindex 2 2009-04-09 19:13 /my/files.har/part-0
可以看到,har文件下又_index、_masterindex和part-0,part文件就是小文建concat连接起来的。
11、Archieves文件的缺点
(1)需要与小文建同等的目录
(2)Archieves文件一旦生成,就是只读的。
(3)Map/Reduce时,处理效率仍会比较低。