1、Hadoop概要

、HDFS

        Hadoop中的分布式文件系统,高容错(数据库blcok备份),可扩展,适合存储大文件,不适合存储小文件,不适合处理低延时的数据(HBase更好),一次写入、多次读写,不支持多用户写入及任意修改文件。

1、原理架构

(1)NameNode:主节点,负责管理文件系统的命名空间,将HDFS的元数据存储在NameNode节点的内存中;负责响应客户端对文件的读写请求。

(2)DataNode:数据节点,主要负责数据的读写, 存储block以及block元数据到datanode本地磁盘(此处的元数据包括数据块的长度、块数据的校验和、时间戳);定期向NameNode发送心跳,超过10分钟节点不可用,6小时上报当前DataNode上的块状态。

(3)SecondaryNameNode:辅助节点,定期做checkpoint操作,合并NameNode的fsimage及editlog,NameNode就有了最新的fsimage文件和更小的editslog文件,可减少恢复系统的时间,每小时或每分钟editslog含有100万个事务,就创建一个checkpoint检查点。

心跳机制

    集群的心跳机制,让集群中各节点形成一个整体,可以判断DataNode是否在线;知道各DataNode的存储情况;集群刚开始启动时,99.9%的block没有达到最小副本数1,集群处于安全模式,涉及BlockReport;

首先,NameNode启动时会开一个ipc server;DataNode每3秒钟向NameNode发送一个心跳,心跳返回结果带有NameNode给该DataNode的命令;每6小时向NameNode上报当前DataNode上的块状态报告,块状态报告包含了一个该 Datanode上所有数据块的列表;超过10分钟没有收到某个DataNode 的心跳,则认为该DataNode节点不可用。

负载均衡

    在机器之间磁盘利用率不平衡、DataNode节点出现故障、增添新的DataNode的时候可能造成不均衡;

     可以手动触发负载均衡: sbin/start-balancer.sh -t 5% # 磁盘利用率最高的节点若比最少的节点,大于5%,触发均衡

2、SecondaryNameNode

引入原因:

    ·客户端对HDFS的增删重命名等操作,会保存再次namenode的editlog中;

    ·系统出故障时,可从editlog进行恢复;

    · editlog日志大小随时间变在越来越大,系统重启根据日志恢复的时候会越来越长;

    ·为解决恢复系统时间长:设置检查点checkpoint,定期将namenode内存中元数据持久化保存到磁盘,形成fsimage文件,恢复系统时不再只依赖editlog日志,先从fsimage恢复出元数据,再到回放editlog日志检查点之后记录;

    ·但对editlog日志文件的保存策略未改变,editlog日志依然不断增大;

    ·为解决editlog大,引入部署在另外一节点secondarynamenode,定期做checkpoint操作,合并fsimage及editlog,nameNode就有了最新的fsimage文件和更小的edits文件。

执行过程:

        先请求NameNode继续滚动写edits日志;

        再GET请求读取NameNode当前fsimage及edits;

        然后读取fsimage到内存中,并回放执行edits中的每个操作,创建一个新的fsimage 文件,后缀为.ckpt;

        最后PUT请求将新的fsimage发送到原NameNode,原NameNode用新的fsimage替换旧的fsimage,

创建checkpoint两大条件:

    ·SecondaryNameNode每隔1小时创建一个检查点;

    ·Secondary NameNode每1分钟检查一次,从上一检查点开始,edits日志文件中是否已包括100万个事务,如果是,也会创建检查点;

NameNode与SecondaryNameNode 的区别与联系? 

 2)区别,功能不同

(1)NameNode负责管理元数据,以及每一路径(文件)所对应的数据块信息。

(2)SecondaryNameNode,主要定期合并NameNode的fsimage及editlog

3)联系:

(1)SecondaryNameNode中保存了一份和namenode一致的镜像文件(fsimage)和编辑日志(edits)。

(2)在主namenode发生故障时(假设没有及时备份数据),可以从SecondaryNameNode恢复数据。

3、数据存储

(1)元数据管理

    ·元数据:关于文件或目录的描述信息,如文件所在路径、文件名称、文件类型等等,这些信息称为文件的元数据metadata

    ·命名空间:文件系统中,为了便于管理存储介质上的,给每个目录、目录中的文件、子目录都起了名字,这样形成的层级结构,称之为命名空间;

·HDFS元数据:文件目录树、所有的文件(目录)名称、文件属性(生成时间、副本、权限)、每个文件的块列表、每个block块所在的datanode列表;每个文件、目录、block占用大概 150Byte字节的元数据 ;元数据metadata保存在NameNode内存中,所以HDFS适合存储大文件,不适合存储小文件

    HDFS元数据信息以两种形式持久化保存:①编辑日志edits log②命名空间镜像文件fsimage

    ·edits log:HDFS编辑日志文件,保存客户端对HDFS的所有更改记录,如增、删、重命名文件(目录),这些操作会修改HDFS目录树;NameNode会在编辑日志edit日志中记录下来;类似mysql的binlog。一旦系统出故障,可从editlog进行恢复

    ·fsimage:HDFS元数据镜像文件,即将namenode内存中的数据落入磁盘生成的文件;保存了文件系统目录树信息以及文件、块、datanode的映射关系 

(2)分块存储

    数据分块存储和副本的存放,是保证可靠性和高性能的关键:  

    向HDFS上传文件,是按照128M为单位,切分成一个个block,分散的存储在集群的不同数据节点datanode上。如果每个block只有一份的话,当block所在的节点宕机后,此block将无法访问,进而导致文件无法完整读取;为保正数据的可用及容错,HDFS设计成每个block共有三份,即三个副本;实际机房中,会有机架,每个机架上若干服务器

4数据流程

请求上传——检查目录——可以上传 

查询Datanode信息——分配datanode 

建立数据流——根据管道写数据—— 循环写入其他block

1)请求上传:客户端向namenode请求上传文件,namenode检查目标文件是否已存在,父目录是否存在。 namenode返回是否可以上传。

