springboot通过Java Api操作Docker中的Hadoop(填坑)

Hadoop是一个分布式的文件系统(HDFS),由很多服务器联合起来实现其功能,集群中的服务器有各自的角色,用于存储文件通过目录树来定位文件。
HDFS集群包括,NameNode、DataNode、Secondary Namenode:
(1)NameNode:负责管理整个文件系统的元数据,以及每一个路径(文件)所对应的数据块信息。
(2)DataNode:负责管理用户的文件数据块,每一个数据块都可以在多个datanode上存储多个副本。
(3)Secondary NameNode用来监控HDFS状态的辅助后台程序,每隔一段时间获取HDFS元数据的快照。

HDFS可以通过Java Api来实现对HDFS内的文件进行读写操作。

1、Hadoop安装
由于本篇重点讲HDFS的开发,Hadoop的安装配置就不重点讲,我们通过最快速的方式来实现Hadoop安装,即通过下载别人已经配置好的Docker镜像进行Hadoop安装。
我的镜像:registry.cn-hangzhou.aliyuncs.com/xvjialing/hadoop
这个镜像是已经完全配置好的Hadoop,通过伪分布式方式(管理节点和数据节点在一台机)部署,直接运行即可使用。
(1)创建容器:

[root@iZbp13sno1lc2yxlhjc4b3Z ~]# docker run --name hadoop -d -p 8091:50070 -p 8092:9000 registry.cn-hangzhou.aliyuncs.com/xvjialing/hadoop

说明:
50070端口:提供HDFS的管理控制台,可在浏览器里面查看HDFS各节点信息和文件目录
9000端口:提供访问HDFS文件系统地址,通过该端口对文件进行读写操作

该镜像hadoop安装目录:/usr/local/hadoop

(2)配置文件:
2个重要的核心配置文件,hdfs-site.xml、core-site.xml。配置文件目录:/usr/local/hadoop/etc/hadoop
hdfs-site.xml,配置hdfs各节点目录地址:

<?xml version="1.0"?>
<configuration>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:///root/hdfs/namenode</value>
        <description>NameNode directory for namespace and transaction logs storage.</description>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:///root/hdfs/datanode</value>
        <description>DataNode directory</description>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

core-site.xml,配置hdfs的访问地址端口:

<?xml version="1.0"?>
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000/</value>
    </property>
</configuration>

(3)访问控制台
浏览器输入http://192.168.2.104:8091


image.png

可查看hdfs的各节点健康信息以及文件目录信息。

2、hadoop项目搭建
使用springboot快速搭建项目结构,在项目里面引入hadoop相关的依赖包即可。
(1)pom.xml

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <!--hadoop依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.1</version>
        </dependency>
    </dependencies>

(2)application.yml
主要配置hadoop的访问地址,以及访问用户

#HDFS配置
hdfs:
  path: hdfs://192.168.2.104:8092
  user: root

注意:
path:我这里是docker主机的地址和端口,对应映射是docker里面hdfs配置的地址端口,即:hdfs://localhost:9000
user:这里是个坑点,这个用户必须是运行hadoop的系统用户,否则会报没有权限操作的异常,由于docker是root用户,所以填root。你可以试下随便填,看报什么错

(3)主要的JAVA类
操作HDFS涉及到的主要类就两个,比较简单:
Configuration:封装客户端和服务端的配置,就是访问hdfs的地址path和用户user。
FileSystem:封装操作文件系统对象,提供了很多方法来对hdfs进行操作,就是对目录/文件进行增/删/改/查/上传/下载/遍历等。

3、通过API操作hdfs
(1)获取FileSystem对象

public class HdfsService {

    @Value("${hdfs.path}")
    private String hdfsPath;

    @Value("${hdfs.user}")
    private String user;

    /**
     * 获取hdfs配置信息
     * @return
     */
    private Configuration getConfiguration(){
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", hdfsPath);
        return configuration;
    }

    /**
     * 获取文件系统对象
     * @return
     */
    public FileSystem getFileSystem() throws Exception {
        FileSystem fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), user);
        return fileSystem;
    }
...
}

