HDFS-3.1.1 分布式文件系统 实验示例

大家好,我是Iggi。

今天我给大家分享的是HDFS-3.1.1版本的实验示例。

首先用一段文字简介HDFS:

HDFS是英文Hadoop Distributed File System的缩写。作为Hadoop的核心组件之一,它的设计思路参考于Google的GFS论文,是GFS的开源实现。由于HDFS自身的成熟稳定,加之拥有众多用户,现在已经成为当前分布式存储的事实标准。HDFS为Hadoop生态圈中的其它组件提供最基本的存储功能。它具有高容错性、高可靠性、高扩展性、高获得性、高吞吐率等特征为大数据存储和处理提供了强大的底层存储架构,可以说它是一切大数据平台的基础。HDFS采用Master/Slave架构,主服务器运行Master进程NameNode,从服务器Slave进程DataNode。它将集群中所有服务器的存储空间连接到一起,构成了一个统一的、海量的存储空间。下图是HDFS的架构图。

image

如果大家还想了解更多的有关HDFS的知识请访问:http://hadoop.apache.org/docs/r3.1.1/hadoop-project-dist/hadoop-hdfs/HdfsUserGuide.html

好,下面进入正题。介绍Java操作HDFS组件完成对文件系统的操作。

第一步:使用IDE建立Maven工程,建立工程时没有特殊说明,按照向导提示点击完成即可。重要的是在pom.xml文件中添加依赖包,内容如下图。

image.png

等待系统下载好依赖的jar包后便可以编写程序了。

以下代码段是操作HDFS的测试类:

package linose.hdfs;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

/**
 * HDFS 服务类
 * @author Iggi
 *
 */
public class HdfsService {

    protected String user;
    protected String defaultFS;
    protected Configuration conf;
    protected FileSystem fileSystem;
    
    /**
     * 构造函数
     * @param defaultFS hdfs://ip:port
     * @throws IOException 
     */
    public HdfsService(String defaultFS) throws IOException {
        this.defaultFS = defaultFS;
        conf = new Configuration();
        conf.set("fs.defaultFS", defaultFS);
        fileSystem = FileSystem.get(conf);
    }
    /**
     * 构造函数
     * @param user hdfs用户名
     * @param defaultFS hdfs://ip:port
     * @throws URISyntaxException 
     * @throws InterruptedException 
     * @throws IOException 
     */
    public HdfsService(String user, String defaultFS) throws IOException, InterruptedException, URISyntaxException {
        this.user = user;
        this.defaultFS = defaultFS;
        conf = new Configuration();
        conf.set("fs.defaultFS", defaultFS);
        fileSystem = FileSystem.get(new URI(defaultFS), conf, user);
    }
    
    /**
     * 获取HDFS文件系统对象,拿到该对象后可以跳过封装,直接调用HDFS提供的方法。
     * @return HDFS文件系统对象
     */
    public FileSystem getFileSystem() {
        return fileSystem;
    }
    
    /**
     * 析构资源
     */
    public void clear() {
        fileSystem = null;
        conf = null;
    }
    
    /**
     * 将输入路径构建成hdfs路径
     * @param path 输入路径
     * @return hdfs路径
     */
    protected Path buildHdfsPath(String path) {
        String hdfsPath = defaultFS;
        if (path.startsWith("/")) {
            hdfsPath += path;
        } else {
            hdfsPath += ("/" + path);
        }
        return new Path(hdfsPath);
    }
    
    /**
     * 输入路径是否存在
     * @param path 输入路径
     * @return 是否存在
     * @throws IOException
     * @throws URISyntaxException 
     * @throws InterruptedException 
     */
    public boolean IsExists(String path) throws IOException, InterruptedException, URISyntaxException {
        Path hdfsPath = buildHdfsPath(path);
        return fileSystem.exists(hdfsPath);
    }
    
    /**
     * 创建路径,如果该路径存在返回成功,否则创建新目录
     * @param path 路径
     * @return 成功或失败
     * @throws IOException
     * @throws URISyntaxException 
     * @throws InterruptedException 
     */
    public boolean createDirection(String path) throws IOException, InterruptedException, URISyntaxException {
        if (IsExists(path)) {
            return true;
        }
        Path hdfsPath = buildHdfsPath(path);
        return fileSystem.mkdirs(hdfsPath);
    }
    
