一 简介
HDFS,它是一个文件系统,用于存储文件,通过目录树来定位文件:其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角色。
HDFS的设计适合一次写入,多次读出的场景,且不支持文件的修改。适合用来做数据分析,并不适合用来做网盘应用。
二 体系结构和基本概念
1 特点:
- 可拓展
- 容错:有备份
- 高可用:对于多个Master
- 高吞吐:多台机器并发读
2 设计分布式文件系统思路
1 系统架构
- 有多台Masters,其中一台为Active状态,其他Standby
- 多个Slaves同时向Active的Master发送Heartbeat
- 若Active的Master出现宕机,则转向其他Master
2 日志存取
- Client有200M的log文件需要存储,首先与Master进行RPC通通信
- Master将有剩余空间的Slave传给Client
-
Client将log文件传给该Slave
- 当另一台Clinet需要读取时,同理通过Master
但是以上存在问题,没有副本。当该Slave宕机,数据无法读取。
因此存数据时,需要同时复制给其他Slaves。(通过Buket复制,而不是全部写完再复制)
三 HDFS Shell
查看HDFS Shell命令
在hadoop文件夹下的bin中执行
./hdfs dfs
命令
[-appendToFile <localsrc> ... <dst>] #追加内容
[-cat [-ignoreCrc] <src> ...]#查看内容
[-checksum <src> ...]#查看
[-chgrp [-R] GROUP PATH...]#改变所有组
[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]#改变全新
[-chown [-R] [OWNER][:[GROUP]] PATH...]#改变所属用户
[-copyFromLocal [-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>]#从本地复制一份上传到HDFS
[-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]#吧HDFS考到本地
[-count [-q] [-h] [-v] [-t [<storage type>]] [-u] [-x] [-e] <path> ...]#统计数量
[-cp [-f] [-p | -p[topax]] [-d] <src> ... <dst>]#拷贝
[-createSnapshot <snapshotDir> [<snapshotName>]]#创建快照
[-deleteSnapshot <snapshotDir> <snapshotName>]#删除快照
[-df [-h] [<path> ...]]#查看磁盘使用情况
[-du [-s] [-h] [-v] [-x] <path> ...]
[-expunge]
[-find <path> ... <expression> ...]
[-get [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>] #下载
[-getfacl [-R] <path>]
[-getfattr [-R] {-n name | -d} [-e en] <path>]
[-getmerge [-nl] [-skip-empty-file] <src> <localdst>]
[-head <file>]
[-help [cmd ...]]
[-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [-e] [<path> ...]]
[-mkdir [-p] <path> ...]
[-moveFromLocal <localsrc> ... <dst>]
[-moveToLocal <src> <localdst>]
[-mv <src> ... <dst>]
[-put [-f] [-p] [-l] [-d] <localsrc> ... <dst>] #上传
[-renameSnapshot <snapshotDir> <oldName> <newName>]
[-rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...] #删除
[-rmdir [--ignore-fail-on-non-empty] <dir> ...] #删除文件夹
[-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
[-setfattr {-n name [-v value] | -x name} <path>]
[-setrep [-R] [-w] <rep> <path> ... ]#修改副本数
[-stat [format] <path> ...] #查看文件信息
[-tail [-f] <file>] #查看末尾
[-test -[defsz] <path>]
[-text [-ignoreCrc] <src> ...] #查看内容
[-touch [-a] [-m] [-t TIMESTAMP ] [-c] <path> ...]
[-touchz <path> ...]
[-truncate [-w] <length> <path> ...]
[-usage [cmd ...]]
将HDFS加入环境变量
为方便操作,可将hdfs添加到环境变量
- 进入hadoop目录
vi /etc/profile
- 添加hadoop目录
export HADDOP_HOME=/bigdata/hadoop-3.2.0
- 添加path
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin
查看文件 -ls
查看HDFS下的文件
hdfs dfs -ls /
循环查看目录
hdfs dfs -ls -R
自动读取core-site.xml的信息,得到namenode的位置,不需要指定
查看Linux下的文件
ls
将文件写入HDFS -put
将文件上传到根目录
hdfs dfs -put 文件名 hdfs://主机名:端口号/
删除文件
hdfs dfs -rm 文件名
文件通过切割上传
BlockSize: 128M
该文件174.76M被切成两块
在该目录下可看到文件大小(单位为字节)
/bigdata/tmp/dfs/data/current/BP-595572733-127.0.0.1-1563891152282/current/finalized/subdir0/subdir0
红框中为切块的大小,相加即为文件大小
从HDFS上下载文件 -get
hdfs dfs -get /文件名 /保存路径/保存文件名
下载多个文件并合并为一个文件
hdfs dfs -getmerge /a.txt /b.txt /home/c.txt
a.txt,b.txt为下载的文件,c.txt为存储的文件名
查看文件 -cat 或 -text
hdfs dfs -cat /文件名(HDFS上的文件)
创建目录 -mkdir
hdfs dfs -mkdir -p log/2019
从HDFS上删除文件 -rm
hdfs dfs -rm /文件名
修改权限 -chmod
hdfs dfs -chmod +w /文件名
计数 -count
hdfs dfs -count /
四 HDFS的Java编程API
将本地文件上传至HDFS
1 创建maven项目
参考 https://blog.csdn.net/qq_35437792/article/details/80631434
2
package myhadoop.cn;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Before;
import org.junit.Test;
import java.io.*;
public class HDFSAPI {
private FileSystem fileSystem = null;
@Before
public void init() throws Exception {
//与HDFS建立连接,知道NameNode地址即可
Configuration conf = new Configuration();
System.setProperty("HADOOP_USER_NAME","root");
conf.set("fs.defaultFS", "hdfs://myhadoop.cn:9000");
fileSystem = FileSystem.get(conf);
}
@Test
public void testUpload() throws Exception {
//打开本地文件系统的一个文件作为输入流
InputStream in = new FileInputStream("C://Users//33598//Desktop//salary.txt");
//使用hdfs的filesystem打开一个输入流
OutputStream out = fileSystem.create(new Path("/salary.txt"));
IOUtils.copyBytes(in, out, 1024, true);
}
}
踩雷1
参考 https://blog.csdn.net/fragrant_no1/article/details/85842038
踩雷2 could only be written to 0 of the 1 minReplication nodes.
(忘记截图)
踩雷3 Permission denied: user=hadoop, access=WRITE, inode="/":root:supergroup:drwxr-xr-x
增加环境变量
并且重启IDEA
成功!
创建目录
@Test
public void testMkdir() throws Exception {
fileSystem.mkdirs(new Path("/a/b"));
fileSystem.close();
}
删除文件
@Test
public void testDel() throws Exception {
boolean flag = fileSystem.delete(new Path("/a"),false);
System.out.println(flag);
fileSystem.close();
}
delete()函数的第二个参数false为不是递归删除,true为递归删除。
以上代码删除刚才创建的a文件夹。由于a文件夹下还存在b文件夹,当参数为false时报错:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.PathIsNotEmptyDirectoryException): `/a is non empty': Directory is not empty
改为true:
成功!
五 HDFS架构原理
概念
- 文件切块:打开一个文件进行写入(按字节),当到达切块大小后,将该文件关闭,重新打开另一个文件,继续写入。
- 副本的存放:当一台机器宕机后用户仍可从其他机器读取数据
- 元数据:上传的数据,datanode存放真正的数据,namenode存放数据的描述信息(元数据)
重要组件
- NameNode:负责管理
- DataNode:存储数据,维护block
-
SecondaryNameNode (伪分布式和非高可用的集群才有):帮助NameNode进行数据的同步(对磁盘和内存进行同步)
一个Rack1上放多台服务器,block存放在服务器上(绿色方块)。Client上运行程序,读取数据。首先向Namenode读取数据信息(文件存储份数,存储机器),反回元数据,Client根据返回的元数据,从服务器上读取。
写数据: - 将文件切分成blk_1
- 将数据包blk_1写到h0上,再进行水平复制到h1 h3上
- 将剩下的数据blk_2写到h0上,复制到h2 h4上
Namenode工作机制
- 目录树:NameNode记录文件夹的逻辑结构(并非真正的文件夹)
- fsimage: 磁盘中的数据(某一段时间的信息),最新信息在内存中
- edits: 标记是否上传成功,元数据每隔一段时间清理标记失败的文件
- fstime: 新版本不保存在这里,保存在Linux文件系统中
SecondaryNameNode工作机制
HA(高可用)
在新版中并不替换,而是生成新的名字保存。
DataNode工作机制
六 Hadoop的RPC通信机制
什么是RPC(Remote Procedure Call):(https://baike.baidu.com/item/%E8%BF%9C%E7%A8%8B%E8%BF%87%E7%A8%8B%E8%B0%83%E7%94%A8/7854346),它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
例子
服务端
package myhadoop.rpc;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import java.io.IOException;
public class RPCServer implements RPCService{
public static void main(String[] args) throws HadoopIllegalArgumentException, IOException {
Configuration conf = new Configuration();
RPC.Server server = new RPC.Builder(conf)
.setProtocol(RPCService.class)
.setBindAddress("192.168.106.1")
.setPort(9527)
.setInstance(new RPCServer())
.build();
server.start();
}
public String sayHi(String name) {
return "Hi~" + name;
}
}
服务端接口
package myhadoop.rpc;
public interface RPCService {
//客户端指定的versionID,使客户端服务端一致
public static final long versionID = 10010L;
public String sayHi(String name);
}
客户端
package myhadoop.rpc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import java.io.IOException;
import java.net.InetSocketAddress;
public class RPCClient {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
//在客户端获取代理对象,有代理对象就可以获取目标对象(RPCServer)的方法
RPCService proxy = RPC.getProxy(RPCService.class,10010, new InetSocketAddress("192.168.106.1", 9527),conf);
//
String result = proxy.sayHi("Tom");
System.out.println(result);
RPC.stopProxy(proxy);
}
}
客户端调用服务器段方法,最终将值返回给客户端