(2)通过FileSystem操作hdfs
在Service层添加如下API操作代码:


    /**
     * 创建HDFS文件夹
     * @param dir
     * @return
     * @throws Exception
     */
    public boolean mkdir(String dir) throws Exception{
        if(StringUtils.isBlank(dir)){
            return false;
        }
        if(exist(dir)){
            return true;
        }
        FileSystem fileSystem = getFileSystem();
        boolean isOk = fileSystem.mkdirs(new Path(dir));
        fileSystem.close();
        return isOk;
    }

    /**
     * 判断HDFS的文件是否存在
     * @param path
     * @return
     * @throws Exception
     */
    public boolean exist(String path) throws Exception {
        if(StringUtils.isBlank(path)){
            return false;
        }
        FileSystem fileSystem = getFileSystem();
        return fileSystem.exists(new Path(path));
    }

    /**
     * 读取路径下的文件信息
     * @param path
     * @return
     * @throws Exception
     */
    public List<Map<String,Object>> readPathInfo(String path) throws Exception {
        if(!exist(path)){
            return null;
        }
        FileSystem fs = getFileSystem();
        FileStatus[] statuses = fs.listStatus(new Path(path));
        if(statuses == null || statuses.length < 1){
            return null;
        }
        List<Map<String,Object>> list = new ArrayList<>();
        for(FileStatus fileStatus : statuses){
            Map<String,Object> map = new HashMap<>();
            map.put("filePath", fileStatus.getPath());
            map.put("fileStatus", fileStatus.toString());

            list.add(map);
        }
        return  list;
    }

    /**
     * HDFS创建文件
     * @param path
     * @param file
     * @throws Exception
     */
    public void createFile(String path, MultipartFile file) throws Exception {
        if(StringUtils.isBlank(path) || null == file){
            return;
        }
        FileSystem fs = getFileSystem();
        String fileName = file.getOriginalFilename();//文件名
        Path filePath = new Path(path + "/" + fileName);
        FSDataOutputStream outputStream = fs.create(filePath);
        outputStream.write(file.getBytes());
        outputStream.close();
        fs.close();
    }

    /**
     * 读取HDFS文件内容
     * @param path
     * @return
     * @throws Exception
     */
    public String readFileToString(String path) throws Exception{
        if(!exist(path)){
            return null;
        }
        FileSystem fs = getFileSystem();
        FSDataInputStream inputStream = null;
        try {
            inputStream = fs.open(new Path(path));
            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
            StringBuffer sb = new StringBuffer();
            String line = "";
            while ((line = reader.readLine()) != null){
                sb.append(line);
            }
            return sb.toString();
        }finally {
            if(inputStream != null){
                inputStream.close();
            }
            fs.close();
        }
    }

    /**
     * 获取目录下的文件列表
     * @param path
     * @return
     * @throws Exception
     */
    public List<Map<String,Object>> listFiles(String path) throws Exception {
        if(!exist(path)){
            return null;
        }
        FileSystem fs = getFileSystem();
        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(path), true);
        List<Map<String,Object>> list = new ArrayList<>();
        while (iterator.hasNext()){
            LocatedFileStatus fileStatus = iterator.next();
            Map<String,Object> map = new HashMap<>();
            map.put("filePath", fileStatus.getPath().toString());
            map.put("fileName", fileStatus.getPath().getName());

            list.add(map);
        }
        return  list;
    }

    /**
     * 重命名HDFS文件
     * @param oldName
     * @param newName
     * @return
     * @throws Exception
     */
    public boolean renameFile(String oldName, String newName)throws Exception{
        if(!exist(oldName) || StringUtils.isBlank(newName)){
            return false;
        }
        FileSystem fs = getFileSystem();
        boolean isOk = fs.rename(new Path(oldName), new Path(newName));
        fs.close();
        return isOk;
    }

    /**
     * 删除HDFS文件
     * @param path
     * @return
     * @throws Exception
     */
    public boolean deleteFile(String path)throws Exception {
        if(!exist(path)){
            return false;
        }
        FileSystem fs = getFileSystem();
        boolean isOk = fs.deleteOnExit(new Path(path));
        fs.close();
        return isOk;
    }

    /**
     * 上传文件到HDFS
     * @param path
     * @param uploadPath
     * @throws Exception
     */
    public void uploadFile(String path,String uploadPath) throws Exception{
        if(StringUtils.isBlank(path) || StringUtils.isBlank(uploadPath)){
            return;
        }
        FileSystem fs = getFileSystem();
        fs.copyFromLocalFile(new Path(path), new Path(uploadPath));
        fs.close();
    }

    /**
     * 从HDFS下载文件
     * @param path
     * @param downloadPath
     * @throws Exception
     */
    public void downloadFile(String path, String downloadPath) throws Exception{
        if(StringUtils.isBlank(path) || StringUtils.isBlank(downloadPath)){
            return;
        }
        FileSystem fs = getFileSystem();
        fs.copyToLocalFile(new Path(path), new Path(downloadPath) );
        fs.close();
    }

    /**
     * 拷贝HDFS文件
     * @param sourcePath
     * @param targetPath
     * @throws Exception
     */
    public void copyFile(String sourcePath, String targetPath) throws Exception{
        if(StringUtils.isBlank(sourcePath) || StringUtils.isBlank(targetPath)){
            return;
        }
        FileSystem fs = getFileSystem();
        FSDataInputStream inputStream = null;
        FSDataOutputStream outputStream = null;
        try{
            inputStream = fs.open(new Path(sourcePath));
            outputStream = fs.create(new Path(targetPath));
            //todo IOUtils.copyBytes(inputStream, outputStream, , false);
        }finally {
            if(inputStream != null){
                inputStream.close();
            }
            if(outputStream != null){
                outputStream.close();
            }
            fs.close();
        }
    }

    /**
     * 读取HDFS文件并返回byte[]
     * @param path
     * @return
     * @throws Exception
     */
    public byte[] readFileToBytes(String path) throws Exception{
        if(!exist(path)){
            return null;
        }
        FileSystem fs = getFileSystem();
        FSDataInputStream inputStream = null;
        try {
            inputStream = fs.open(new Path(path));
            return IOUtils.readFullyToByteArray(inputStream);
        }finally {
            if(inputStream != null){
                inputStream.close();
            }
            fs.close();
        }
    }

    /**
     * 获取文件块在集群的位置
     * @param path
     * @return
     * @throws Exception
     */
    public BlockLocation[] getFileBlockLocations(String path)throws Exception{
        if(exist(path)){
            return null;
        }
        FileSystem fs = getFileSystem();
        FileStatus fileStatus = fs.getFileStatus(new Path(path));
        return fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
    }

