Coding

1.基础:wordCount

2.三个重要自定义接口:partitioner、combiner、自定义排序(WritableComparator)

partitioner用于自定义maptask执行结果分区,按照分区结果启动相应数量reduce,默认使用对key进行hash的方式分区。(例子:对手机流量统计同时按照归属地进行分区)自定义一个partitioner继承抽象类:Partitioner然后在job对象中,设置自定义partitioner: job.setPartitionerClass(CustomPartitioner.class),即可在maptask处理数据时(将数据写入缓冲区)对数据进行自定义分区。

combiner用于处理maptask到reduceTask之间的中间结果,Combiner将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。使用combiner要注意不能破坏最终的结果,不适用于求平均值这种情况,具体问题具体分析。(例子:结合wordCount即可,可以发现每个mapTask执行结果变成了类似reduceTask执行结果<hello,n(n>1)>)。

自定义排序:比如需要以某个bean作为key并按照bean中的某个属性进行排序,需对这个bean实现WritableComparable接口,自定义排序逻辑。(对手机流量统计结果按照总流量大小进行排序输出)。

3.coding

3.1 两表join算法的实现

select a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product b on a.pid = b.id

join

在Reduce端实现:

思路:

自定义join后的实体类型infoBean,包含两个表join后的属性,外加一个flag标识是哪个表的数据

mapTask:通过文件名判断是哪种数据(order还是product),切分行,赋值给infoBean,构造k-v键值对,key为join条件

reduceTask:可知一个productBean会对应多个orderBean,在reduce阶段,一次读一组key相同的数据,通过flag区分是哪种bean(orderBean必然是一组,productBean则是一个),对orderBean进行遍历,将所缺的product数据由productBean,set进去即可。

在Map端实现:解决数据倾斜的问题

根据join条件,比如有很大一部分pid分区后涌入一个reduce,而其他pid只有少数,却也涌入其他reduce,就会造成数据量大的reduce处理起来较慢,并发效率低的情况。解决方式是在Map端实现,适用于关联表中有小表的情形;可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度。

思路:

先在mapper类中预先定义好小表,进行join------引入实际场景中的解决方案:一次加载数据库或者用distributedcache

在mapper类中重写 setup方法,该方法在map任务初始化时调用,在此处将小表读取放入本地缓存。在map方法中只读取大表,然后将小表和其关联即可,无需reduce。

指定需要缓存一个文件到所有的maptask运行节点工作目录的方法:

3.2  找出共同好友(difficult)

思路:

重点:好友关系是单向的,如何得到一组数据,以两个用户为key,value为两者的共同好友。

第一步 :map

读一行  A:B,C,D,F,E,O

输出    <B,A><C,A><D,A><F,A><E,A><O,A>   (B在A的好友列表、C在A的好友列表,D在A的好友列表。。。)

第二步:reduce

拿到的数据比如<C,A><C,B><C,E><C,F><C,G>......

输出:<C  A,B,E,F,G…….>     ——>C为key,value为,列表中有C作为好友的人,即,A,B,E,F,G…都有C作为好友。

第三步:map,以第二步输出作为输入

读入一行<C  A,B,E,F,G…….>

输出<A-B,C><A-E,C><A-F,C>……还要把A,B,E,F,G…….先排序防止A-B和B-A这种情况

从而得到两两共同好友C

第四步:reduce

读入数据 <A-B,C><A-B,F><A-B,G>.......

输出: A-B  C,F,G,.....

A-B为key,收集到key对应的所有value,就是这两两的共同好友

3.3  流量统计

1.对流量日志中的用户统计总上、下行流量

map:切分数据,对日志进行处理,将电话号码作为key,将上行流量、下行流量、总流量生成一个bean作为value。

reduce:一个key,得到一组bean,然后统计总流量。

2.统计流量且按照流量大小倒序排序

使用第一步的结果作为输入,使bean作为key,电话号码作为value,实现WritableComparable接口,自定义排序方法,这样就自动排序了。因为每个bean都是不同的,所以对于reduce来说,不存在一个key对应一组value,所以reduce一次只处理一个k-v,将v作为key,key作v,得到排序后的结果。

3.根据号码归属地对数据分区,将数据写到不同数据

实现方法:自定义Partitioner,实现getPartition方法,然后在客户端程序定义

job.setPartitionerClass(ProvincePartitioner.class);

自定义partition后,要根据自定义partitioner的逻辑设置相应数量的reduce task

job.setNumReduceTasks(n);

这样就可以按照自定义方式对数据进行分区处理了

注意:如果reduceTask的数量>= getPartition的结果数  ,则会多产生几个空的输出文件part-r-000xx

如果    1<reduceTask的数量<getPartition的结果数 ,则有一部分分区数据无处安放,会Exception!!!

如果 reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000

3.4  web日志预处理

需求:对web访问日志中的各字段识别切分,去除日志中不合法的记录

定义一个流量数据的bean

定义一个数据校验函数,当数据不全或者数据中日志的请求状态字段为400,设置为非法记录。

使用mapper对数据进行处理,每次读取一行,bean为key,当bean不合法,不写入。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容