2)分配datanode:客户端请求第一个block上传到哪几个datanode服务器上,namenode返回3个datanode节点,如dn1、dn2、dn3。

3)建立数据流管道:客户端请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,dn1、dn2、dn3逐级应答客户端, 建立数据流管道pipeline ;

4)根据管道写数据:客户端开始往dn1上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位,dn1收到一个packet就会传给pipeline中的下一个 dn2,直到最后一个dn3;dn1每传完一个packet,会放入一个应答队列ackQueue等待应答,最后一个datanode成功存储之后,会返回传递至客户端ack packet(确认队列),成功收到ack后,会将packet删除,否则重新发送。

5)循环写入其他block:当一个block传输完成之后,客户端再次请求namenode上传第二个block的服务器。文件最后一个block块数据写完后,会再发送一个空的packet,表示当前block写完了,然后关闭pipeline;

5读数据流程

1)客户端向namenode请求下载文件,namenode通过查询元数据,找到文件块所在的datanode地址。

2)挑选一台datanode(就近原则,然后随机)服务器,请求读取数据。

3)datanode开始传输数据给客户端(从磁盘里面读取数据放入流,以packet为单位来做校验)。

4)客户端以packet为单位接收,先在本地缓存,然后写入目标文件。

6、小文件治理

NameNode存储着文件系统的元数据,每个文件、目录、块大概有150字节的元数据;小文件数量多会大量占用namenode的内存; 使namenode读取元数据速度变慢, 启动时间延长; 还因为占用内存过大, 导致gc时间增加等.

解决办法:两个角度,

一是,从数据源入手,如每小时抽取一次改为每天抽取一次等方法来积累数据量.

二是,选择合并.HAR文件方案,Sequence Files方案

    如果小文件无可避免,一般就采用合并的方式解决. 可以写一个MR任务读取某个目录下的所有小文件, 并重写为一个大文件.

    SequenceFile文件,是一种由header信息和一条条record记录组成的文件。每个record是键值对形式,小文件名作为当前record的键,小文件的内容作为当前record的值;

7、高可用HA

    对于HDFS,nameNode存储元数据在内存中,并负责管理文件系统的命名空间和客户端对HDFS的读写请求。只存在一个nameNode,一旦发生“单点故障”,会使整个系统失效。

    HDFS2.x采用了HA(High Availability高可用)架构。(HDFS HA可看作为NN和SN的优化);在HA集群中,可设置两个nameNode,一个处于“活跃(Active)”状态,另一个处于“待命(Standby)”状态。由zookeeper确保一主一备,主备切换

如何热备份元数据:

    Standby nameNode是ActivenameNode的“热备份”,因此Active nameNode的状态信息必须实时同步到StandbynameNode。

    Active nameNode将更新数据写入到共享存储系统,StandbynameNode一直监听该系统,一旦发现有新的数据写入,就立即从公共存储系统中读取这些数据并加载到StandbynameNode自己内存中,从而保证元数据与ActivenameNode状态一致。

    块报告:nameNode保存了数据块到实际存储位置的映射信息,为了实现故障时的快速切换,必须保证StandbynameNode中也包含最新的块映射信息。因此需要给所有DataNode配置Active和Standby两个nameNode的地址,把块的位置和心跳信息同时发送到两个nameNode上。

8、Hadoop联邦

HA高可用解决了单点故障问题,但HA本质上还是单个nameNode工作,在扩展性、整体性能和隔离性方面仍有问题。

·扩展性:元数据存储在nameNode内存中,受限于内存上限(每个文件、目录、block占用约150字节)

·整体性能:吞吐量受单个NN的影响

·隔离性:一个程序可能会影响其他程序的运行,如果一个程序消耗过多资源会导致其他程序无法顺利运行

HDFS联邦,解决扩展性、整体性能和隔离性

·扩展性:有多个命名空间;每个命名空间有一个nameNode或一主一备两个nameNode,使得HDFS的命名服务能够水平扩展;

·整体性能:多个nameNode分别管理各自命名空间和块,相互独立,不需要彼此协调;

9、文件压缩

·gzip:优点是压缩率高,速度快。Hadoop支持与直接处理文本一样。缺点不支持split,当文件压缩在128m内,都可以用gzip;

·bzip2:支持split,很高的压缩率,比gzip高,hadoop支持但不支持native,linux自带命令使用方便。缺点压缩解压速度慢

·Izo: 优点压缩速度快,合理的压缩率;支持split,是最流行的压缩格式。支持native库;缺点 比gzip压缩率低,hadoop本身不支持,需要安装;在应用中对lzo格式文件需要处理如 指定inputformat为lzo格式;

·Snappy:压缩高速,压缩率合理,支持本地库,不支持split,hadoop不支持,要安装linux没有对应命令;当MR输出数据较大,作为到reduce数据压缩格式

 、MapReduce

    MapReduce,是采用一种分而治之的思想,设计出来的分布式离线计算框架,输入输出都是hdfs。由两个阶段组成:Map阶段(切分成一个个小的任务);Reduce阶段(汇总小任务的结果)

     map任务一次读取block的一行数据,将当前所读行的行首相对于当前block开始处的字节偏移量作为key(0),当前行的内容作为value,以kv对的形式输入map()方法;   map()方法内,按需求,执行业务代码;   map()方法的输出作为reduce()的输入;    输入文件有几个block,就会生成几个map任务;

     reduce任务通过网络将各map任务输出结果中,属于自己的数据拉取过来,key相同的键值对作为一组,调用一次reduce();reduce任务生成一个结果文件,文件写入HDFS;reduce任务的个数,由程序中编程指定:job.setNumReduceTasks(4)

    shuffle主要指的是map端的输出作为reduce端输入的过程

map端的shuffle