    /**
     * 创建文件返回流对象,如果该文件已存在就已追加形式打开并返回流对象
     * @param path 文件路径
     * @return 成功或失败
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    public FSDataOutputStream createFile(String path) throws IOException, InterruptedException, URISyntaxException {
        if (IsExists(path)) {
            Path hdfsPath = buildHdfsPath(path);
            return fileSystem.append(hdfsPath);
        }
        Path hdfsPath = buildHdfsPath(path);
        return fileSystem.create(hdfsPath);
    }
    
    /**
     * 创建新文件,没有写文件操作。
     * @param path 文件路径
     * @return 成功或失败
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    public boolean createNewFile(String path) throws IOException, InterruptedException, URISyntaxException {
        if (IsExists(path)) {
            return true;
        }
        Path hdfsPath = buildHdfsPath(path);
        return fileSystem.createNewFile(hdfsPath);
    }
    
    /**
     * 显示制定路径下的文件与目录名称
     * @param path 制定路径
     * @return 制定目录内的文件与文件夹名称链表
     * @throws FileNotFoundException
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    public RemoteIterator<LocatedFileStatus> showFiles(String path) throws FileNotFoundException, IOException  {
        Path hdfsPath = buildHdfsPath(path);
        RemoteIterator<LocatedFileStatus> fsIterator = fileSystem.listFiles(hdfsPath, true);
        return fsIterator;
    }
    
    /**
     * 重命名文件
     * @param srcName 源文件名称 /test/t1
     * @param dstName 目标文件名称 /test/t2
     * @return 成功或失败
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    public boolean rename(String fileName, String newFileName) throws IOException {
        Path hdfsPath = buildHdfsPath(fileName);
        Path newHdfsPath = buildHdfsPath(newFileName);
        return fileSystem.rename(hdfsPath, newHdfsPath);
    }
    
    /**
     * 删除文件
     * @param path 文件路径
     * @return 成功或失败
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    public boolean delete(String path) throws IOException {
        Path hdfsPath = buildHdfsPath(path);
        return fileSystem.delete(hdfsPath, true);
    }
    
    /**
     * 打开文件,获取流对象后自行转换要使用的格式例如byte[]或者String,使用后需要关闭流对象
     * @param path
     * @return 流对象
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    public FSDataInputStream open(String path) throws IOException {
        Path hdfsPath = buildHdfsPath(path);
        return fileSystem.open(hdfsPath);
    }
    
    /**
     * 获取文件或文件夹路径在集群中的位置
     * @param path 文件或文件夹路径
     * @return 节点对象
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    public BlockLocation[] getFileBlockLocation(String path) throws IOException, InterruptedException, URISyntaxException {
        if (!IsExists(path)) {
            return null;
        }
        Path hdfsPath = buildHdfsPath(path);
        FileStatus fileStatus = fileSystem.getFileStatus(hdfsPath);
        return fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
    }
    
    /**
     * 关闭文件系统
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    public void close() throws IOException {
        fileSystem.close();
    }
    
    /**
     * 上传文件到HDFS
     * @param localFile 本地文件
     * @param hdfsDirection HDFS目录
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    public void uploadFileToHdfs(String localFile, String hdfsDirection) throws IOException {
        Path localPath = new Path(localFile);
        Path hdfsPath = buildHdfsPath(hdfsDirection);
        fileSystem.copyFromLocalFile(localPath, hdfsPath);
    }
    
    /**
     * 拷贝文件输出简单的进度条,如果需要百分比,请自己实现
     * @param localFile 本地文件
     * @param hdfsDirection HDFS目录
     * @throws IOException
     */
    public void uploadFileToHdfsWithProgress(String localFile, String hdfsDirection) throws IOException {
        Path localPath = new Path(localFile); 
        Path hdfsPath = buildHdfsPath(hdfsDirection);
        FileInputStream inputStream = new FileInputStream(localPath.toString());
        InputStream input = new BufferedInputStream(inputStream);
        FSDataOutputStream outputStream = fileSystem.create(hdfsPath, new Progressable() {
            public void progress() {
                // 进度条的输出
                System.out.print(".");
            }
        });
        IOUtils.copyBytes(input, outputStream, 4096);
        input.close();
        outputStream.close();
    }
    
    /**
     * 从HDFS目录下载文件
     * @param localFile 本地文件位置
     * @param hdfsPath HDFS文件位置
     * @throws IOException
     * @throws InterruptedException
     * @throws URISyntaxException
     */
    public void downloadFileFormHdfs(String localFile, String hdfsFile) throws IOException {
        Path localPath = new Path(localFile);
        Path hdfsPath = buildHdfsPath(hdfsFile);
        fileSystem.copyToLocalFile(hdfsPath, localPath);
    }
}

以下代码段是测试代码:

package linose.hdfs;

import java.io.IOException;
import java.net.URISyntaxException;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
//import org.apache.log4j.BasicConfigurator;

