HDFS的API简介

背景

从事大数据以来,主要是spark的开发,时不时总是会用到一些hdfs的上接口;半途开始大数据,对hdfs不是很熟悉。每次用都是百度。所以这里做个总结,为了以后能更好的用好hdfs。

先说下几个重要的类

FileSystem

FileSystem这个类非常重要,相当于一个连接,类似数据库中一个连接;连接还是比较费资源的,因此可以弄一个连接池,后面复用这些连接就可以了。
一般获取的代码都是固定不变的。如下:

public static FileSystem getDFS() throws IOException {
    conf = new Configuration();
    //设置安全验证方式为kerberos
    conf.set("hadoop.security.authentication", "kerberos");
    
    conf.set("fs.defaultFS", "hdfs://...");
    
    return FileSystem.get(conf);
}
  1. 首先需要new一个Configuration,如果configuration中没有设置相关的参数,那么会取集群的配置文件,获取里面的配置信息。(一般我们也建议,不设置;因为主节点可能会变)
  2. 然后直接调用FileSystem.get()方法就可以得到一个FileSystem对象了。

FileSystem中有很多方法,跟File中的方法非常类似,如exists、delete、mkdir、create等等一些常用的文件操作方法。

这里主要看读和写方法,读是open方法

  /**
   * Opens an FSDataInputStream at the indicated Path.
   * @param f the file to open
   */
  public FSDataInputStream open(Path f) throws IOException {
    return open(f, getConf().getInt("io.file.buffer.size", 4096));
  }

这个方法有一个读缓存,默认是4096,如果想要设置这个值,可以使用DistributedFileSystem类中open方法

open方法就是得到一个输入流,这里再次强调java中的io相当重要啊,要是理解了java中的io这里的操作看下api就会了。

写的话,只有append方法,而且一般是不推荐不适用该方法的,这个代价会比较大。hdfs文件系统也是不支持修改操作的。append方法见名知意,就是在文件后面进行追加。(因为文件是分块存放的,而且还有几个副本,修改的代价会非常大)

create方法也要说一下,这个创建文件也分为两种,覆盖和不覆盖;create有很多重载的方法,选择一个自己用的就可以了。

还有几种比较重要的方法;后面会提到,什么globStatus之类的

FileStatus

字面意思是文件的状态,其实我更倾向于理解为文件的属性,FileStatus中有一系列的方法,可以得到文件的信息。

像一些getLen()得到文件的长度,以字节的形式。isFile、isDirectory、getReplication一些见名知意的方法,就不多说了。

setOwner、setGroup、setPermission这些修改的方法,在外面没法使用,就不多说了。

之所有要先说这个FileStatus,主要是为了好介绍下面非常有用的方法。

globStatus

这个不是一个类,是一个方法,但是很重要。
我们经常需要处理一批文件,有通配符来匹配是非常方便的。FileSystem提供了两个通配符的方法

public FileStatus[] globStatus(Path pathPattern) throws IOException

public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException

globStatus()方法返回于其路径匹配的所有文件的FileStatus对象数组,并按路径排序。PathFilter可以作为选项进一步对匹配结果进行限制。对于通配符的支持可以看另一篇文章hadoop支持的通配符

IOUtils

IOUtils这样类,在文件系统中一般都会有,这里主要是介绍hadoop中的。

下面来看个例子,将本地文件复制到Hadoop文件系统

public static void main(String[] args) throws IOException {
        File file = new File("E:\\文件\\学习/apache_hbase_reference_guide.pdf");
        InputStream in = new BufferedInputStream(new FileInputStream(file));

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://...:9000");
        FileSystem fileSystem = FileSystem.get(conf);
        float fileSize = file.length();
        FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path("hdfs://.../world.pdf"), new Progressable() {
            int count = 1;
            @Override
            public void progress() {
                System.out.println((count * 1024 * 64)/fileSize);
                count ++;
            }
        });

        IOUtils.copyBytes(in, fsDataOutputStream, 4096, true);
    }

Progressable类主要是展示进度的,重写的 progress 方法在每次上传了64KB(不是绝对的64KB,会有一定的偏差)字节大小的文件之后会自动调用一次;因此我们给出一个大概的上传进度。
我们这里主要是IkanIOUtils.copyBytes方法,这个方法有很多重载的。

先看刚刚使用的

  /**
   * Copies from one stream to another.
   *
   * @param in InputStrem to read from
   * @param out OutputStream to write to
   * @param buffSize the size of the buffer 
   * @param close whether or not close the InputStream and 
   * OutputStream at the end. The streams are closed in the finally clause.  
   */
  public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close)  throws IOException 

