HDFS中API的使用

在项目开发中,有时我们需要通过HDFS的api来对文件进行操作,比如将数据上传到HDFS或者从HDFS获取数据等。本篇来介绍一下HDFS中API的具体使用。直接上代码:

package com.lzb.hdfs.fs;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.IOException;

public class HDFSHelper {

    private FileSystem fs;

    public HDFSHelper() {
        fs = getFileSystem();
    }

    /**
     * Configuration是配置对象,conf可以理解为包含了所有配置信息的一个集合,可以认为是Map,
     * 在初始化的时候底层会加载一堆配置文件 core-site.xml;hdfs-site.xml;mapred-site.xml;yarn-site.xml
     * 如果需要项目代码自动加载配置文件中的信息,那么就必须把配置文件改成-default.xml或者-site.xml的名称,
     * 而且必须放置在src下,如果不叫这个名,或者不在src下,也需要加载这些配置文件中的参数,必须使用conf对象提供的方法手动加载.
     * 依次加载的参数信息的顺序是:
     * 1.加载core/hdfs/mapred/yarn-default.xml
     * 2.加载通过conf.addResource()加载的配置文件
     * 3.加载conf.set(name,value)
     */
    private Configuration getConfiguration(){
        Configuration conf = new Configuration();
        //conf.addResource("xxx");
        //conf.set("xxx","xxx");
        //Configuration.addDefaultResource("core-site.xml");
        //Configuration.addDefaultResource("hdfs-site.xml");
        //conf.set("fs.default.name","hdfs://probd01:8020");

        //HA模式的配置
        conf.set("fs.defaultFS", "hdfs://probd");
        conf.set("dfs.nameservices", "probd");
        conf.set("dfs.ha.namenodes.probd", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.probd.nn1", "probd01:8020");
        conf.set("dfs.namenode.rpc-address.probd.nn2", "probd02:8020");
        conf.set("dfs.client.failover.proxy.provider.probd", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        //防止报错:no FileSystem for scheme: hdfs...
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        return conf;
    }

    /**
     * 获取文件系统
     * 本地文件系统为LocalFileSystem,URL形式:    file:///c:myProgram
     * HDFS文件系统为DistributedFileSystem,URL形式:    fs.defaultFS=hdfs://hadoop01:9000
     */
    public FileSystem getFileSystem(){
        Configuration conf = getConfiguration();
        FileSystem fs = null;
        try {
            fs = FileSystem.get(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println(fs);
        return fs;
    }

    /**
     * 上传本地文件到HDFS,底层就是采用流的方式
     * @param localPath 本地文件路径
     * @param remotePath HDFS文件路径
     * @return 是否上传成功
     */
    public boolean copyFromLocal(String localPath,String remotePath){
        if(fs == null) return false;
        try {
            fs.copyFromLocalFile(new Path(localPath),new Path(remotePath));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return true;
    }

    /**
     * 从HDFS下载文件,底层就是采用流的方式
     * @param remotePath HDFS文件路径
     * @param localPath 本地路径
     * @return 是否下载成功
     */
    public boolean copyToLocal(String remotePath,String localPath){
        if(fs == null) return false;
        try {
            fs.copyToLocalFile(new Path(remotePath),new Path(localPath));
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * 获取目录下的文件
     * @param remotePath HDFS文件路径
     * @param recursive 是否级联(该文件夹下面如果还有子文件 要不要看,注意没有 子文件夹!!)
     */
    public void listFiles(String remotePath,boolean recursive){
        if(fs == null) return;
        try {
            RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(remotePath), recursive);
            while (iterator.hasNext()){
                LocatedFileStatus fileStatus = iterator.next();

                //文件的存储路径,以hdfs://开头的全路径 ==> hdfs://hadoop01:9000/a/gg.txt
                System.out.println( "file path === " + fileStatus.getPath());

                //文件名
                System.out.println("file name === " + fileStatus.getPath().getName());

                //文件长度
                System.out.println("file size === "+fileStatus.getLen());

                //文件所有者
                System.out.println("file owner === "+fileStatus.getOwner());

                //分组信息
                System.out.println("file group === " + fileStatus.getGroup());

                //文件权限信息
                System.out.println("file permission === " + fileStatus.getPermission());

                //文件副本数
                System.out.println("file blocks === " + fileStatus.getReplication());

                //块大小
                System.out.println("file block size === " + fileStatus.getBlockSize());

                //块位置相关信息
                BlockLocation[] blockLocations = fileStatus.getBlockLocations();

                //块的数量
                System.out.println("file block nums === " + blockLocations.length);

                for (BlockLocation bl : blockLocations) {
                    String[] hosts = bl.getHosts();
                    for (String host: hosts) {
                        System.out.println("block host === " + host);
                    }
                    //块的一个逻辑路径
                    bl.getTopologyPaths();
                }

            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }


    /**
     * 获取目录下的文件
     * 此方法与listFiles不同,不支持传true或false,即不能级联,如果想实现级联就采用递归的方式
     * @param remotePath HDFS文件路径
     */
    public void listStatus(String remotePath){
        if(fs == null) return;
        try {
            FileStatus[] listStatus = fs.listStatus(new Path(remotePath));
            for (FileStatus fss : listStatus) {
                //判断是不是文件夹
                boolean directory = fss.isDirectory();

                //判断是不是文件
                boolean file = fss.isFile();

                String name = fss.getPath().getName();

                if(file) {
                    System.out.println(name+":文件");
                }else {
                    System.out.println(name+":文件夹");
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    /**
     * 删除空文件夹或空文件
     * @param path
     */
    public void deleteEmptyDirAndFile(Path path){
        if(fs == null) return;
        try {

            FileStatus[] listStatus = fs.listStatus(path);
            if(listStatus.length == 0){
                //删除空文件夹
                fs.delete(path,true);
                return;
            }

            RemoteIterator<LocatedFileStatus> iterator = fs.listLocatedStatus(path);

            while (iterator.hasNext()) {
                LocatedFileStatus next = iterator.next();
                Path currentPath = next.getPath();
                Path parentPath = next.getPath().getParent();


                if (next.isDirectory()) {
                    // 如果是空文件夹
                    if (fs.listStatus(currentPath).length == 0) {
                        // 删除掉
                        fs.delete(currentPath, true);
                    } else {
                        // 不是空文件夹,那么则继续遍历
                        if (fs.exists(currentPath)) {
                            deleteEmptyDirAndFile(currentPath);
                        }
                    }
                } else {
                    // 获取文件的长度
                    long fileLength = next.getLen();
                    // 当文件是空文件时, 删除
                    if (fileLength == 0) {
                        fs.delete(currentPath, true);
                    }
                }

                // 当空文件夹或者空文件删除时,有可能导致父文件夹为空文件夹,
                // 所以每次删除一个空文件或者空文件的时候都需要判断一下,如果真是如此,那么就需要把该文件夹也删除掉
                int length = fs.listStatus(parentPath).length;
                if (length == 0) {
                    fs.delete(parentPath, true);
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        }

    }


    /**
     * 创建文件夹
     * @param remotePath HDFS文件路径
     * @return 是否创建成功
     */
    public boolean mkdir(String remotePath){
        if(fs == null) return false;
        boolean success = false;
        try {
            success = fs.mkdirs(new Path(remotePath));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return success;
    }

    /**
     * 写入文件
     * @param remotePath HDFS文件路径
     * @param content 内容
     * @return 是否写入成功
     */
    public boolean writeToFile(String remotePath,String content){
        if(fs == null) return false;
        try {
            FSDataOutputStream out = fs.create(new Path(remotePath));
            out.writeUTF(content);
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;

    }

    /**
     * 读取文件数据
     * @param remotePath HDFS文件路径
     * @return 读取的结果数据
     */
    public String readFromFile(String remotePath){
        String result = null;
        if(fs == null) return null;
        try {

            FSDataInputStream in = fs.open(new Path(remotePath));
            result = in.readUTF();
            in.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }


    /**
     * 重命名文件
     * @param oldPath 旧文件路径
     * @param newPath 新文件路径
     * @return 是否重命名成功
     */
    public boolean renameFile(String oldPath,String newPath){
        if(fs == null) return false;
        Path old=new Path(oldPath);
        Path now=new Path(newPath);
        boolean rename = false;
        try {
            rename = fs.rename(old, now);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return rename;
    }

    /**
     * 删除目录和文件
     * @param remotePath HDFS文件路径
     * @return 是否删除成功
     */
    public boolean deleteFile(String remotePath){
        if(fs == null) return false;
        boolean success = false;
        try {
            success = fs.delete(new Path(remotePath), true);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return success;
    }

    /**
     * 检查文件是否存在
     * @param remotePath HDFS文件路径
     * @return 是否存在
     */
    public boolean existFile(String remotePath){
        if(fs == null) return false;
        boolean exist = false;
        try {
            exist = fs.exists(new Path(remotePath));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return exist;
    }


    /**
     * 关闭FileSystem
     */
    public void closeFileSystem(){
        if(fs != null){
            try {
                fs.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

上面代码都有注释,这里就不过多解释了,下面来看主类的执行代码:

package com.lzb.hdfs;


import com.lzb.hdfs.fs.HDFSHelper;

public class Demo {
    public static void main(String[] args) {

        HDFSHelper hdfsHelper = new HDFSHelper();

        String dir = "/test";
        String filename = "hello.txt";
        String path = dir + "/" + filename;

        boolean exist = hdfsHelper.existFile(path);
        System.out.println(path + " exist file ==> " + exist);

        if(!exist){
            boolean mkdir = hdfsHelper.mkdir(dir);
            System.out.println(dir + " create success ==> " + mkdir);

            boolean copyFromLocal = hdfsHelper.copyFromLocal("/"+filename, dir);
            System.out.println("upload success ==> " + copyFromLocal);

            hdfsHelper.listFiles(dir,false);

            String content = "hello world new";
            boolean write = hdfsHelper.writeToFile(path, content);
            System.out.println("write success ==> " + write);

            String data = hdfsHelper.readFromFile(path);
            System.out.println("read the data ==> " + data);

            String newPath = dir + "/hello2.txt";
            boolean renameFile = hdfsHelper.renameFile(path, newPath);
            System.out.println("rename success ==> " + renameFile);

            boolean copyToLocal = hdfsHelper.copyToLocal(newPath, "/hello2.txt");
            System.out.println("download success ==> " + copyToLocal);

            //boolean deleteFile = hdfsHelper.deleteFile(newPath);
            //System.out.println("delete success ==> " + deleteFile);

        }

        hdfsHelper.closeFileSystem();

    }
}

执行结果如下:

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1866182384_1, ugi=root (auth:SIMPLE)]]
/test/hello.txt exist file ==> false
/test create success ==> true
upload success ==> true
file path === hdfs://probd/test/hello.txt
file name === hello.txt
file size === 12
file owner === root
file group === supergroup
file permission === rw-r--r--
file blocks === 3
file block size === 134217728
file block nums === 1
block host === Probd01
block host === Probd03
block host === Probd02
write success ==> true
read the data ==> hello world new
rename success ==> true
download success ==> true
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342