(1)环形内存缓冲:

        每个map任务都有一个对应的环形内存缓冲区;map()方法输出kv对时,先写入到环形缓冲区(默认100M,当内容占据80%缓冲区空间后,由一个后台线程将缓冲区中的数据溢出写到一个磁盘文件。

    在溢出写的过程中,map任务可以继续向环形缓冲区写入数据;但是若写入速度大于溢出写的速度,最终造成100m占满后,map任务会暂停向环形缓冲区中写数据的过程;只执行溢出写的过程;直到环形缓冲区的数据全部溢出写到磁盘,才恢复向缓冲区写入

(2)后台线程溢写磁盘过程,

    1)分区:先对每个溢写的kv对根据key进行hash分区;分区的个数由reduce任务数决定;自定义分区,实现Partitioner接口,在getPartition()中实现分区逻辑

    2)排序:每个分区中,每个kv对根据key在内存中排序;

    3)可选combine聚合:若设置了map端本地聚合combiner,则对每个分区中,排好序的数据做combine预聚合操作;

    4)可选压缩:若设置了对map输出压缩的功能,会对溢写数据压缩


reduce端的shuffle

 (1)拉取

    reduce task会在每个map task运行完成后,通过HTTP获得map task输出中,属于自己的分区数据(许多kv对),如果map输出数据比较小,先保存在reduce的jvm内存中,否则直接写入reduce磁盘。

(2)归并merge:

    一旦内存缓冲区达到阈值(默认0.66)或map输出数的阈值(默认1000),则触发归并merge,结果写到本地磁盘。

(3)combine(可选):若MR编程指定了combine,在归并过程中会执行combine操作


2、数据倾斜

    MR数据倾斜,一般是指map端输出数据中存在数据频率倾斜的状况,即部分输出键的数据量远远大于其它的输出键,导致map和reduce的任务执行时间大为延长,也会让需要缓存数据集的操作消耗更多的内存资源。

造成原因:

        ·原数据频率不一致,某些key键值对数量远多于其他键的键值对,导致分区时分区不均匀,一些分区中数据多,一些少

        ·原数据大小不同,某些key键值对的大小远远大于平均值。对缓存造成较大的影响,乃至导致OutOfMemoryError异常。

如何减缓数据倾斜主要是分区不均匀,

    预聚合Combine聚合并精简数据。

    ②自定义分区根据输出键背景知识,进行自定义分区。

    抽样范围分区先对原数据进行抽样,得到的结果集,通过TotalOrderPartitioner中范围分区器,预设分区边界值,进行分区。

    ④数据大小倾斜,调参line.maxlength,限制RecordReader读取最大长度

3、代码

 继承Mapper类,实现map()方法;

继承Reducer类,实现reduce()方法

 三、Yarn

1、原理架构

YARN(Yet Another Resource Negotiator)是Hadoop2.0资源管理的子项目

1) Resource Manager:全局资源管理器,一个集群只有一个RM,类似老总。 负责和AM(Application Master)交互,资源调度、资源分配等;

2)Node Manager:一台机器上的管理者,类似于部门经理。管理着本机上若干小弟Containers的生命周期、监视资源和跟踪节点健康并定时上报给RM;接收并处理来自AM的Container启动/停止等各种请求。

3)Application Master:应用程序的管理器,类似项目经理,一个应用程序只有一个AM。负责任务开始时找RM要资源,任务完成时向RM注销自己,释放资源;与NM通信以启动/停止任务;接收NM同步的任务进度信息。

ApplicationMaster可以在容器内运行任何类型的任务,不同的 ApplicationMaster 被分布到不同的节点上,因此它们之间不会相互影响。

 Container:一台机器上具体提供运算资源,类似员工,将设备上的内存、CPU、磁盘、网络等资源封装在一起的抽象概念——“资源容器”,Container是一个动态资源分配单位,为了限定每个任务使用的资源量。

Client向 ResourceManager 提交的每一个应用程序都必须有一个 ApplicationMaster,它经过 ResourceManager 分配资源后,运行于某一个 Slave 节点的 Container 中,具体做事情的 Task,同样也运行与某一个 Slave 节点的 Container 中。

2执行过程

        Application在Yarn中的执行过程,整个执行过程可以总结为三步:应用程序提交 -> 启动应用的ApplicationMaster实例 -> ApplicationMaster实例管理应用程序的执行

精简版的:

    步骤1:客户端程序向 ResourceManager 提交应用,请求一个 RM的ApplicationMaster 实例,并请求传递给RM的scheduler(调度器);调度器分配container(容器)

    步骤2:ResourceManager 找到一个可以运行一个 Container 的 NodeManager,并在这个 Container 中启动 ApplicationMaster 实例;

    步骤3:ApplicationMaster 与 ResourceManager 注册进行通信,为内部要执行的任务申请资源,一旦得到资源后,将于 NodeManager 通信,以启动对应的 Task;

    步骤4:所有任务运行完成后,ApplicationMaster 向 ResourceManager 注销,整个应用程序运行结束。

2、调度器

在YARN中有三种调度器可以选择:FIFO Scheduler ,Capacity Scheduler,Fair Scheduler

3、yarn状态

yarn的web ui上能够看到yarn 应用程序分为如下几个状态:

- NEW -----新建状态

- NEW_SAVING-----新建保存状态

- SUBMITTED-----提交状态

- ACCEPTED-----接受状态

- RUNNING-----运行状态

- FINISHED-----完成状态

- FAILED-----失败状态

- KILLED-----杀掉状态

、zookeeper

ZooKeeper是分布式应用程序的协调服务主从架构leader;follower或observer

主要通过本身的文件系统通知机制,维护和监控存储的数据的状态变化,达到基于数据的集群管理,主要用来解决分布式集群中应用系统的一致性问题(指数据在多个副本之间保持一致的特性)。为了保证事务的顺序一致性,ZK采用递增的事务id号(zxid)来标识事务,所有提议(proposal)都有zxid。

ZooKeeper   简版文件系统(Znode) +原语基本命令+通知机制(Watcher)。

1、保证事务的顺序一致性

(1)zookeeper采用了全局递增的事务Id来标识,所有的 proposal(提议)在被提出时候,加上了 zxid。zxid实际上是一个 64 位的数字,高32 位是 epoch用来标识 leader 周期,如果有新的 leader 产生出来,epoch会自增;低32位用来递增计数。