复制整个流,可以设置缓冲的大小。
注释写的非常清楚,一般也是建议使用这个方法,特别是boolean close这个参数,最好传递true,免得流没有关闭,导致其他的一些问题。有一个方法和这个非常类似

  /**
   * Copies count bytes from one stream to another.
   *
   * @param in InputStream to read from
   * @param out OutputStream to write to
   * @param count number of bytes to copy
   * @param close whether to close the streams
   * @throws IOException if bytes can not be read or written
   */
  public static void copyBytes(InputStream in, OutputStream out, long count, boolean close) throws IOException 

这个方法是复制long count的byte的数据。这个用的非常少,但是很容易和上面弄混。

FileUtil

和其他文件系统一样,hadoop中也有FileUtil这个工具类。先来看看这个stat2Paths,这个方法会将一个数组的status转化为数组的path对象。

  /**
   * convert an array of FileStatus to an array of Path
   * 
   * @param stats
   *          an array of FileStatus objects
   * @return an array of paths corresponding to the input
   */
  public static Path[] stat2Paths(FileStatus[] stats) {
    if (stats == null)
      return null;
    Path[] ret = new Path[stats.length];
    for (int i = 0; i < stats.length; ++i) {
      ret[i] = stats[i].getPath();
    }
    return ret;
  }

很多方法,看下api基本上就会用了,下面主要介绍一下,上传和下载;首先来看看上传,也就是将文件本地拷贝到hadoop文件系统,上面用IOUtils实现了一遍,下面用FileUtil也来实现一遍

private static void copyFileByFile() throws IOException {
        File file = new File("E:\\文件\\学习/apache_hbase_reference_guide.pdf");
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs:/...:9000");
        FileSystem fileSystem = FileSystem.get(conf);
        FileUtil.copy(file, fileSystem, new Path("/dxy/hello.pdf"), false, conf);
    }

这个使用比IOUtils来的要方便点,但是没有进度的展示。根据实际需求使用吧

从hadoop中下载和上面非常类似,就是hadoop文件系统拷贝到本地

private static void copyFileToLocal() throws IOException {
        File file = new File("E:\\文件\\学习/hello.pdf");
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://...:9000");
        FileSystem fileSystem = FileSystem.get(conf);
        FileUtil.copy(fileSystem, new Path("/dxy/hello.pdf"), file, false, conf);
    }

还有hadoop上复制文件,和上面的也非常类似,只是目标的参数变了。

下面要介绍一个比较重要的方法,合并文件,因为hadoop中对小文件的处理是非常不擅长的,因此我们可能需要对小文件进行合并。FileUtil中提供了一个方法copyMerge方法,

 /** Copy all files in a directory to one output file (merge). */
  public static boolean copyMerge(FileSystem srcFS, Path srcDir, 
                                  FileSystem dstFS, Path dstFile, 
                                  boolean deleteSource,
                                  Configuration conf, String addString) throws IOException {
    // 简直合并之后的文件名是否满足要求
    dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false);   

    // 如果要合并的目录,下面还是目录,则返回false
    if (!srcFS.getFileStatus(srcDir).isDirectory())
      return false;
   // 创建目标文件
    OutputStream out = dstFS.create(dstFile);
    
    try {
      FileStatus contents[] = srcFS.listStatus(srcDir);
      Arrays.sort(contents);  // 按照文件名进行排序
      for (int i = 0; i < contents.length; i++) {
        if (contents[i].isFile()) {
          InputStream in = srcFS.open(contents[i].getPath());
          try {
            // 将要合并的文件复制到目标文件中,这里conf传递过去,就是为了得到io.file.buffer.size参数,作为写缓存的大小
            IOUtils.copyBytes(in, out, conf, false);
            if (addString!=null)
              // 每个合并文件完了之后,添加一个addString
              out.write(addString.getBytes("UTF-8"));
                
          } finally {
            in.close();
          } 
        }
      }
    } finally {
      out.close();
    }
    

    if (deleteSource) {
      return srcFS.delete(srcDir, true);
    } else {
      return true;
    }
  }  

代码还是比较简单的。如果只是简单的合并,这个方法完全够用了。如果有个需求,合并之后,仍然可以找到之前文件名对应的文件内容;当然我们也可以改写这个方法,将addString改为文件的名称,只需将添加addString代码去掉,添加上下面这行就行了。

     out.write(contents[i].getPath().getName().getBytes("UTF-8"));

当然为了更快的读到想要的内容,我也可以弄一个类似的目录的东西,快速定位到文件内容在哪。

hadoop的api还是很容易上手的,多用多总结。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352

推荐阅读更多精彩内容