一、Hive窗口函数
窗口函数的语法:
Function() OVER ([PARTITION BY <...>] [ORDER BY <...>] [Window Specification])
分析函数有3个基本组成部分:
1. 分区子句
2. 排序子句
3. 开窗子句
注:不加 partition by 的话则把整个数据集当作一个分区,如果不指定开窗子句,默认统计窗口为从起点到当前行;如果不指定ORDER BY,则将分组内所有值累加,会对某些函数统计结果产生影响,如sum()
窗口函数可达到的效果:
在分组内进行聚合、排序等操作
窗口函数和聚合函数的不同之处是:
对于每个组返回多行,而聚合函数对于每个组只返回一行。
开窗函数指定了分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变化而变化
开窗子句:
关键是理解 ROWS BETWEEN 含义,也叫做window子句:
PRECEDING:往前
FOLLOWING:往后
CURRENT ROW:当前行
UNBOUNDED:无边界
UNBOUNDED PRECEDING 表示从最前面的起点开始
UNBOUNDED FOLLOWING:表示到最后面的终点
窗口函数可以分为三类:
聚合函数,通常的聚合函数,如 SUM 和 MAX 等。
排序函数,排序数据函数,如 RANK 和 ROW_NUMBER 等。
分析函数,统计和对比函数,如 LEAD、LAG 和 FIRST_VALUE 等。
rank、dense_rank、row_number三者对比:
ROW_NUMBER() 从1开始,按照顺序,生成分组内记录的序列
RANK() 生成数据项在分组中的排名,排名相等会在名次中留下空位
DENSE_RANK() 生成数据项在分组中的排名,排名相等会在名次中不会留下空位
Sum:
如果没有order by,不仅分区内没有排序,sum()计算的也是整个分区的总和,也无法使用开窗子句了
max()
无论有没有order by 都是计算整个分区的最大值
NTILE(n)
用于将分组数据按照顺序切分成n片,返回当前切片值
cume_dist
返回小于等于当前值的行数/分组内总行数
LAG(col,n,DEFAULT)
用于统计窗口内往上第n行值
LEAD(col,n,DEFAULT)
用于统计窗口内往下第n行值
FIRST_VALUE
取分组内排序后,截止到当前行,第一个值
LAST_VALUE
取分组内排序后,截止到当前行,最后一个值
二、sort by的使用场景
1.distribute by和sort by在分区内部排序,可以用在窗口函数中,并且如果数据量过大时,先分区排序,再对结果全局排序,可以提高执行效率
2.cluster by:除了具有distribute by的功能外还兼具sort by的功能。但是排序只能是升序排序,不能指定排序规则为ASC或者DESC,并且分区和排序的列必须相同,没用过
三、如何进行数据清洗的
1.这个项目因为数据来源全部都是oracle,因此数据质量比较高,没有特地进行数据清洗工作,而是在业务处理过程中完成的比如说异常数据剔除、字段格式化之类的工作
2.如果是日志数据的话,第一步肯定是将需要用到的字段提取出来,供数据分析使用
3.数据格式统一,一般是对一些id之类的用lpad进行补全操作,对前后有空格的数据进行trim,对时间字段进行统一格式
4.异常数据剔除,像评分超过限定值,某些不能为空的字段为空
5.数据去重
四、Impala简述
Impala是实时交互大数据查询工具,Impala底层不是MapReduce,而是与关系数据库中类似的分布式查询引擎,无法用作数据分析,只是一个查询工具,可以和Kudu配合使用
五、如何看出一条sql是否走了索引
利用SQL执行计划(是指查看一条SQL语句在数据库中实际执行的时候,一步步的分别都做了什么)
六、项目中定时增量抽取数据是怎么做的
通过informatic进行增量导入:
首先增量的表需要有主键和增量字段,然后在infa里面添加一个更新策略模块,参数设置为DD_UPDATE,然后将工作流的插入方式设置为update else insert,我们导入的时候可以通过ETS配置增量起止区间,然后在这个区间内的数据,会进行主键比较,如果主键相同则更新,如果主键不同则插入,当导入到一半发生错误时,也没关系,重新执行,如果主键相同则更新,如果主键不同则插入
通过sqoop进行增量导入:
要满足表有增量字段,可以是时间也可以是自增的id,我们表里面都是有自增主键的,所以选择了append的方式,在sqoop参数中配置--incremental append --check-column id --last-value 1,然后创建一个sqoop job通过oozie每天定时去跑,last-value里面的增量值会记录在metastore中(需要开启sqoop的metastore),实现自动增量,如果导入发生错误,那么应该不会上传到HDFS上,如果导入成功,则将最后导入的id作为下次导入的起始id,然后创建外部分区表(每次都删除原表创建新表),按年月和平台分区后进行数据处理,oozie配置这块我自己没怎么配置过
其余表的导入:
其余数据量比较小的,或者不满足增量条件的,目前我们都是全量导入
七、历史数据会发生变化的表怎么处理:
一般历史数据是不会改动的,会改动的表就做成全量好了
八、Sqoop命令和sqoop job的区别:
Sqoop命令是直接执行,sqoop job是创建完成后去调用
九、Sqoop和infa导入过程中遇到什么问题
Infa:
1.字段内容被截取,可以适当放大落地表字段长度
2.为了不影响抽取效率,blob/clob字段采取不抽取策略
Sqoop:
1.数据中的换行符问题
替换或删除换行符等特殊字符 ,如:–hive-drop-import-delims
2.jdbc驱动需要放在sqoop的lib里面
3.空值问题
当表中有字符串类型的字段时,则其既可能含有空对象,又可能含有空字符串时,两个参数都需要用上
sqoop import ... --null-string '\\N' --null-non-string '\\N'
十、什么是数据仓库、主题域、主题
将不同数据来源的数据整合起来,经过清洗、转换后,面向不同的主题进行数据分析
主题域:集体企业
主题:招标,工程,运营,人员,银行等
数据中心包含数据仓库,可以对外提供分析之后的业务数据
十一、Spark-Streaming获取kafka数据的两种消费模式、三种消费语义
两种消费模式:
基于Receiver的方式不好,基于Direct的方式好在:
1.简化并行读取:Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据
2.数据恢复便捷:
只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复
3.一次且仅一次的事务机制:
在Direct的方式中,我们是直接从kafka来读数据,可以保证,所有从Kafka接收到的数据,都是一次且仅一次
三种消费语义:
1.最多一次:读取消息 -> 提交offset -> 处理消息
(offset已经提交了,数据还没处理完,消费者挂了)
2.最少一次:读取消息 -> 处理消息 -> 提交offset
(数据处理完了,offset还没提交之前,消费者挂了)
3.精确一次:将处理消息和提交offset放在同一个事务中
(offset和数据同时处理完毕)
十二、Kafka 0.11后如何做到exactly-once语义
Kafka 0.11后的版本支持exactly-once语义,其中主要包括三个内部逻辑的改造:
幂等:partition内部的exactly-once顺序语义
幂等操作,是指可以执行多次,而不会产生与仅执行一次不同结果的操作,现在相同的消息,如果被producer发送多次,也只会被写入Kafka一次。要打开此功能,需要修改broker的配置:enable.idempotence = true。
事务:跨partition的原子性写操作
Kafka现在支持使用新事务API原子性的对跨partition进行写操作
Exactly-once 流处理
基于幂等和原子性,通过DirectStreams API实现exactly-once流处理成为可能。如果要在流应用中实现相关语义,只需要配置 processing.guarantee=exactly_once
十三、消费者如何消费Kafka中的消息,如果消费能力不足怎么办
Kafka 中一个 topic 中的消息是被打散分配在多个 Partition(分区) 中存储的, Consumer Group 在消费时需要从不同的 Partition 获取消息,如果应用需要读取全量消息,那么请为该应用设置一个消费组;如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者,但消费者组中消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助。
十四、消费者注册方式
subscribe:这种方式在新增topic或者partition或者消费者增加或者消费者减少的时候,会进行消费者组内消费者的重平衡(Rebalance)
Rebalance
当消费者从组中添加或删除时,需要进行分区数据重新分配的过程,称为重平衡
十五、项目中使用的各组件版本:
Kafka:0.11
Sparkstreaming 2.11
Hadoop2.6
Hive2.1
Sqoop1
Flume1.8
十六、管理offset的流程
1.在Kafka DirectStream初始化时,取得当前所有partition的存量offset(Partition 会为每个消费者组保存一个偏移量,记录其消费到的位置),以让DirectStream能够从正确的位置开始读取数据
2.读取消息数据,处理并存储结果(程序主体)
3.使用commitAsync() 提交offset将其持久化在kafka的topic中
十七、Kafka主题和分区的关系,如何创建分区
Kafka 的消息是存在于文件系统之上的, Topic 其实是逻辑上的概念,面相消费者和生产者,物理上存储的其实是 Partition,在创建topic时可以指定Partition个数
十八、数据倾斜专题
1.数据倾斜的表现
任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大。
2.数据倾斜的原因
1)、key分布不均匀
2)、某些SQL语句本身就有数据倾斜
3.数据倾斜的解决方案
3.1 参数调节
1)hive.map.aggr = true
Map 端部分聚合,相当于Combiner
2)hive.groupby.skewindata=true
数据倾斜的时候进行负载均衡,当项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
3.2 sql语句优化
1)大小表Join:
使用map join让小的维度表(1000条以下的记录条数) 先进内存。在map端完成reduce.
2)大表Join大表,但是0值或空值过多:
把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。比如按userid进行join的情况,userid会有空值
3)Join时关联条件的数据类型不同,如int型的id和string类型的id
把数字类型用cast转换成字符串类型
4)将count(distinct)换为两次group by
十九、MapReduce(Hadoop)调优
(2)合理设置切块大小和分区个数(即map、reduce个数)
一个恰当的map并行度是大约每个节点10-100个map,且最好每个map的执行时间至少一分钟
合适的reduce数量等于各节点cpu core数量总和
(2)对数据进行压缩
(3)小文件事先合并成大文件
(4)在集群网络通信状态不好的时候,适当调大DN的心跳间隔
(5)增加溢写缓冲区大小,减少溢写次数,减少磁盘I/O
二十、hive语句调优
1.Join中需要将大表写在靠右的位置
2.尽量使用UDF而不是transfrom
3.大表join小表时,使用map join让小的维度表(1000条以下的记录条数) 先进内存。在map端完成reduce.(hive2.0以后默认自动开启map join)
4.将count(distinct)换为两次group by
5.先查出子查询,然后从查询结果中继续查询,避免从全量数据中查询
6.尽量把子查询中的group by移到外层进行
7.先过滤后排序效率相对较高
二十一、sparkstreaming调优
1.协调Kafka读取速率、每批次时间、数据处理所需时间,使得每个task 都能在批处理时间内及时处理完 Partition 内的数据
2.对一些经常使用到的rdd,可以调用rdd.cache()来缓存rdd
3.视情况调节用于缓存rdd的内存和用于shuffle的内存
说视频项目中主要负责用hive和sparkstreaming进行业务处理,flume数据采集我也看过flume的配置,因为我们的数据来源都是通过埋点之类的获取的日志数据,就在flume中配置source
8.将where ... in ()模式改为LEFT SEMI JOIN
查询a表中所有在b表中出现的key
SELECT a.key, a.val FROM a LEFT SEMI JOIN b on (a.key = b.key)
LEFT SEMI JOIN限制条件:
JOIN 子句中右边的表只能在 ON 子句中设置过滤条件
先确定一张主表,还需要什么字段join什么表,按需求进行过滤,按维度进行聚合(先聚合成最细粒度,因为粗粒度可以根据细粒度生成),将指标进行计算后输出
Hive语法
Explode函数
Filter里面和map一样的写法
hive中按指定格式返回日期(string)
date_format(a,"yy-MM")
hive窗口函数使用场景
1.统计1-12月的累积营业收入,即1月为1月份的值,2月为1.2月份值的和
2.一年中,统计出营业收入前1/4之的企业信息
sql不会时:
先想写sql的逻辑步骤
再想可以先查出子查询结果,然后再跟主表关联
再想内置函数有没有能直接实现的(如分组后最小值大于85,则全大于85)
再想窗口函数
case when then用法很多,不会写了先想这个
Spark中parallelize函数和makeRDD函数的区别
当调用parallelize()方法的时候,不指定分区数的时候,使用系统给出的分区数,而调用makeRDD()方法的时候,会为每个集合对象创建最佳分区,而这对后续的调用优化很有帮助
touch命令用法
修改文件时间戳或新建文件
数据量:
数据中心:几亿,100多个G
app项目:8千万左右,20-30g
kafka集群的规模,消费速度是多少。
3个节点,每秒10几M左右。
项目中的维度有哪些
我们几个通用的维度就是年月日这些时间维度,平台维度,采购类型(物资非物资维度),招标类型,采购方式
介绍sqoop抽取到hdfs后,不要特意说是外部表,只有问道的时候再说
MR设计核心
针对分布式存储架构,进行分布式并发处理
写单词统计sparkstreaming
写pv、uv、sql练习
Hive与sparkstreaming业务需要会讲两个
关于UDF
项目中其实很少用到UDF,不过我知道怎么实现,...,比如把查询出来的金额进行特殊格式化,原字段当作参数传进去,返回一个格式化以后的字段
ArrayList---顺序表
默认初始容量为10,每次扩容是在上一次容量的基础上增加一半,增删元素相对较慢,查询元素相对较快。是线程不安全的列表
HashSet:
默认初始容量是16,加载因子/装载因子是0.75f,每次扩容一倍。是一个线程不安全的集合
Comparator
比较器---重写compare方法,在compare方法中指定比较规则。根据返回值的正负来确定排序---如果返回值为正,则第一个参数排在后边;如果返回值为负,则第一个参数排在前面。
如果没有指定比较规则,而元素又需要进行排序,那么要求元素对应的类必须实现Comparable接口---重写compareTo
HashMap/Hashtable的区别
HashMap---允许键或者值为null。默认初始容量是16,默认加载因子是0.75f,每次扩容一倍。是一个异步式线程不安全的映射
Hashtable---不允许键或者值为null。默认初始容量是11,默认加载因子是0.75f。是一个同步式线程安全的映射
70.在系统中查找文件的命令:
Find / -name catch.sh
88.显示日志文件最后20行
tail -n 20 filename
1.Hadoop中用户自定义数据类型的实现
1.继承接口Writable,实现其方法write()和readFields(), 以便该数据能被序列化后完成网络传输或文件输入/输出;
2.如果该数据需要作为主键key使用,或需要比较数值大小时,则需要实现WritalbeComparable接口,实现其方法write(),readFields(),CompareTo() 。
1.Dstream的transform操作
可以用于执行任意的RDD到RDD的转换操作;
它可以用于实现,DStream API中所没有提供的操作;比如说,DStream API中,并没有提供将一个DStream中的每个batch,与一个特定的RDD进行join的操作。但是我们自己就可以使用transform操作来实现该功能。
scala> List(1,2,3).zip(List(100,200,300))
res91: List[(Int, Int)] = List((1,100), (2,200), (3,300))
7. 修改文件名
Mv 旧名 新名
17. 创建用户gm,指定登录密码和用户组和用户id,并查看该用户
Useradd -p 密码 -g 用户组 -u 用户id gm
31. 登录远端服务器
Ssh root@远端服务器ip地址
38. 关闭防火墙
Service iptables stop
39. 永久关闭防火墙
Chkconfig iptables off
55. yum安装、升级、卸载软件
Yum install、update、remove 软件名
六、HIVE语法(相当于数据库的sql)
9.补充:
增加分区:
ALTER TABLE book add PARTITION (category = 'zazhi') location '/user/hive/warehouse/datax.db/book/category=zazhi';
1. spark中的shuffle的含义
spark中一旦遇到宽依赖就需要进行shuffle的操作
所谓的shuffle的操作的本质就是将数据汇总后重新分发的过程
这个过程数据要汇总到一起 数据量可能很大所以不可避免的需要进行数据落磁盘的操作 会降低程序的性能
所以spark并不是完全使用内存而不读写磁盘 只能说它尽力避免这样的过程来提高效率
在DStream中提供了如下的和滑动窗口相关的方法:
window(windowLength, slideInterval)
countByWindow(windowLength, slideInterval)
reduceByWindow(func, windowLength, slideInterval)
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
countByValueAndWindow(windowLength, slideInterval, [numTasks])
可以通过以上机制改造案例
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc,Seconds(1));
ssc.checkpoint("file:///root/work/streamCheckPoint");
val lines = ssc.textFileStream("file:///root/work/streamData");
val words = lines.flatMap(_.split(" "));
words.map((_,1)).reduceByKeyAndWindow( (x:Int,y:Int)=>x+y, Seconds(5), Seconds(5) ).print
ssc.start();
B、HDFS中有两个文件a.text与b.text,文件的格式为(ip,username),如:a.text,b.text
a.text
127.0.0.1 xiaozhang
127.0.0.1 xiaoli
127.0.0.2 wangwu
127.0.0.3 lisi
B.text
127.0.0.4 lixiaolu
127.0.0.5 lisi
每个文件至少有1000万行,请用程序完成以下工作,
1)出现在b.text而没有出现在a.text的IP
rdd22.filter(x=>{!x.contains(rdd11)}).foreach(println)
12.怎么查看kafka的offset
consumer.position()
13. rdd 怎么分区宽依赖和窄依赖
宽依赖:父RDD的分区被子RDD的多个分区使用 例如 groupByKey、reduceByKey、sortByKey等操作会产生宽依赖,会产生shuffle
窄依赖:父RDD的每个分区都只被子RDD的一个分区使用 例如map、filter、union等操作会产生窄依赖
14.怎么解决kafka的数据丢失
producer端:
宏观上看保证数据的可靠安全性,肯定是依据分区数做好数据备份,设立副本数。
broker端:
topic设置多分区,分区自适应所在机器,为了让各分区均匀分布在所在的broker中,分区数要大于broker数。
分区是kafka进行并行读写的单位,是提升kafka速度的关键。
Consumer端
consumer端丢失消息的情形比较简单:如果在消息处理完成前就提交了offset,那么就有可能造成数据的丢失。由于Kafka consumer默认是自动提交位移的,所以在后台提交位移前一定要保证消息被正常处理了,因此不建议采用很重的处理逻辑,如果处理耗时很长,则建议把逻辑放到另一个线程中去做。为了避免数据丢失,现给出两点建议:
enable.auto.commit=false 关闭自动提交位移
在消息被完整处理之后再手动提交位移
3. 生产环境中为什么建议使用外部表?
解答:
1、因为外部表不会加载数据到hive,减少数据传输、数据还能共享。
2、hive不会修改数据,所以无需担心数据的损坏
3、删除表时,只删除表结构、不删除数据。
8. 假如一个分区的数据错误怎么通过hivesql删除
解答:
alter table ptable drop partition (daytime='20140911',city='bj');
元数据,数据文件都删除,但目录daytime= 20140911还在
返回日期部分(string)
to_data(a)
查找字符串中子串第一次出现的位置
instr(a)
字符串中大小写转换
lower(a)/lcase(a)
将字符串中的所有子串替换为另一子串
translate(a,b,c)
CDH搭建过程:
安装操作系统,配置固定ip
永久关闭每台机器的防火墙\SELinux
为每台机器配置ssh免秘钥登录
安装jdk\MySql
配置NTP
Cloudera -Manager Server 和Agent
通过CM安装CDH
首先判断是有监督学习还是无监督学习(是否存在可以学习的样本)
一、有监督学习
有监督的学习,即存在目标变量,需要探索特征变量和目标变量之间的关系,在目标变量的监督下学习和优化算法。例如,信用评分模型就是典型的有监督学习,目标变量为“是否违约”。算法的目的在于研究特征变量(人口统计、资产属性等)和目标变量之间的关系。
1.分类算法
分类算法和预测算法的最大区别在于,前者的目标变量是分类离散型(例如,是否逾期、是否肿瘤细胞、是否垃圾邮件等),后者的目标变量是连续型。一般而言,具体的分类算法包括,分类决策树、KNN、朴素贝叶斯、svm等。
分类决策树
1.给定一个训练数据集(数据集中的数据包含划分属性和决策属性,即:“当满足哪些划分属性时,可以给出什么样的决策结果)
比如:我们想知道在什么天气条件下适合户外运动”
2.那么划分属性包含:天气、温度、适度、风力
决策属性包含:是否可以进行户外运动
3.可以根据分类纯度(熵)来指定分类规则,最终将数据分类成决策树的形式
KNN
1、给定一个训练集数据,每个训练集数据都是已经分好类的。
2、设定一个初始的测试数据a,计算a到训练集所有数据的欧几里得距离,并排序。
3、选出训练集中离a距离最近的K个训练集数据。
4、比较k个训练集数据,选出里面出现最多的分类类型,此分类类型即为最终测试数据a的分类。
2.回归算法
预测类算法,其目标变量一般是连续型变量。常见的算法,包括线性回归、逻辑回归、svm等。
线性回归
先给定一个训练集,根据这个训练集学习出一个线性函数
然后测试这个函数是否足够拟合训练集数据
然后挑选出最好的线性函数(代价函数越小,说明我们线性回归的越好,和训练数据拟合的越好)
二、无监督学习
无监督学习,即不存在目标变量,基于数据本身,去识别变量之间内在的模式和特征。例如关联分析,通过数据发现项目A和项目B之间的关联性。例如聚类分析,通过距离,将所有样本划分为几个稳定可区分的群体。这些都是在没有目标变量监督下的模式识别和分析。
1)聚类分析
聚类的目的就是实现对样本的细分,使得同组内的样本特征较为相似,不同组的样本特征差异较大。常见的聚类算法包括kmeans、密度聚类等。
kmeans
(1)、设定数字k,从n个初始数据中随机的设置k个点为聚类中心点。
(2)、针对n个点的每个数据点,遍历计算到k个聚类中心点的距离,最后按照离哪个中心点最近,就划分到那个类别中。
(3)、对每个已经划分好类别的n个点,对同个类别的点求均值,作为此类别新的中心点。
(4)、循环(2),(3)直到最终中心点收敛。
2)关联分析
关联分析的目的在于,找出项目(item)之间内在的联系。常常是指购物篮分析,即消费者常常会同时购买哪些产品(例如游泳裤、防晒霜),从而有助于商家的捆绑销售。如Apriori算法等
Apriori
首先设定最小支持度与最小可信度两个门槛值,满足以上两个条件的前提下,根据这些组合最终推出我们的关联规则
支持度:比如在1000次的商品交易中同时出现了啤酒和尿布的次数是50次,那么此关联的支持度为5%
可信度:在数据集中已经出现A时,B发生的概率
补充:推荐系统协同过滤算法
用户相似度计算(欧式距离相似度计算、余弦相似度)
为相似的用户推荐其可能感兴趣的商品
拉链表
数据建模
看收藏的网站,写hive、sparkstreaming案例(黑名单过滤一个案例,数据分析一个案例,搜索词滑动统计案例),将笔记和资料等进行整理,创建目录方便查阅