(2)当新产生proposal的时候,会依据数据库的两阶段过程,首先会向其他的 server 发出事务执行请求,如果超过半数的机器都能执行并且能够成功,那么就会开始执行。

客户端的读请求可以被集群中的任意一台机器处理,如果读请求在节点上注册了监听器,这个监听器也是由所连接的zookeeper机器来处理。对于写请求,这些请求会同时发给其他zookeeper机器并且达成一致后,请求才会返回成功。因此, 随着 zookeeper 的集群机器增多,读请求的吞吐会提高但是写请求的吞吐会下降。

[if !supportLists]2、[endif]ZAB协议

ZAB协议是Zookeeper 专门设计的一种支持崩溃恢复的原子广播协议。

当整个zookeeper集群刚刚启动或者 Leader 服务器宕机、重启或者网络故障导致不存在过半的服务器与 Leader 服务器保持正常通信时,所有进程(服务器)进入崩溃恢复模式,首先选举产生新的 Leader 服务器,然后集群中 Follower 服务器开始与新的 Leader 服务器进行数据同步,当集群中超过半数机器与该 Leader服务器完成数据同步之后,退出恢复模式进入消息广播模式,Leader 服务器开始接收客户端的事务请求生成事物提案来进行事务请求处理。

Zab协议两种模式 :恢复模式(选主),广播模式(同步)

恢复模式(选主)分两种情况:全新集群leader选举、非全新集群leader选举;

集群中过半数Server启动后,才能选举出Leader;投票信息结构为(sid, zxid),服务器ID,事务ID;规则为:zxid大的server胜出;zxid相等,sid大的胜出

选主后的数据同步进行广播模式

leader构建NEWLEADER封包,包含leader中最大的zxid值;广播给其它follower;follower收到后,如果自己的最大zxid小于leader的,则需要与leader状态同步;否则不需要;leader给需要同步的每个follower创建LearnerHandler线程,负责数据同步请求;leader主线程等待LearnHandler线程处理结果;只有多数follower完成同步,leader才开始对外服务,响应写请求、

该协议需要做到以下几点:

(1)集群在半数以下节点宕机的情况下能正常对外提供服务;

(2)客户端的写请求,全部转交给leader来处理,leader需确保写变更,能实时同步给所有follower及observer;

(3)leader宕机或整个集群重启时,需要确保那些已经在leader服务器上提交的事务最终被所有服务器都提交,确保丢弃那些只在leader服务器上被提出的事务,并保证集群能快速恢复到故障前的状态。 

3、DFS HA方案

主要分两部分:①元数据同步 ②主备切换

①元数据同步:

·在同一个HDFS集群,运行两个互为主备的NameNode节点,在主备切换过程中,新的Active NameNode必须确保与原Active NamNode元数据同步完成,才能对外提供服务。

·用JournalNode集群作为共享存储系统,客户端对HDFS做操作 ,同时会记录到JournalNode集群存储HDFS新产生的元数据。

·当有新数据写入JournalNode集群时,Standby NameNode能监听到此情况,将新数据同步过来。这样,Active NameNode(写入)和Standby NameNode(读取)实现元数据同步 。

②主备切换:

·每个NameNode节点上各有一个ZKFC进程,ZKFC会监控NameNode的健康状况,当发现Active NameNode异常时,通过Zookeeper集群进行namenode主备选举,完成Active和Standby状态的切换

4、四种类型的数据节点 Znod

·(1)PERSISTENT-持久节点

除非手动删除,否则节点一直存在于Zookeeper上

·(2)EPHEMERAL-临时节点

临时节点的生命周期与客户端会话绑定,一旦客户端会话失效(客户端与

zookeeper连接断开不一定会话失效),那么这个客户端创建的所有临时节点 都会被移除。

·(3)PERSISTENT_SEQUENTIAL-持久顺序节点

基本特性同持久节点,只是增加了顺序属性,节点名后边会追加一个由父节 点维护的自增整型数字。

·(4)EPHEMERAL_SEQUENTIAL-临时顺序节点

基本特性同临时节点,增加了顺序属性,节点名后边会追加一个由父节点维 护的自增整型数字。

5、Server工作状态

服务器具有四种状态,分别是LOOKING、FOLLOWING、LEADING、OBSERVING。

1、LOOKING:寻找Leader状态。当服务器处于该状态时,它会认为当前集群中

没有Leader,因此需要进入 Leader 选举状态。

2、FOLLOWING:跟随者状态。表明当前服务器角色是Follower。

3、LEADING:领导者状态。表明当前服务器角色是Leader。

4、OBSERVING:观察者状态。表明当前服务器角色是Observer。

6、zk节点宕机如何处理?

Zookeeper本身也是集群,推荐配置不少于 3 个服务器。Zookeeper 自身也要保证当一个节点宕机时,其他节点会继续提供服务。

·如果是一个Follower宕机,还有 2 台服务器提供访问,因为 Zookeeper 上的数据是有多个副本的,数据并不会丢失;

·如果是一个Leader宕机,Zookeeper 会选举出新的 Leader。 ZK 集群的机制是只要超过半数的节点正常,集群就能正常提供服务。只有在 ZK 节点挂得太多,只剩一半或不到一半节点能工作,集群才失效。

·所以

3个节点的 cluster 可以挂掉 1 个节点(leader 可以得到 2 票>1.5)

2个节点的 cluster 就不能挂掉任何 1 个节点了(leader 可以得到 1 票<=1)

7、集群支持动态添加机器吗?

Zookeeper在水平扩容这方面不太好。两种方式:

全部重启:关闭所有Zookeeper服务,修改配置之后启动。不影响之前客户端的

会话。

逐个重启:在过半存活即可用的原则下,一台机器重启不影响整个集群对外提供

服务。这是比较常用的方式。

3.5版本开始支持动态扩容。

8、Zk的java客户端都有哪些?

java客户端:zk 自带的 zkclient 及 Apache 开源的 Curator。

常用命令:ls get set create delete等


  七、Kafka

