import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
import java.io.*;
/**
* 文件系统工具类
*/
public class FSUtil {
//获取FileSystem对象
public static FileSystem getFS() {
FileSystem fs = null;
try {
Configuration conf = new Configuration();
conf.setInt("io.file.buffer.size", 8192);
fs = FileSystem.newInstance(conf);
} catch (Exception e) {
//do noting
}
return fs;
}
//获取LocalFileSystem对象
public static LocalFileSystem getLFS() {
LocalFileSystem lfs = null;
try {
Configuration conf = new Configuration();
lfs = FileSystem.newInstanceLocal(conf);
} catch (Exception e) {
//do noting
}
return lfs;
}
//关闭文件对象
public static void closeFS(FileSystem fs) {
if (fs != null) {
try {
fs.close();
} catch (Exception e) {
// do nothing
}
}
}
/**
* 读取hdfs的文件写到console
*
* @throws IOException
*/
public static void downToConsole(FileSystem fs, String toFilePath) throws IOException {
FSDataInputStream fis = fs.open(new Path(toFilePath));
IOUtils.copyBytes(fis, System.out, 4096, true);
}
/**
* 读取hdfs的文件写到local
*/
public static void downToLocal(FileSystem fs, String fromHdfsPath, String toLocalPath) throws IOException {
FSDataInputStream fis = fs.open(new Path(fromHdfsPath));
OutputStream os = new FileOutputStream(new File(toLocalPath));
IOUtils.copyBytes(fis, os, 4096, true);
}
/**
* 创建目录
*/
public static Boolean mkdir(FileSystem fs, String dirName) throws IOException {
boolean isok = fs.mkdirs(new Path(dirName));
return isok;
}
/**
* 列出 hdfs或者local 中的文件列表
* 递归列出文件文件列表???
*/
public static void listFile(FileSystem fs, String dirName) throws IllegalArgumentException, IOException {
FileStatus[] fss = fs.listStatus(new Path(dirName));
if (fss.length == 0) {
System.out.println("内容为空");
return;
}
for (FileStatus f : fss) {
System.out.print("文件路径:" + f.getPath().toString() + " ");
System.out.print("文件名:" + f.getPath().getName() + " ");
System.out.print("文件大小:" + f.getLen() / 1024.0 + "kb" + " ");
System.out.print("文件用户:" + f.getOwner() + " ");
System.out.println("文件的权限:" + f.getPermission());
}
}
/**
* 获取集群磁盘情况
*
* @throws IOException
*/
public static void getDiskSource(FileSystem fs) throws IOException {
FsStatus fsstatus = fs.getStatus();
System.out.println("总容量:" + fsstatus.getCapacity() / 1024 / 1024 / 1024.0 + "GB");
System.out.println("已经使用的容量:" + fsstatus.getUsed() / 1024 / 1024.0 + "MB");
System.out.println("维持容量:" + fsstatus.getRemaining() / 1024 / 1024 / 1024.0 + "GB");
}
/**
* 获取DataNode信息
*/
public static void getDataNodeInfo(FileSystem fs) throws IOException {
//强转为分布式文件系统
DistributedFileSystem dis = (DistributedFileSystem) fs;
DatanodeInfo[] dinfo = dis.getDataNodeStats();
for (DatanodeInfo info : dinfo) {
System.out.print(info.getHostName() + " ");
System.out.print(info.getName() + " ");
System.out.println(info.getCapacity());
}
}
/**
* 查看块位置信息
*
* @param fileName
*/
public static void getBlockLocation(FileSystem fs, String fileName) throws IllegalArgumentException, IOException {
//获取文件状态
FileStatus fss = fs.getFileStatus(new Path(fileName));
//获取块位置信息
BlockLocation[] bls = fs.getFileBlockLocations(fss, 0, fss.getLen());
for (BlockLocation bl : bls) {
for (int i = 0; i < bl.getHosts().length; i++) {
System.out.print(bl.getHosts()[i] + " ");
System.out.print(bl.getTopologyPaths()[i] + " ");
System.out.println(bl.getNames()[i]);
}
}
}
/**
* 待进度的文件上传
*
* @param toHdfsPath
*/
public static void uploadWithProcess(FileSystem fs, String fromLocalPath, String toHdfsPath) throws IllegalArgumentException, IOException {
FileInputStream fis = new FileInputStream(new File(fromLocalPath));
FSDataOutputStream fos = fs.create(new Path(toHdfsPath), new Progressable() {
public void progress() {
try {
System.out.print("* ");
Thread.sleep(20);
} catch (InterruptedException e) {
//
}
}
});
IOUtils.copyBytes(fis, fos, 4096, true);
System.out.println("finished...");
}
/**
* 将本地文件copy到hdfs中(上传目录和文件均可以)
*/
public static void copyFromLocal(FileSystem fs, String fromLocalPath, String toHdfsPath) throws IllegalArgumentException, IOException {
fs.copyFromLocalFile(new Path(fromLocalPath), new Path(toHdfsPath));
}
/**
* 将本地文件copy到hdfs中(上传不同目录下的多个文件)
*/
public static void copyFromLocalOfMulti(FileSystem fs, Path[] fromLocalPaths, String toHdfsPath) throws IllegalArgumentException, IOException {
fs.copyFromLocalFile(false, false, fromLocalPaths, new Path(toHdfsPath));
}
/**
* 下载到本地(下载多个不同目录的文件??)
*/
public static void copyToLocal(FileSystem fs, String fromHdfsPath, String toLocalPath) throws IllegalArgumentException, IOException {
fs.copyToLocalFile(new Path(fromHdfsPath), new Path(toLocalPath));
}
/**
* 存在、删除
*
* @param path
*/
public static void Delete(FileSystem fs, String path) throws IllegalArgumentException, IOException {
//判断文件是否存在
//如果存在,在看是目录还是是文件
Path fileOrDir = new Path(path);
if (fs.exists(fileOrDir)) {
if (fs.isDirectory(fileOrDir)) {
//递归删除
fs.delete(fileOrDir, true);
} else {
fs.deleteOnExit(fileOrDir);
}
} else {
System.out.println("文件或者目录不存在");
}
}
/**
* 和并本地小文件并上传hdfs
*
* @param fileName
*/
public static void uploadWithMerge(FileSystem fs, LocalFileSystem lfs, String fileName, String toHdfsPath) throws IOException {
FileStatus[] fss = lfs.listStatus(new Path(fileName));
FSDataOutputStream fos = fs.create(new Path(toHdfsPath));
FileInputStream fis = null;
for (FileStatus f : fss) {
//获取本地文件列表的路径
String localFilePath = f.getPath().toString().replace("file:/", "");
//获取本地文件的数据流
fis = new FileInputStream(new File(localFilePath));
byte[] bytes = new byte[1024];
int length = 0;
while ((length = fis.read(bytes)) != -1) {
fos.write(bytes, 0, length);
}
}
}
/**
* 重命名或者移动
*/
public static void rename(FileSystem fs, String oldName, String newName) throws IllegalArgumentException, IOException {
fs.rename(new Path(oldName), new Path(newName));
}
/**
* 追加
*/
public static void append(FileSystem fs, String inputFilePath, String outFilePath) throws IllegalArgumentException, IOException {
FSDataOutputStream fos = fs.append(new Path(outFilePath), 4096);
InputStream is = new FileInputStream(new File(inputFilePath));
IOUtils.copyBytes(is, fos, 4096, true);
}
}
FSUtil_文件系统工具类
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 1、日历和清单系统的工具推荐 (1)手机自带日历系统 苹果手机的日历系统可同步 (2)清单系统APP1:OmniF...
- -------------------------常用工具----------------------------...
- 译:cod9 注意:此图集包含直面战争的画面,请读者自行斟酌。 第一次世界大战特辑图集系列:(点击蓝色标题处可转到...