大家好,我是Iggi。
今天我给大家分享的是HDFS-3.1.1版本的实验示例。
首先用一段文字简介HDFS:
HDFS是英文Hadoop Distributed File System的缩写。作为Hadoop的核心组件之一,它的设计思路参考于Google的GFS论文,是GFS的开源实现。由于HDFS自身的成熟稳定,加之拥有众多用户,现在已经成为当前分布式存储的事实标准。HDFS为Hadoop生态圈中的其它组件提供最基本的存储功能。它具有高容错性、高可靠性、高扩展性、高获得性、高吞吐率等特征为大数据存储和处理提供了强大的底层存储架构,可以说它是一切大数据平台的基础。HDFS采用Master/Slave架构,主服务器运行Master进程NameNode,从服务器Slave进程DataNode。它将集群中所有服务器的存储空间连接到一起,构成了一个统一的、海量的存储空间。下图是HDFS的架构图。
如果大家还想了解更多的有关HDFS的知识请访问:http://hadoop.apache.org/docs/r3.1.1/hadoop-project-dist/hadoop-hdfs/HdfsUserGuide.html
好,下面进入正题。介绍Java操作HDFS组件完成对文件系统的操作。
第一步:使用IDE建立Maven工程,建立工程时没有特殊说明,按照向导提示点击完成即可。重要的是在pom.xml文件中添加依赖包,内容如下图。
等待系统下载好依赖的jar包后便可以编写程序了。
以下代码段是操作HDFS的测试类:
package linose.hdfs;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
/**
* HDFS 服务类
* @author Iggi
*
*/
public class HdfsService {
protected String user;
protected String defaultFS;
protected Configuration conf;
protected FileSystem fileSystem;
/**
* 构造函数
* @param defaultFS hdfs://ip:port
* @throws IOException
*/
public HdfsService(String defaultFS) throws IOException {
this.defaultFS = defaultFS;
conf = new Configuration();
conf.set("fs.defaultFS", defaultFS);
fileSystem = FileSystem.get(conf);
}
/**
* 构造函数
* @param user hdfs用户名
* @param defaultFS hdfs://ip:port
* @throws URISyntaxException
* @throws InterruptedException
* @throws IOException
*/
public HdfsService(String user, String defaultFS) throws IOException, InterruptedException, URISyntaxException {
this.user = user;
this.defaultFS = defaultFS;
conf = new Configuration();
conf.set("fs.defaultFS", defaultFS);
fileSystem = FileSystem.get(new URI(defaultFS), conf, user);
}
/**
* 获取HDFS文件系统对象,拿到该对象后可以跳过封装,直接调用HDFS提供的方法。
* @return HDFS文件系统对象
*/
public FileSystem getFileSystem() {
return fileSystem;
}
/**
* 析构资源
*/
public void clear() {
fileSystem = null;
conf = null;
}
/**
* 将输入路径构建成hdfs路径
* @param path 输入路径
* @return hdfs路径
*/
protected Path buildHdfsPath(String path) {
String hdfsPath = defaultFS;
if (path.startsWith("/")) {
hdfsPath += path;
} else {
hdfsPath += ("/" + path);
}
return new Path(hdfsPath);
}
/**
* 输入路径是否存在
* @param path 输入路径
* @return 是否存在
* @throws IOException
* @throws URISyntaxException
* @throws InterruptedException
*/
public boolean IsExists(String path) throws IOException, InterruptedException, URISyntaxException {
Path hdfsPath = buildHdfsPath(path);
return fileSystem.exists(hdfsPath);
}
/**
* 创建路径,如果该路径存在返回成功,否则创建新目录
* @param path 路径
* @return 成功或失败
* @throws IOException
* @throws URISyntaxException
* @throws InterruptedException
*/
public boolean createDirection(String path) throws IOException, InterruptedException, URISyntaxException {
if (IsExists(path)) {
return true;
}
Path hdfsPath = buildHdfsPath(path);
return fileSystem.mkdirs(hdfsPath);
}
/**
* 创建文件返回流对象,如果该文件已存在就已追加形式打开并返回流对象
* @param path 文件路径
* @return 成功或失败
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public FSDataOutputStream createFile(String path) throws IOException, InterruptedException, URISyntaxException {
if (IsExists(path)) {
Path hdfsPath = buildHdfsPath(path);
return fileSystem.append(hdfsPath);
}
Path hdfsPath = buildHdfsPath(path);
return fileSystem.create(hdfsPath);
}
/**
* 创建新文件,没有写文件操作。
* @param path 文件路径
* @return 成功或失败
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public boolean createNewFile(String path) throws IOException, InterruptedException, URISyntaxException {
if (IsExists(path)) {
return true;
}
Path hdfsPath = buildHdfsPath(path);
return fileSystem.createNewFile(hdfsPath);
}
/**
* 显示制定路径下的文件与目录名称
* @param path 制定路径
* @return 制定目录内的文件与文件夹名称链表
* @throws FileNotFoundException
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public RemoteIterator<LocatedFileStatus> showFiles(String path) throws FileNotFoundException, IOException {
Path hdfsPath = buildHdfsPath(path);
RemoteIterator<LocatedFileStatus> fsIterator = fileSystem.listFiles(hdfsPath, true);
return fsIterator;
}
/**
* 重命名文件
* @param srcName 源文件名称 /test/t1
* @param dstName 目标文件名称 /test/t2
* @return 成功或失败
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public boolean rename(String fileName, String newFileName) throws IOException {
Path hdfsPath = buildHdfsPath(fileName);
Path newHdfsPath = buildHdfsPath(newFileName);
return fileSystem.rename(hdfsPath, newHdfsPath);
}
/**
* 删除文件
* @param path 文件路径
* @return 成功或失败
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public boolean delete(String path) throws IOException {
Path hdfsPath = buildHdfsPath(path);
return fileSystem.delete(hdfsPath, true);
}
/**
* 打开文件,获取流对象后自行转换要使用的格式例如byte[]或者String,使用后需要关闭流对象
* @param path
* @return 流对象
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public FSDataInputStream open(String path) throws IOException {
Path hdfsPath = buildHdfsPath(path);
return fileSystem.open(hdfsPath);
}
/**
* 获取文件或文件夹路径在集群中的位置
* @param path 文件或文件夹路径
* @return 节点对象
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public BlockLocation[] getFileBlockLocation(String path) throws IOException, InterruptedException, URISyntaxException {
if (!IsExists(path)) {
return null;
}
Path hdfsPath = buildHdfsPath(path);
FileStatus fileStatus = fileSystem.getFileStatus(hdfsPath);
return fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
}
/**
* 关闭文件系统
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public void close() throws IOException {
fileSystem.close();
}
/**
* 上传文件到HDFS
* @param localFile 本地文件
* @param hdfsDirection HDFS目录
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public void uploadFileToHdfs(String localFile, String hdfsDirection) throws IOException {
Path localPath = new Path(localFile);
Path hdfsPath = buildHdfsPath(hdfsDirection);
fileSystem.copyFromLocalFile(localPath, hdfsPath);
}
/**
* 拷贝文件输出简单的进度条,如果需要百分比,请自己实现
* @param localFile 本地文件
* @param hdfsDirection HDFS目录
* @throws IOException
*/
public void uploadFileToHdfsWithProgress(String localFile, String hdfsDirection) throws IOException {
Path localPath = new Path(localFile);
Path hdfsPath = buildHdfsPath(hdfsDirection);
FileInputStream inputStream = new FileInputStream(localPath.toString());
InputStream input = new BufferedInputStream(inputStream);
FSDataOutputStream outputStream = fileSystem.create(hdfsPath, new Progressable() {
public void progress() {
// 进度条的输出
System.out.print(".");
}
});
IOUtils.copyBytes(input, outputStream, 4096);
input.close();
outputStream.close();
}
/**
* 从HDFS目录下载文件
* @param localFile 本地文件位置
* @param hdfsPath HDFS文件位置
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public void downloadFileFormHdfs(String localFile, String hdfsFile) throws IOException {
Path localPath = new Path(localFile);
Path hdfsPath = buildHdfsPath(hdfsFile);
fileSystem.copyToLocalFile(hdfsPath, localPath);
}
}
以下代码段是测试代码:
package linose.hdfs;
import java.io.IOException;
import java.net.URISyntaxException;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
//import org.apache.log4j.BasicConfigurator;
/**
* Hello HDFS!
* 本示例演示如何在分布式文件系统中进行:
* 创建目录、创建文件、写入数据、打开文件、读取数据、追加数据、重命名文件、删除文件、重命名目录、删除目录
* 查询指定路径中的文件与目录信息,查询指定文件在那个集群节点
* 文件上传、下载、进度展示功能。
*/
public class App
{
public static void main( String[] args ) throws IOException, InterruptedException, URISyntaxException
{
/**
* 为了清楚的看到输出结果,暂将集群调试信息缺省。
* 如果想查阅集群调试信息,取消注释即可。
*/
//BasicConfigurator.configure();
/**
* 操作HDFS基础信息赋值
*/
String user = "hdfs";
String defaultFS = "hdfs://master2.linose.cloud.beijing.com:8020";
/**
* 获取HDFS服务对象
*/
HdfsService service = new HdfsService(user, defaultFS);
/**
* 创建目录示例
*/
String path = "index.dirs";
if (service.createDirection(path)) {
System.out.println("创建目录成功");
} else {
System.out.println("创建目录失败");
}
/**
* 创建文件示例1,创建文件后返回流对象并写入新信息。
*/
String file1 = "index.dirs/test1";
FSDataOutputStream outputStream = service.createFile(file1);
for (int i = 0; i < 5; ++i) {
outputStream.writeUTF("index: "+ i + " hello HDFS! \n");
}
outputStream.close();
System.out.println("创建文件,写入Hello HDFS后,关闭流对象");
/**
* 创建文件示例2,创建新文件。
*/
String file2 = "index.dirs/test8";
if (service.createNewFile(file2)) {
System.out.println("创建新文件成功");
} else {
System.out.println("创建新文件失败");
}
/**
* 追加数据
*/
outputStream = service.createFile(file1);
outputStream.writeUTF("这是一条追加数据 \n");
outputStream.close();
/**
* 打开文件读取数据后关闭流对象
*/
FSDataInputStream inputStream = service.open(file1);
IOUtils.copyBytes(inputStream, System.out, 256);
inputStream.close();
/**
* 重命名文件
*/
String newFile = "index.dirs/test6.txt";
String srcFile = "index.dirs/test6";
if (service.rename(srcFile, newFile)) {
System.out.println("重命名成功");
} else {
System.out.println("重命名失败");
}
/**
* 创建目录、重命名目录
*/
String path1 = "test.test";
if (service.createDirection(path1)) {
System.out.println("创建目录成功");
} else {
System.out.println("创建目录失败");
}
String path2 = "test.dirs";
if (service.rename(path1, path2)) {
System.out.println("重命名目录成功");
} else {
System.out.println("重命名目录失败");
}
/**
* 删除目录
*/
if (service.delete(path2)) {
System.out.println("删除目录成功");
} else {
System.out.println("删除目录失败");
}
/**
* 上传文件
*/
service.uploadFileToHdfs("/Users/liupengchun/Downloads/hadoop-logo.jpg", "/index.dirs");
/**
* 查询指定路径中的文件与目录信息。
*/
RemoteIterator<LocatedFileStatus> fsIterator = service.showFiles("/index.dirs");
LocatedFileStatus lfs;
while (fsIterator.hasNext()) {
lfs = fsIterator.next();
System.out.println(
(lfs.isDirectory() ? "文件夹" : "文件")
+ "文件大小:" + lfs.getLen()
+ "文件路径:" + lfs.getPath()
);
}
/**
* 查询指定文件在那个集群节点
*/
BlockLocation[] locations = service.getFileBlockLocation(file1);
if(locations != null && locations.length > 0){
for(BlockLocation location : locations){
System.out.println(location.getHosts()[0]);
}
}
/**
* 删除文件
*/
if (service.delete(file1)) {
System.out.println("删除文件成功");
} else {
System.out.println("删除文件失败");
}
/**
* 下载文件
*/
service.downloadFileFormHdfs("/Users/liupengchun/Downloads/hadoop-logo2.jpg", "/index.dirs/hadoop-logo.jpg");
}
}
下图为测试结果:
至此,HDFS-3.1.1 实验示例演示完毕。