·kafka是一个分布式消息系统。具有高性能、持久化、多副本备份、横向扩展能力。将消息保存在磁盘中,以顺序读写方式访问磁盘,避免随机读写导致性能瓶颈。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。

·Kafka集群包含一个或多个服务器,服务器节点称为broker,broker存储topic的数据。broker可分为Controller与follower。Controller管理集群broker的上下线,所有topic的分区副本分配和leaderPartition选举等工作

·每条发布到Kafka集群的消息都有一个类别Topic,Topic像一个消息队列,每个topic包含一个或多个partition,Kafka分配的单位是partition。每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition,其他partition为flower作为备用选主。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。

·offset:消费者在对应分区上已经消费的消息数(位置),kafka0.8 版本之前offset保存在zookeeper上。之后offset保存在kafka集群上。

1、kafka的文件存储机制

·同一个topic下,有多个不同的partition,每个partition为一个目录。partition命名的规则是,topic的名称加上一个序号,序号从0开始。

·每一个partition目录下的文件,被平均切割成大小相等的数据文件(每一个数据文件都被称为一个段(segment file);每一个segment段消息,数量不一定相等,使得老的segment可以被快速清除。默认保留7天的数据,每次满1G后,在写入到一个新的文件中。

·每一个partition只需要支持顺序读写就可以,也就是说它只会往文件的末尾追加数据,这就是顺序写的过程,生产者只会对每一个partition做数据的追加(写操作)。

·在partition目录下,有两类文件,一类是以log为后缀的文件,一类是以index为后缀的文件,每一个log文件和一个index文件相对应,这一对文件就是一个segment file,也就是一个段。log文件:就是数据文件,里面存放的就是消息, index文件:是索引文件,记录元数据信息。

·元数据指向,对应的数据文件(log文件)中消息的物理偏移地址。log文件达到1个G后滚动重新生成新的log文件。

2、Kafka内部数据不丢失

调整Producer,consumer,broker的各项参数,保证Kafka内部数据不丢失

producer:acks参数、retry参数

高可用型,配置:acks = all,retries > 0 retry.backoff.ms=100(毫秒) (并根据实际情况设置retry可能恢复的间隔时间)

优点:这样保证了producer端每发送一条消息都要成功,如果不成功并将消息缓存起来,等异常恢复后再次发送。缺点:这样保证了高可用,但是这会导致集群的吞吐量不是很高,因为数据发送到broker之后,leader要将数据同步到fllower上,如果网络带宽、不稳定等情况时,ack响应时间会更长

2.折中型,配置:acks = 1retries > 0 retries时间间隔设置 (并根据实际情况设置retries可能恢复的间隔时间)

优点:保证了消息的可靠性和吞吐量,是个折中的方案;缺点:性能处于2者中间

3.高吞吐型配置:acks = 0

优点:可以相对容忍一些数据的丢失,吞吐量大,可以接收大量请求;缺点:不知道发送的消息是否成功

② Consumer: group.id 、auto.offset.reset 、enable.auto.commit

1设置consumergroup分组的id,group.id:如果为空,则会报异常

2设置从何处开始进行消费auto.offset.reset = earliest(最早) /latest(最晚)

3设置是否开启自动提交消费位移的功能,默认开启 enable.auto.commit= true/false(默认true)

Broker:replication-factor、min.insync.replicas、unclean.leander.election.enable

1.replication-factor >=2

在创建topic时会通过replication-factor来创建副本的个数,它提高了kafka的高可用性,同时,它允许n-1台broker挂掉,设置好合理的副本因子对kafka整体性能是非常有帮助的,通常是3个,极限是5个,如果多了也会影响开销。

2.min.insync.replicas = 2

分区ISR队列集合中最少有多少个副本,默认值是1

3.unclean.leander.election.enable = false

是否允许从ISR队列中选举leader副本,默认值是false,如果设置成true,则可能会造成数据丢失。

3、kafka调优,提升生产者的吞吐量

1)设置发送消息的缓冲区buffer.memory:默认32MB。如果发送消息速度小于写入消息速度,就会导致缓冲区写满,此时生产消息就会阻塞住,所以说这里就应该多做一些压测,尽可能保证说这块缓冲区不会被写满导致生产行为被阻塞住;

2)设置压缩compression.type,默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的cpu开销。

3)设置batch的大小batch.size,默认16kb,就是一个batch满了16kb就发送出去,一般在实际生产环境,这个batch的值可以增大一些来提升吞吐量。如果batch太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里

4)设置消息的发送延迟linger.ms,这个值默认是0,意思就是消息必须立即被发送,但是这是不对的。一般设置一个100毫秒之内的,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kb,自然就会发送出去。但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力。

4、sparkStreaming整合kafka

sparkStreaming对接kafka两种方式:

[if !supportLists]1. [endif]Receiver模式,由kafka将数据发送数据,Spark Streaming被动接收数据;

在spark的executor当中启动了一些receiver的线程,专门去kafka拉取数据,拉取回来的数据这些receiver不会处理,然后另外一些线程专门来处理数据,基于kafka的high level API进行消费,offset自动保存到了zk当中去了,不用我们主动去维护offset的值

问题:拉取数据的线程以及处理数据的线程互相不会通信,造成问题:处理数据线程挂掉了,拉取数据的线程还在继续拉取数据,数据全部都堆积在execotr里面了      

2. Direct模式,由Spark Streaming主动去kafka中拉取数据。

不再单独启动线程去拉取数据,获取到的数据也不用保存在executor内存里面了,获取到的数据直接就进行处理。

问题:使用kafka的low level API进行消费,需要自己手动的维护offset值

sparkStreaming整合kafka官网提供两个jar包

一个是基于0.8版本整合:提供两种方式整合,receiverdirect方式一个是基于0.10版本整合:只提供了direct方式整合。

5、在Kafka中broker的意义是什么?

在Kafka集群中,broker指Kafka服务器。

术语解析:

名称说明

Topic主题,可以理解为一个队列

Partition分区,为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序

Offset偏移量,kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka

