Hadoop ha环境下的java api操作

1:POM;引入jar包
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.1.2</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>3.1.2</version>
</dependency>

2:设置连接池
import com.kafka.KafkaBase.service.HdfsService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**

  • HDFS相关配置

  • @author zhenmin.gu

  • @since 1.0.0
    */
    @Configuration
    public class HdfsConfig {

    private String defaultHdfsUri = "hdfs://ns1";

    @Bean
    public HdfsService getHbaseService(){
    org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();

     return new HdfsService(conf,defaultHdfsUri);
    

    }
    }

3:读写操作API

import com.alibaba.fastjson.JSON;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.Charset;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**

  • HDFS相关的基本操作

  • @since 1.0.0
    */
    public class HdfsService {

    private Logger logger = LoggerFactory.getLogger(HdfsService.class);
    private Configuration conf = null;

    /**

    • 默认的HDFS路径
      */
      private String defaultHdfsUri;

    public HdfsService(Configuration conf,String defaultHdfsUri) {
    this.conf = conf;
    this.defaultHdfsUri = defaultHdfsUri;
    }

    /**

    • 获取HDFS文件系统
    • @return org.apache.hadoop.fs.FileSystem
      */
      private FileSystem getFileSystem() throws IOException {
      return FileSystem.get(conf);
      }

    /**

    • 创建HDFS目录

    • @since 1.0.0

    • @param path HDFS的相对目录路径,比如:/testDir

    • @return boolean 是否创建成功
      */
      public boolean mkdir(String path){
      //如果目录已经存在,则直接返回
      if(checkExists(path)){
      return true;
      }else{
      FileSystem fileSystem = null;

       try {
           fileSystem = getFileSystem();
      
           //最终的HDFS文件目录
           String hdfsPath = generateHdfsPath(path);
           //创建目录
           return fileSystem.mkdirs(new Path(hdfsPath));
       } catch (IOException e) {
           logger.error(MessageFormat.format("创建HDFS目录失败,path:{0}",path),e);
           return false;
       }finally {
           close(fileSystem);
       }
      

      }
      }

    /**

    • 上传文件至HDFS
    • @since 1.0.0
    • @param srcFile 本地文件路径,比如:D:/test.txt
    • @param dstPath HDFS的相对目录路径,比如:/testDir
      */
      public void uploadFileToHdfs(String srcFile, String dstPath){
      this.uploadFileToHdfs(false, true, srcFile, dstPath);
      }

    /**

    • 上传文件至HDFS

    • @since 1.0.0

    • @param delSrc 是否删除本地文件

    • @param overwrite 是否覆盖HDFS上面的文件

    • @param srcFile 本地文件路径,比如:D:/test.txt

    • @param dstPath HDFS的相对目录路径,比如:/testDir
      */
      public void uploadFileToHdfs(boolean delSrc, boolean overwrite, String srcFile, String dstPath){
      //源文件路径
      Path localSrcPath = new Path(srcFile);
      //目标文件路径
      Path hdfsDstPath = new Path(generateHdfsPath(dstPath));

      FileSystem fileSystem = null;
      try {
      fileSystem = getFileSystem();

       fileSystem.copyFromLocalFile(delSrc,overwrite,localSrcPath,hdfsDstPath);
      

      } catch (IOException e) {
      logger.error(MessageFormat.format("上传文件至HDFS失败,srcFile:{0},dstPath:{1}",srcFile,dstPath),e);
      }finally {
      close(fileSystem);
      }
      }

    /**

    • 判断文件或者目录是否在HDFS上面存在

    • @since 1.0.0

    • @param path HDFS的相对目录路径,比如:/testDir、/testDir/a.txt

    • @return boolean
      */
      public boolean checkExists(String path){
      FileSystem fileSystem = null;
      try {
      fileSystem = getFileSystem();

       //最终的HDFS文件目录
       String hdfsPath = generateHdfsPath(path);
      
       //创建目录
       return fileSystem.exists(new Path(hdfsPath));
      

      } catch (IOException e) {
      logger.error(MessageFormat.format("'判断文件或者目录是否在HDFS上面存在'失败,path:{0}",path),e);
      return false;
      }finally {
      close(fileSystem);
      }
      }

    /**

    • 获取HDFS上面的某个路径下面的所有文件或目录(不包含子目录)信息

    • @since 1.0.0

    • @param path HDFS的相对目录路径,比如:/testDir

    • @return java.util.List<java.util.Map<java.lang.String,java.lang.Object>>
      */
      public List<Map<String,Object>> listFiles(String path, PathFilter pathFilter){
      //返回数据
      List<Map<String,Object>> result = new ArrayList<>();

      //如果目录已经存在,则继续操作
      if(checkExists(path)){
      FileSystem fileSystem = null;

       try {
           fileSystem = getFileSystem();
      
           //最终的HDFS文件目录
           String hdfsPath = generateHdfsPath(path);
      
           FileStatus[] statuses;
           //根据Path过滤器查询
           if(pathFilter != null){
               statuses = fileSystem.listStatus(new Path(hdfsPath),pathFilter);
           }else{
               statuses = fileSystem.listStatus(new Path(hdfsPath));
           }
      
           if(statuses != null){
               for(FileStatus status : statuses){
                   //每个文件的属性
                   Map<String,Object> fileMap = new HashMap<>(2);
      
                   fileMap.put("path",status.getPath().toString());
                   fileMap.put("isDir",status.isDirectory());
      
                   result.add(fileMap);
               }
           }
       } catch (IOException e) {
           logger.error(MessageFormat.format("获取HDFS上面的某个路径下面的所有文件失败,path:{0}",path),e);
       }finally {
           close(fileSystem);
       }
      

      }

      return result;
      }

/**
 * 从HDFS下载文件至本地
 * 
 * @since 1.0.0
 * @param srcFile HDFS的相对目录路径,比如:/testDir/a.txt
 * @param dstFile 下载之后本地文件路径(如果本地文件目录不存在,则会自动创建),比如:D:/test.txt
 */
public void downloadFileFromHdfs(String srcFile, String dstFile){
    //HDFS文件路径
    Path hdfsSrcPath = new Path(generateHdfsPath(srcFile));
    //下载之后本地文件路径
    Path localDstPath = new Path(dstFile);

    FileSystem fileSystem = null;
    try {
        fileSystem = getFileSystem();

        fileSystem.copyToLocalFile(hdfsSrcPath,localDstPath);
    } catch (IOException e) {
        logger.error(MessageFormat.format("从HDFS下载文件至本地失败,srcFile:{0},dstFile:{1}",srcFile,dstFile),e);
    }finally {
        close(fileSystem);
    }
}

/**
 * 打开HDFS上面的文件并返回 InputStream
 * 
 * @since 1.0.0
 * @param path HDFS的相对目录路径,比如:/testDir/c.txt
 * @return FSDataInputStream
 */
public FSDataInputStream open(String path){
    //HDFS文件路径
    Path hdfsPath = new Path(generateHdfsPath(path));

    FileSystem fileSystem = null;
    try {
        fileSystem = getFileSystem();

        return fileSystem.open(hdfsPath);
    } catch (IOException e) {
        logger.error(MessageFormat.format("打开HDFS上面的文件失败,path:{0}",path),e);
    }

    return null;
}

/**
 * 打开HDFS上面的文件并返回byte数组,方便Web端下载文件
 * <p>new ResponseEntity<byte[]>(byte数组, headers, HttpStatus.CREATED);</p>
 * <p>或者:new ResponseEntity<byte[]>(FileUtils.readFileToByteArray(templateFile), headers, HttpStatus.CREATED);</p>
 * 
 * @since 1.0.0
 * @param path HDFS的相对目录路径,比如:/testDir/b.txt
 * @return FSDataInputStream
 */
public byte[] openWithBytes(String path){
    //HDFS文件路径
    Path hdfsPath = new Path(generateHdfsPath(path));

    FileSystem fileSystem = null;
    FSDataInputStream inputStream = null;
    try {
        fileSystem = getFileSystem();
        inputStream = fileSystem.open(hdfsPath);

        return IOUtils.toByteArray(inputStream);
    } catch (IOException e) {
        logger.error(MessageFormat.format("打开HDFS上面的文件失败,path:{0}",path),e);
    }finally {
        if(inputStream != null){
            try {
                inputStream.close();
            } catch (IOException e) {
                // ignore
            }
        }
    }

    return null;
}

/**
 * 打开HDFS上面的文件并返回String字符串
 * 
 * @since 1.0.0
 * @param path HDFS的相对目录路径,比如:/testDir/b.txt
 * @return FSDataInputStream
 */
public String openWithString(String path){
    //HDFS文件路径
    Path hdfsPath = new Path(generateHdfsPath(path));

    FileSystem fileSystem = null;
    FSDataInputStream inputStream = null;
    try {
        fileSystem = getFileSystem();
        inputStream = fileSystem.open(hdfsPath);

        return IOUtils.toString(inputStream, String.valueOf(Charset.forName("UTF-8")));
    } catch (IOException e) {
        logger.error(MessageFormat.format("打开HDFS上面的文件失败,path:{0}",path),e);
    }finally {
        if(inputStream != null){
            try {
                inputStream.close();
            } catch (IOException e) {
                // ignore
            }
        }
    }

    return null;
}

/**
 * 打开HDFS上面的文件并转换为Java对象(需要HDFS上门的文件内容为JSON字符串)
 * 
 * @since 1.0.0
 * @param path HDFS的相对目录路径,比如:/testDir/c.txt
 * @return FSDataInputStream
 */
public <T extends Object> T openWithObject(String path, Class<T> clazz){
    //1、获得文件的json字符串
    String jsonStr = this.openWithString(path);

    //2、使用com.alibaba.fastjson.JSON将json字符串转化为Java对象并返回
    return JSON.parseObject(jsonStr, clazz);
}

/**
 * 重命名
 * 
 * @since 1.0.0
 * @param srcFile 重命名之前的HDFS的相对目录路径,比如:/testDir/b.txt
 * @param dstFile 重命名之后的HDFS的相对目录路径,比如:/testDir/b_new.txt
 */
public boolean rename(String srcFile, String dstFile) {
    //HDFS文件路径
    Path srcFilePath = new Path(generateHdfsPath(srcFile));
    //下载之后本地文件路径
    Path dstFilePath = new Path(dstFile);

    FileSystem fileSystem = null;
    try {
        fileSystem = getFileSystem();

        return fileSystem.rename(srcFilePath,dstFilePath);
    } catch (IOException e) {
        logger.error(MessageFormat.format("重命名失败,srcFile:{0},dstFile:{1}",srcFile,dstFile),e);
    }finally {
        close(fileSystem);
    }

    return false;
}

/**
 * 删除HDFS文件或目录
 * 
 * @since 1.0.0
 * @param path HDFS的相对目录路径,比如:/testDir/c.txt
 * @return boolean
 */
public boolean delete(String path) {
    //HDFS文件路径
    Path hdfsPath = new Path(generateHdfsPath(path));

    FileSystem fileSystem = null;
    try {
        fileSystem = getFileSystem();

        return fileSystem.delete(hdfsPath,true);
    } catch (IOException e) {
        logger.error(MessageFormat.format("删除HDFS文件或目录失败,path:{0}",path),e);
    }finally {
        close(fileSystem);
    }

    return false;
}

/**
 * 获取某个文件在HDFS集群的位置
 * 
 * @since 1.0.0
 * @param path HDFS的相对目录路径,比如:/testDir/a.txt
 * @return org.apache.hadoop.fs.BlockLocation[]
 */
public BlockLocation[] getFileBlockLocations(String path) {
    //HDFS文件路径
    Path hdfsPath = new Path(generateHdfsPath(path));

    FileSystem fileSystem = null;
    try {
        fileSystem = getFileSystem();
        FileStatus fileStatus = fileSystem.getFileStatus(hdfsPath);

        return fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
    } catch (IOException e) {
        logger.error(MessageFormat.format("获取某个文件在HDFS集群的位置失败,path:{0}",path),e);
    }finally {
        close(fileSystem);
    }

    return null;
}


/**
 * 将相对路径转化为HDFS文件路径
 * 
 * @since 1.0.0
 * @param dstPath 相对路径,比如:/data
 * @return java.lang.String
 */
private String generateHdfsPath(String dstPath){
    String hdfsPath = defaultHdfsUri;
    if(dstPath.startsWith("/")){
        hdfsPath += dstPath;
    }else{
        hdfsPath = hdfsPath + "/" + dstPath;
    }

    return hdfsPath;
}

/**
 * close方法
 */
private void close(FileSystem fileSystem){
    if(fileSystem != null){
        try {
            fileSystem.close();
        } catch (IOException e) {
            logger.error(e.getMessage());
        }
    }
}

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容

  • 通过API操作HDFS 今天的主要内容 HDFS获取文件系统 HDFS文件上传 HDFS文件下载 HDFS目录创建...
    须臾之北阅读 2,703评论 0 3
  • 前言 Netflix电影推荐的百万美金比赛,把“推荐”变成了时下最热门的数据挖掘算法之一。也正是由于Netflix...
    Alukar阅读 1,512评论 0 11
  • kerberos 介绍 阅读本文之前建议先预读下面这篇博客kerberos认证原理---讲的非常细致,易懂 Ker...
    PunyGod阅读 20,019评论 7 29
  • 本文以Loadrunner的Java_Vuser脚本为例,来做一次HDFS的文件操作测试,由于LoadRunner...
    smooth00阅读 393评论 0 1
  • 独坐朝南路 孤身向北庭 秋风千滴露 冷雨四周星 谪别南过海 当窗思北冥 他乡人是客 羁旅叹伶仃
    思明帝阅读 202评论 0 1