(3)编写Controller提供接口


@RestController
@RequestMapping("/api/hdfs")
public class HdfsController {

    @Autowired
    private HdfsService service;

    @GetMapping("/mkdir")
    public Object mkdir(String path){
        try {
            service.mkdir(path);
            return RtnData.ok();
        } catch (Exception e) {
            return RtnData.fail(e);
        }
    }

    @PostMapping("/createFile")
    public Object createFile(String path, MultipartFile file){
        try {
            service.createFile(path, file);
            return RtnData.ok();
        } catch (Exception e) {
            return RtnData.fail(e);
        }
    }

    @GetMapping("/readFileToString")
    public Object readFileToString(String path){
        try {
            return RtnData.ok(service.readFileToString(path));
        } catch (Exception e) {
            return RtnData.fail(e);
        }
    }
...
}

4、来个测试
我们通过调用/api/hdfs/mkdir接口来创建目录,然后在控制台看下是文件目录是否存在。
通过postman调用接口,创建/test目录:

image.png

登录控制台,菜单Utilities/Browse the file siystem,t可以看到已经成功添加:
image.png

5、填下坑,一定要看
下面的几个坑点我搞了好几天才解决。
(1)可能你会遇到Connect Refused之类的错误。
修改core-site.xml,将hdfs://localhost:9000/中的localhost修改为容器地址
(2)可能你会遇到Permission denied之类的错误
客户端调用的时候,user不是运行hdfs的用户/用户组,没有权限操作。
(3)可能你会遇到HADOOP_HOME and hadoop.home.dir are unset之类的报错
客户端没有安装设置hadoop的环境变量:HADOOP_HOME。可以不予理会,不影响操作。
(4)可能你会遇到Could not locate Hadoop executable: xxx\bin\winutils.exe之类的报错
客户端hadoop的bin目录没有winutils.exe文件。可以不予理会,不影响操作。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,718评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,683评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,207评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,755评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,862评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,050评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,136评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,882评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,330评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,651评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,789评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,477评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,135评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,864评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,099评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,598评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,697评论 2 351