Broker一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic

Producer消息生产者,向kafka broker发消息的客户端

Consumer消息消费者,向kafka broker取消息的客户端

Consumer Group消费者组,这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;

6、Kafka服务器能接收到的最大信息是多少?

Kafka服务器可以接收到的消息的最大大小是1000000字节。

7、Kafka中的ZooKeeper是什么?Kafka是否可以脱离ZooKeeper独立运行?

Zookeeper是一个开放源码的、高性能的协调服务,它用于Kafka的分布式应用。

不可以,不可能越过Zookeeper直接联系Kafka broker,一旦Zookeeper停止工作,它就不能服务客户端请求。

Zookeeper主要用于在集群中不同节点之间进行通信,在Kafka中,它被用于提交偏移量,因此如果节点在任何情况下都失败了,它都可以从之前提交的偏移量中获取,除此之外,它还执行其他活动,如: leader检测、分布式同步、配置管理、识别新节点何时离开或连接、集群、节点实时状态等等。

8、解释Kafka的用户如何消费信息?

在Kafka中传递消息是通过使用sendfile API完成的。它支持将字节Socket转移到磁盘,通过内核空间保存副本,并在内核用户之间调用内核。

9、解释如何提高远程用户的吞吐量?

如果用户位于与broker不同的数据中心,则可能需要调优Socket缓冲区大小,以对长网络延迟进行摊销。

10、解释一下,在数据制作过程中,你如何能从Kafka得到准确的信息?

在数据中,为了精确地获得Kafka的消息,你必须遵循两件事: 在数据消耗期间避免重复,在数据生产过程中避免重复。

这里有两种方法,可以在数据生成时准确地获得一个语义:

每个分区使用一个单独的写入器,每当你发现一个网络错误,检查该分区中的最后一条消息,以查看您的最后一次写入是否成功

在消息中包含一个主键(UUID或其他),并在用户中进行反复制

11、解释如何减少ISR中的扰动?broker什么时候离开ISR?(☆☆☆☆☆)

ISR是一组与leaders完全同步的消息副本,也就是说ISR中包含了所有提交的消息。ISR应该总是包含所有的副本,直到出现真正的故障。如果一个副本从leader中脱离出来,将会从ISR中删除。

12、Kafka为什么需要复制?

Kafka的信息复制确保了任何已发布的消息不会丢失,并且可以在机器错误、程序错误或更常见些的软件升级中使用。

13、如果副本在ISR中停留了很长时间表明什么?

如果一个副本在ISR中保留了很长一段时间,那么它就表明,跟踪器无法像在leader收集数据那样快速地获取数据。

14、请说明如果首选的副本不在ISR中会发生什么?

如果首选的副本不在ISR中,控制器将无法将leadership转移到首选的副本。

15、Kafka有可能在生产后发生消息偏移吗?

在大多数队列系统中,作为生产者的类无法做到这一点,它的作用是触发并忘记消息。broker将完成剩下的工作,比如使用id进行适当的元数据处理、偏移量等。

作为消息的用户,你可以从Kafka broker中获得补偿。如果你注视SimpleConsumer类,你会注意到它会获取包括偏移量作为列表的MultiFetchResponse对象。此外,当你对Kafka消息进行迭代时,你会拥有包括偏移量和消息发送的MessageAndOffset对象。

16、kafka 的消息投递保证机制以及如何实现?(☆☆☆☆☆)

Kafka支持三种消息投递语义:

① At most once 消息可能会丢,但绝不会重复传递

② At least one 消息绝不会丢,但可能会重复传递

③ Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户想要的

consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中存下该consumer在该partition下读取的消息的offset,该consumer下一次再读该partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。

可以将consumer设置为autocommit,即consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。但实际上实际使用中consumer并非读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。

·读完消息先commit再处理消息。这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once。

·读完消息先处理再commit消费状态(保存offset)。这种模式下,如果在处理完消息之后commit之前Consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了,这就对应于At least once。

·如果一定要做到Exactly once,就需要协调offset和实际操作的输出。经典的做法是引入两阶段提交,但由于许多输出系统不支持两阶段提交,更为通用的方式是将offset和操作输入存在同一个地方。比如,consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)。

总之,Kafka默认保证At least once,并且允许通过设置producer异步提交来实现At most once,而Exactly once要求与目标存储系统协作,Kafka提供的offset可以较为容易地实现这种方式。

17、如何保证Kafka的消息有序(☆☆☆☆☆)

Kafka对于消息的重复、丢失、错误以及顺序没有严格的要求。

Kafka只能保证一个partition中的消息被某个consumer消费时是顺序的,事实上,从Topic角度来说,当有多个partition时,消息仍然不是全局有序的。

18、kafka数据丢失问题,及如何保证

1)数据丢失:

acks=1的时候(只保证写入leader成功),如果刚好leader挂了。数据会丢失。

acks=0的时候,使用异步模式的时候,该模式下kafka无法保证消息,有可能会丢。

2)brocker如何保证不丢失:

acks=all: 所有副本都写入成功并确认。

retries = 一个合理值。

min.insync.replicas=2  消息至少要被写入到这么多副本才算成功。

unclean.leader.election.enable=false 关闭unclean leader选举,即不允许非ISR中的副本被选举为leader,以避免数据丢失。

3)Consumer如何保证不丢失

如果在消息处理完成前就提交了offset,那么就有可能造成数据的丢失。

enable.auto.commit=false 关闭自动提交offset

处理完数据之后手动提交。

19、kafka的balance是怎么做的

官方原文

Producers publish data to the topics of their choice. The producer is able to choose which message to assign to which partition within the topic. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function (say based on some key in the message). More on the use of partitioning in a second.

翻译:

生产者将数据发布到他们选择的主题。生产者可以选择在主题中分配哪个分区的消息。这可以通过循环的方式来完成,只是为了平衡负载,或者可以根据一些语义分区功能(比如消息中的一些键)来完成。更多关于分区在一秒钟内的使用。

20、kafka的消费者方式

consumer采用pull(拉)模式从broker中读取数据。

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。