/**
 * Hello HDFS!
 * 本示例演示如何在分布式文件系统中进行:
 * 创建目录、创建文件、写入数据、打开文件、读取数据、追加数据、重命名文件、删除文件、重命名目录、删除目录
 * 查询指定路径中的文件与目录信息,查询指定文件在那个集群节点
 * 文件上传、下载、进度展示功能。
 */
public class App 
{
    public static void main( String[] args ) throws IOException, InterruptedException, URISyntaxException 
    {
        /**
         * 为了清楚的看到输出结果,暂将集群调试信息缺省。
         * 如果想查阅集群调试信息,取消注释即可。
         */
        //BasicConfigurator.configure();
        
        /**
         * 操作HDFS基础信息赋值
         */
        String user = "hdfs";
        String defaultFS = "hdfs://master2.linose.cloud.beijing.com:8020";
        
        /**
         * 获取HDFS服务对象
         */
        HdfsService service = new HdfsService(user, defaultFS);
        
        /**
         * 创建目录示例
         */
        String path = "index.dirs";
        if (service.createDirection(path)) {
            System.out.println("创建目录成功");
        } else {
            System.out.println("创建目录失败");
        }
        
        /**
         * 创建文件示例1,创建文件后返回流对象并写入新信息。
         */
        String file1 = "index.dirs/test1";
        FSDataOutputStream outputStream = service.createFile(file1);
        for (int i = 0; i < 5; ++i) {
            outputStream.writeUTF("index: "+ i + " hello HDFS! \n");
        }
        outputStream.close();
        System.out.println("创建文件,写入Hello HDFS后,关闭流对象");
        
        /**
         * 创建文件示例2,创建新文件。
         */
        String file2 = "index.dirs/test8";
        if (service.createNewFile(file2)) {
            System.out.println("创建新文件成功");
        } else {
            System.out.println("创建新文件失败");
        }
        
        /**
         * 追加数据
         */
        outputStream = service.createFile(file1);
        outputStream.writeUTF("这是一条追加数据 \n");
        outputStream.close();
        
        /**
         * 打开文件读取数据后关闭流对象
         */
        FSDataInputStream inputStream = service.open(file1);
        IOUtils.copyBytes(inputStream, System.out, 256);
        inputStream.close();
 
        /**
         * 重命名文件
         */
        String newFile = "index.dirs/test6.txt";
        String srcFile = "index.dirs/test6";
        if (service.rename(srcFile, newFile)) {
            System.out.println("重命名成功");
        } else {
            System.out.println("重命名失败");
        }
        
        /**
         * 创建目录、重命名目录
         */
        String path1 = "test.test";
        if (service.createDirection(path1)) {
            System.out.println("创建目录成功");
        } else {
            System.out.println("创建目录失败");
        }
        
        String path2 = "test.dirs";
        if (service.rename(path1, path2)) {
            System.out.println("重命名目录成功");
        } else {
            System.out.println("重命名目录失败");
        }
        
        /**
         * 删除目录
         */
        if (service.delete(path2)) {
            System.out.println("删除目录成功");
        } else {
            System.out.println("删除目录失败");
        }
        
        /**
         * 上传文件
         */
        service.uploadFileToHdfs("/Users/liupengchun/Downloads/hadoop-logo.jpg", "/index.dirs");
        
        /**
         * 查询指定路径中的文件与目录信息。
         */
        RemoteIterator<LocatedFileStatus> fsIterator = service.showFiles("/index.dirs");
        LocatedFileStatus lfs;
        while (fsIterator.hasNext()) {
            lfs = fsIterator.next();
            System.out.println(
                    (lfs.isDirectory() ? "文件夹" : "文件") 
                    + "文件大小:" + lfs.getLen()
                    + "文件路径:" + lfs.getPath()
                    );
        }
        
        /**
         * 查询指定文件在那个集群节点
         */
        BlockLocation[] locations = service.getFileBlockLocation(file1);
        if(locations != null && locations.length > 0){
            for(BlockLocation location : locations){
                System.out.println(location.getHosts()[0]);
            }
        }
        
        /**
         * 删除文件
         */
        if (service.delete(file1)) {
            System.out.println("删除文件成功");
        } else {
            System.out.println("删除文件失败");
        }
        
        /**
         * 下载文件
         */
        service.downloadFileFormHdfs("/Users/liupengchun/Downloads/hadoop-logo2.jpg", "/index.dirs/hadoop-logo.jpg");
    }
}

下图为测试结果:


image.png

至此,HDFS-3.1.1 实验示例演示完毕。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,287评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,346评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,277评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,132评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,147评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,106评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,019评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,862评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,301评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,521评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,682评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,405评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,996评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,651评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,803评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,674评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,563评论 2 352