背景
从事大数据以来,主要是spark的开发,时不时总是会用到一些hdfs的上接口;半途开始大数据,对hdfs不是很熟悉。每次用都是百度。所以这里做个总结,为了以后能更好的用好hdfs。
先说下几个重要的类
FileSystem
FileSystem这个类非常重要,相当于一个连接,类似数据库中一个连接;连接还是比较费资源的,因此可以弄一个连接池,后面复用这些连接就可以了。
一般获取的代码都是固定不变的。如下:
public static FileSystem getDFS() throws IOException {
conf = new Configuration();
//设置安全验证方式为kerberos
conf.set("hadoop.security.authentication", "kerberos");
conf.set("fs.defaultFS", "hdfs://...");
return FileSystem.get(conf);
}
- 首先需要new一个Configuration,如果configuration中没有设置相关的参数,那么会取集群的配置文件,获取里面的配置信息。(一般我们也建议,不设置;因为主节点可能会变)
- 然后直接调用FileSystem.get()方法就可以得到一个FileSystem对象了。
FileSystem中有很多方法,跟File中的方法非常类似,如exists、delete、mkdir、create等等一些常用的文件操作方法。
这里主要看读和写方法,读是open方法
/**
* Opens an FSDataInputStream at the indicated Path.
* @param f the file to open
*/
public FSDataInputStream open(Path f) throws IOException {
return open(f, getConf().getInt("io.file.buffer.size", 4096));
}
这个方法有一个读缓存,默认是4096,如果想要设置这个值,可以使用DistributedFileSystem类中open方法
open方法就是得到一个输入流,这里再次强调java中的io相当重要啊,要是理解了java中的io这里的操作看下api就会了。
写的话,只有append方法,而且一般是不推荐不适用该方法的,这个代价会比较大。hdfs文件系统也是不支持修改操作的。append方法见名知意,就是在文件后面进行追加。(因为文件是分块存放的,而且还有几个副本,修改的代价会非常大)
create方法也要说一下,这个创建文件也分为两种,覆盖和不覆盖;create有很多重载的方法,选择一个自己用的就可以了。
还有几种比较重要的方法;后面会提到,什么globStatus之类的
FileStatus
字面意思是文件的状态,其实我更倾向于理解为文件的属性,FileStatus中有一系列的方法,可以得到文件的信息。
像一些getLen()得到文件的长度,以字节的形式。isFile、isDirectory、getReplication一些见名知意的方法,就不多说了。
setOwner、setGroup、setPermission这些修改的方法,在外面没法使用,就不多说了。
之所有要先说这个FileStatus,主要是为了好介绍下面非常有用的方法。
globStatus
这个不是一个类,是一个方法,但是很重要。
我们经常需要处理一批文件,有通配符来匹配是非常方便的。FileSystem提供了两个通配符的方法
public FileStatus[] globStatus(Path pathPattern) throws IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException
globStatus()方法返回于其路径匹配的所有文件的FileStatus对象数组,并按路径排序。PathFilter可以作为选项进一步对匹配结果进行限制。对于通配符的支持可以看另一篇文章hadoop支持的通配符
IOUtils
IOUtils这样类,在文件系统中一般都会有,这里主要是介绍hadoop中的。
下面来看个例子,将本地文件复制到Hadoop文件系统
public static void main(String[] args) throws IOException {
File file = new File("E:\\文件\\学习/apache_hbase_reference_guide.pdf");
InputStream in = new BufferedInputStream(new FileInputStream(file));
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://...:9000");
FileSystem fileSystem = FileSystem.get(conf);
float fileSize = file.length();
FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path("hdfs://.../world.pdf"), new Progressable() {
int count = 1;
@Override
public void progress() {
System.out.println((count * 1024 * 64)/fileSize);
count ++;
}
});
IOUtils.copyBytes(in, fsDataOutputStream, 4096, true);
}
Progressable类主要是展示进度的,重写的 progress 方法在每次上传了64KB(不是绝对的64KB,会有一定的偏差)字节大小的文件之后会自动调用一次;因此我们给出一个大概的上传进度。
我们这里主要是IkanIOUtils.copyBytes方法,这个方法有很多重载的。
先看刚刚使用的
/**
* Copies from one stream to another.
*
* @param in InputStrem to read from
* @param out OutputStream to write to
* @param buffSize the size of the buffer
* @param close whether or not close the InputStream and
* OutputStream at the end. The streams are closed in the finally clause.
*/
public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) throws IOException
复制整个流,可以设置缓冲的大小。
注释写的非常清楚,一般也是建议使用这个方法,特别是boolean close这个参数,最好传递true,免得流没有关闭,导致其他的一些问题。有一个方法和这个非常类似
/**
* Copies count bytes from one stream to another.
*
* @param in InputStream to read from
* @param out OutputStream to write to
* @param count number of bytes to copy
* @param close whether to close the streams
* @throws IOException if bytes can not be read or written
*/
public static void copyBytes(InputStream in, OutputStream out, long count, boolean close) throws IOException
这个方法是复制long count的byte的数据。这个用的非常少,但是很容易和上面弄混。
FileUtil
和其他文件系统一样,hadoop中也有FileUtil这个工具类。先来看看这个stat2Paths,这个方法会将一个数组的status转化为数组的path对象。
/**
* convert an array of FileStatus to an array of Path
*
* @param stats
* an array of FileStatus objects
* @return an array of paths corresponding to the input
*/
public static Path[] stat2Paths(FileStatus[] stats) {
if (stats == null)
return null;
Path[] ret = new Path[stats.length];
for (int i = 0; i < stats.length; ++i) {
ret[i] = stats[i].getPath();
}
return ret;
}
很多方法,看下api基本上就会用了,下面主要介绍一下,上传和下载;首先来看看上传,也就是将文件本地拷贝到hadoop文件系统,上面用IOUtils实现了一遍,下面用FileUtil也来实现一遍
private static void copyFileByFile() throws IOException {
File file = new File("E:\\文件\\学习/apache_hbase_reference_guide.pdf");
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs:/...:9000");
FileSystem fileSystem = FileSystem.get(conf);
FileUtil.copy(file, fileSystem, new Path("/dxy/hello.pdf"), false, conf);
}
这个使用比IOUtils来的要方便点,但是没有进度的展示。根据实际需求使用吧
从hadoop中下载和上面非常类似,就是hadoop文件系统拷贝到本地
private static void copyFileToLocal() throws IOException {
File file = new File("E:\\文件\\学习/hello.pdf");
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://...:9000");
FileSystem fileSystem = FileSystem.get(conf);
FileUtil.copy(fileSystem, new Path("/dxy/hello.pdf"), file, false, conf);
}
还有hadoop上复制文件,和上面的也非常类似,只是目标的参数变了。
下面要介绍一个比较重要的方法,合并文件,因为hadoop中对小文件的处理是非常不擅长的,因此我们可能需要对小文件进行合并。FileUtil中提供了一个方法copyMerge方法,
/** Copy all files in a directory to one output file (merge). */
public static boolean copyMerge(FileSystem srcFS, Path srcDir,
FileSystem dstFS, Path dstFile,
boolean deleteSource,
Configuration conf, String addString) throws IOException {
// 简直合并之后的文件名是否满足要求
dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false);
// 如果要合并的目录,下面还是目录,则返回false
if (!srcFS.getFileStatus(srcDir).isDirectory())
return false;
// 创建目标文件
OutputStream out = dstFS.create(dstFile);
try {
FileStatus contents[] = srcFS.listStatus(srcDir);
Arrays.sort(contents); // 按照文件名进行排序
for (int i = 0; i < contents.length; i++) {
if (contents[i].isFile()) {
InputStream in = srcFS.open(contents[i].getPath());
try {
// 将要合并的文件复制到目标文件中,这里conf传递过去,就是为了得到io.file.buffer.size参数,作为写缓存的大小
IOUtils.copyBytes(in, out, conf, false);
if (addString!=null)
// 每个合并文件完了之后,添加一个addString
out.write(addString.getBytes("UTF-8"));
} finally {
in.close();
}
}
}
} finally {
out.close();
}
if (deleteSource) {
return srcFS.delete(srcDir, true);
} else {
return true;
}
}
代码还是比较简单的。如果只是简单的合并,这个方法完全够用了。如果有个需求,合并之后,仍然可以找到之前文件名对应的文件内容;当然我们也可以改写这个方法,将addString改为文件的名称,只需将添加addString代码去掉,添加上下面这行就行了。
out.write(contents[i].getPath().getName().getBytes("UTF-8"));
当然为了更快的读到想要的内容,我也可以弄一个类似的目录的东西,快速定位到文件内容在哪。
hadoop的api还是很容易上手的,多用多总结。