对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直等待数据到达。为了避免这种情况,我们在我们的拉请求中有参数,允许消费者请求在等待数据到达的“长轮询”中进行阻塞。

21、为什么kafka可以实现高吞吐?单节点kafka的吞吐量也比其他消息队列大,为什么?

、spark

spark是针对于大规模数据处理的统一分析引擎,它是基于内存计算框架,计算速度非常之快,但是它仅仅只是涉及到计算,并没有涉及到数据的存储,后期需要使用spark对接外部的数据源,比如hdfs。

1、速度快,job的输出结果可以保存在内存,spark任务以线程的方式运行在进程中。

2、易用性,可通过java/scala/python/R/SQL等不同语言

3、通用性,一个生态系统,包含了很多模块。sparksql:通过sql去开发spark程序做一些离线分析;sparkStreaming:主要是用来解决公司有实时计算的这种场景;Mlib:它封装了一些机器学习的算法库;Graphx:图计算

4、兼容性,任务要运行就需要计算资源(内存、cpu、磁盘,可以把spark程序提交到哪里去运。standAlone模式,自带的独立运行模式,整个任务的资源分配由spark集群的老大Master负责;yarn模式,把spark程序提交到yarn中运行,整个任务的资源分配由yarn中的老大ResourceManager负责;mesos,它也是apache开源的一个类似于yarn的资源调度平台

1、spark集群的架构

[if !supportLists]· [endif]Cluster Manager:在standalone模式中即为Master主节点,控制整个集群,监控worker。在YARN模式中为资源管理器

[if !supportLists]· [endif]Worker节点:从节点,负责控制计算节点,启动Executor或者Driver。

[if !supportLists]· [endif]Driver: 运行Application 的main()函数

[if !supportLists]· [endif]Executor:执行器,是为某个Application运行在worker node上的一个进程

2、运行原理

程序代码-> Driver ->调用main() -> 创建sparkContext -> 与spark集群交互

sparkContext ->连接ClusterManager -> 申请资源 -> 解析成多个task  ->分发给workNode -> 执行task -> 执行完毕释放资源

当前Driver启动以后,会去执行应用程序的main方法,并构建sparkConext对象。sparkContext与ClusterManager连接交互,并且sparkContext将程序代码解析成多个task,将task发送给workNode,workNode又会把task丢给任务执行器executor去执行,executor会启动线程池开始执行task。当所有的task执行完毕,spark向ClusterManager注销,并释放资源。

注册并申请资源Driver端向资源管理器(Standalone,Mesos,Yarn)申请运行Executor资源, 发送注册和申请计算资源的请求;

分配资源,Master通知对应的worker节点启动executor进程(计算资源), executor进程向Driver端发送注册并且申请task请求。

注册并申请task, Driver端运行客户端的main方法,构建SparkContext对象,在SparkContext对象内部依次构建DAGScheduler和TaskScheduler

executer运行task,按照客户端代码rdd的一系列操作顺序,生成DAG有向无环图。DAGScheduler拿到DAG有向无环图之后,按照宽依赖进行stage的划分。每一个stage内部有很多可以并行运行的task,最后封装在一个一个的taskSet集合中,然后把taskSet发送给TaskScheduler。TaskScheduler得到taskSet集合之后,依次遍历取出每一个task提交到worker节点上的executor进程中运行

注销,所有task运行完成,Driver端向Master发送注销请求,Master通知Worker关闭executor进程,Worker上的计算资源得到释放,最后整个任务也就结束了。

3、RDD五大特性

RDD(Resilient Distributed Dataset)叫做弹性 分布式 数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合.

Dataset:就是一个集合,存储很多数据.

Distributed:它内部的元素进行了分布式存储,方便于后期进行分布式计算.

Resilient: 表示弹性,rdd的数据是可以保存在内存或者是磁盘中. 

① 有一个分区列表

一个rdd有很多分区,每一个分区内部是包含了该rdd的部分数据, spark中任务是以task线程的方式运行, 一个分区就对应一个task线程,分区数决定了并行计算的力度。可以在创建RDD时指定RDD的分区个数,如果没有指定,那么就会采用默认值,默认值就是程序所分配到的CPU Core的数目。

② 每个分区都会有计算函数

Spark的RDD的计算函数是以分片为基本单位的,每个RDD都会实现 compute函数,对具体的分片进行计算,RDD中的分片是并行的,所以是分布式并行计算。

③ 一个rdd依赖其他rdd

窄依赖RDD会形成类似流水线一样的前后依赖关系,宽依赖后面的RDD具体的数据分片会依赖前面所有的RDD的所有数据分片

④ key-value型的RDD,可以设置分区函数

类似于mapreduce当中的paritioner接口,控制Key分到哪个reduce。

当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

每个分区都有一个优先位置列表

移动数据不如移动计算,数据在哪台机器上,任务就启在哪个机器上,数据在本地上,不用走网络。不过数据进行最后汇总的时候就要走网络。(进行任务调度时会尽可能地将任务分配到处理数据的数据块所在的具体位置。

4、saprk调优

5、Shuffle

Shuffle就是对数据进行重组,在DAG调度的过程中,Stage阶段的划分是根据是否有shuffle过程,也就是是否存在宽依赖的时候。spark划分 stage 的整体思路是:从后往前推,遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个 RDD 加入该 stage 中。

需要进行shuffle,这时候会将作业job划分成多个Stage,每一个stage内部有很多可以并行运行的task,stage与stage之间的过程就是shuffle阶段

在Spark的中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager随着Spark的发展有两种实现的方式,分别为HashShuffleManager(spark1.2之前使用)和SortShuffleManager(spark1.2之后使用),

  spark1.2版本以前:hashShuffleManager、未经优化的hashShuffleManager、经过优化的hashShuffleManager;

spark1.2版本以后:SortShuffleManager、普通机制ByPass机制

HashShuffleManager的运行机制主要分成两种。 一种是普通运行机制,另一种是合并的运行机制。合并机制主要是通过复用buffer来优化Shuffle过程中产生的小文件的数量,Hash shuffle是不具有排序的Shuffle

SortShuffleManager的运行机制主要分成两种,一种是普通运行机制;另一种是bypass运行机制。

6、spark程序实现单词统计

object WordCount {

  defmain(args: Array[String]): Unit = {

    //1、构建sparkConf对象 设置application名称和master地址

    val sparkConf: SparkConf =new SparkConf().setAppName("WordCount").setMaster("local[2]")

    //2、构建sparkContext对象,该对象非常重要,它是所有spark程序的执行入口

    //它内部会构建 DAGScheduler和 TaskScheduler 对象

    val sc = new SparkContext(sparkConf)

    //设置日志输出级别

    sc.setLogLevel("warn")

    //3、读取数据文件

    val data: RDD[String] = sc.textFile("E:\\words.txt")

    //4、 切分每一行,获取所有单词

    val words: RDD[String] = data.flatMap(x=>x.split(" "))

    //5、每个单词计为1

    val wordAndOne: RDD[(String, Int)] = words.map(x => (x,1))

    //6、相同单词出现的1累加

    val result: RDD[(String, Int)] = wordAndOne.reduceByKey((x,y)=>x+y)

    //按照单词出现的次数降序排列,第二个参数默认true表示升序

    val sortedRDD: RDD[(String, Int)] = result.sortBy( x=> x._2,false)

    //7、收集数据打印

    val finalResult: Array[(String, Int)] = sortedRDD.collect()

    finalResult.foreach(println)

    //8、关闭sc

    sc.stop()

  }}

九Flume(☆☆☆☆)

1、Flume使用场景(☆☆☆☆☆)

线上数据一般主要是落地(存储到磁盘)或者通过socket传输给另外一个系统,这种情况下,你很难推动线上应用或服务去修改接口,实现直接向kafka里写数据,这时候你可能就需要flume这样的系统帮你去做传输。

2、Flume丢包问题(☆☆☆☆☆)

单机upd的flume source的配置,100+M/s数据量,10w qps flume就开始大量丢包,因此很多公司在搭建系统时,抛弃了Flume,自己研发传输系统,但是往往会参考Flume的Source-Channel-Sink模式。

一些公司在Flume工作过程中,会对业务日志进行监控,例如Flume agent中有多少条日志,Flume到Kafka后有多少条日志等等,如果数据丢失保持在1%左右是没有问题的,当数据丢失达到5%左右时就必须采取相应措施。

3、Flume与Kafka的选取

采集层主要可以使用Flume、Kafka两种技术。

·Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API。Kafka:Kafka是一个可持久化的分布式的消息队列。

Kafka 是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。所以,Cloudera 建议如果数据被多个系统消费的话,使用kafka;如果数据被设计给Hadoop使用,使用Flume。

 Flume内置很多的source和sink组件。然而,Kafka明显有一个更小的生产消费者生态系统,并且Kafka的社区支持不好。希望将来这种情况会得到改善,但是目前:使用Kafka意味着你准备好了编写你自己的生产者和消费者代码。如果已经存在的Flume Sources和Sinks满足你的需求,并且你更喜欢不需要任何开发的系统,请使用Flume。

Flume可以使用拦截器实时处理数据。这些对数据屏蔽或者过量是很有用的。Kafka需要外部的流处理系统才能做到。

Kafka和Flume都是可靠的系统,通过适当的配置能保证零数据丢失。然而,Flume不支持副本事件。于是,如果Flume代理的一个节点奔溃了,即使使用了可靠的文件管道方式,你也将丢失这些事件直到你恢复这些磁盘。如果你需要一个高可靠行的管道,那么使用Kafka是个更好的选择。

Flume和Kafka可以很好地结合起来使用。如果你的设计需要从Kafka到Hadoop的流数据,使用Flume代理并配置Kafka的Source读取数据也是可行的:你没有必要实现自己的消费者。你可以直接利用Flume与HDFS及HBase的结合的所有好处。你可以使用Cloudera Manager对消费者的监控,并且你甚至可以添加拦截器进行一些流处理。

4、数据怎么采集到Kafka,实现方式

使用官方提供的flumeKafka插件,插件的实现方式是自定义了flume的sink,将数据从channle中取出,通过kafka的producer写入到kafka中,可以自定义分区等。

5、flume管道内存,flume宕机了数据丢失怎么解决

1)Flume的channel分为很多种,可以将数据写入到文件。

2)防止非首个agent宕机的方法数可以做集群或者主备

6、flume配置方式,flume集群(问的很详细)

Flume的配置围绕着source、channel、sink叙述,flume的集群是做在agent上的,而非机器上。

7、flume不采集Nginx日志,通过Logger4j采集日志,优缺点是什么?

优点:Nginx的日志格式是固定的,但是缺少sessionid,通过logger4j采集的日志是带有sessionid的,而session可以通过redis共享,保证了集群日志中的同一session落到不同的tomcat时,sessionId还是一样的,而且logger4j的方式比较稳定,不会宕机。

缺点:不够灵活,logger4j的方式和项目结合过于紧密,而flume的方式比较灵活,拔插式比较好,不会影响项目性能。

8、flume和kafka采集日志区别,采集日志时中间停了,怎么记录之前的日志。

Flume采集日志是通过流的方式直接将日志收集到存储层,而kafka试讲日志缓存在kafka集群,待后期可以采集到存储层。

Flume采集中间停了,可以采用文件的方式记录之前的日志,而kafka是采用offset的方式记录之前的日志。

9、flume有哪些组件flume的sourcechannelsink具体是做什么的

1)source:用于采集数据,Source是产生数据流的地方,同时Source会将产生的数据流传输到Channel,这个有点类似于Java IO部分的Channel。

2)channel:用于桥接Sources和Sinks,类似于一个队列。

3)sink:从Channel收集数据,将数据写到目标源(可以是下一个Source,也可以是HDFS或者HBase)。

10、你是如何实现flume数据传输的监控的

11、Flume怎么做数据监听?有没有做ETL?

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容