Mapreduce案例之移动公司日志分析


1.分布式计算思想:

1.1基本思想:mapreduce是两个操作步骤,即映射和规约也是这个分布式计算的思想。即实现一个指定的Map映射函数,用来把一组键值对映射成新的键值对,再把新的键值对发送个Reduce规约函数,用来保证所有映射的键值对中的每一个共享相同的键组。

1.2执行流程:HDFS上的数据与map任务沟通时会被切分split一个split对应一个map ,块和split数目不一定相同,每一个reduce任务对应一个文件。结果存放在目录中

map任务运行在节点上(一个节点可以运行多个map任务,但是一个map任务不能跨多个节点上运行)reduce任务与map同理进入map键值对k1

v1原始数据

1.3处理计算步骤:

1.3.1. map任务处理

1.1读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。

1.2.写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。

1.3对输出的key、value进行分区。

1.4.对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。

1.5分组后的数据进行归约。

1.3.2.reduce任务处理

2.1.对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。

2.2.对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、values处理,转换成新的key、value输出。

2.3把reduce的输出保存到HDFS文件中,k3

v3是结果数据,k2 v2是中间数据,k1 v1是初始数据

某些类的数据,送到某些reduce中,这个过程被称为shuffle——数据分配过程map和reducer在伪分布式下是在第一个节点的map任务数量一般一个节点会有2个任务,输出流hdfs完成

2.业务逻辑:

移动公司日志,一般记录以下几个数据,手机号码,上行流量,下行流量。目的统计一个手机号码某一个时间段的产生的总流量。并按照一定的规则排序。

首先把这个庞大的数据源从本地上传到Hdfs上,被逻辑切分多个块,然后进行分布式并行计算统计总流量。

3.具体步骤:

3.1.继承Mapper类重写map方法

这个步骤就是把大文件分成若干个split,每个split对应一个map任务,比如说计算一个136字段的手机号的总流量。

3.2.继承Reducer类重写reduce方法

这个步骤是对map处理的数据进行汇总,比如分区,排序。

3.3.新建驱动Job加载reducemap方法

Mapreduce代码:

import java.io.IOException;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

importorg.apache.hadoop.mapreduce.Partitioner;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DataCount {

publicstatic class DCMapper extends Mapper{

protectedvoid map(LongWritable key, Text value, Context context)

throwsIOException, InterruptedException {

//把文本数据转化String类型便于后续分割

Stringline = value.toString();

//分割数据

String[]fields = line.split("\t");

Stringtel = fields[1];

longup = Long.parseLong(fields[8]);

longdown = Long.parseLong(fields[9]);

DataBeanbean = new DataBean(tel, up, down); //封装的对象

//send

context.write(newText(tel), bean);//提交给reduce

}

}

publicstatic class DCReducer extends Reducer{

@Override

protectedvoid reduce(Text key, Iterable values, Context context)

throwsIOException, InterruptedException {

longup_sum = 0;

longdown_sum = 0;

for(DataBeanbean : values){

up_sum+= bean.getUpPayLoad();

down_sum+= bean.getDownPayLoad();

}

DataBeanbean = new DataBean("", up_sum, down_sum);

context.write(key,bean);

}

}

publicstaticclass Partitioner1 extendsPartitioner{

publicint getPartition(Text key, DataBean value, int numPartitions)

{

Stringaccount=key.toString();

Stringsub_acc=account.substring(0,3);

return0;

}

}

publicstatic void main(String[] args) throws Exception

{

Configurationconf = new Configuration();

Jobjob = Job.getInstance(conf);

job.setJarByClass(DataCount.class);

job.setMapperClass(DCMapper.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(DataBean.class);

FileInputFormat.setInputPaths(job,new Path(args[0]));

job.setReducerClass(DCReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(DataBean.class);

FileOutputFormat.setOutputPath(job,new Path(args[1]));

job.waitForCompletion(true);

}

}

4.对于对象的封装

封装代码:

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class DataBean implements Writable{

privateString tel;

privatelong upPayLoad;

privatelong downPayLoad;

privatelong totalPayLoad;

publicDataBean(){}

publicDataBean(String tel, long upPayLoad, long downPayLoad) {

super();

this.tel= tel;

this.upPayLoad= upPayLoad;

this.downPayLoad= downPayLoad;

this.totalPayLoad= upPayLoad + downPayLoad;

}

publicString toString() {

returnthis.upPayLoad + "\t" + this.downPayLoad + "\t" +this.totalPayLoad;

}

publicvoid write(DataOutput out) throws IOException {

out.writeUTF(tel);

out.writeLong(upPayLoad);

out.writeLong(downPayLoad);

out.writeLong(totalPayLoad);

}

publicvoid readFields(DataInput in) throws IOException {

this.tel= in.readUTF();

this.upPayLoad= in.readLong();

this.downPayLoad= in.readLong();

this.totalPayLoad= in.readLong();

}

publicString getTel() {

returntel;

}

publicvoid setTel(String tel) {

this.tel= tel;

}

publiclong getUpPayLoad() {

returnupPayLoad;

}

publicvoid setUpPayLoad(long upPayLoad) {

this.upPayLoad= upPayLoad;

}

publiclong getDownPayLoad() {

returndownPayLoad;

}

publicvoid setDownPayLoad(long downPayLoad) {

this.downPayLoad= downPayLoad;

}

publiclong getTotalPayLoad() {

returntotalPayLoad;

}

publicvoid setTotalPayLoad(long totalPayLoad) {

this.totalPayLoad= totalPayLoad;

}

}

5.业务处理排序:

排序代码:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;

importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class paixu extendsMapper

{

publicstatic void main(String[] args) throws IOException {

Configurationconf=new Configuration();

Jobjob=Job.getInstance(conf);

job.setJarByClass(paixu.class);

//job.setMapperClass();

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(bean.class);

FileInputFormat.setInputPaths(job,new Path(args[0]));

job.setReducerClass(r.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(bean.class);

FileOutputFormat.setOutputPath(job,new Path(args[1]));

}

privateText k=new Text();

privatebean v=new bean();

protectedvoid map(LongWritable key, Text value,Context context)

throwsIOException, InterruptedException {

Stringline=value.toString();

String[]fields=line.split("\t");

Stringaccount=fields[0];

doublein=Double.parseDouble(fields[1]);

doubleout=Double.parseDouble(fields[2]);

k.set(account);

v.set(account,in, out);

context.write(k,v);

}

publicstatic class r extends Reducer{

privatebean v =new bean();

protectedvoid reduce(Text key, Iterable value,Context context)

throwsIOException, InterruptedException {

doubleinsum=0;

doubleoutsum=0;

for(beano:value)

{

insum+=o.getIncome();

outsum+=o.getExpenses();

}

v.set("",insum, outsum);

context.write(key,v);

}

}

}

6.运行结果:

总结:在写Mapreduce中体会到只要做好自己三个步骤,剩下的框架都会自动做好,不需要考虑,但是在自己做的三个步骤中最难的就是业务逻辑的处理。所以在Mapreduce分布式并行计算中业务逻辑处理是重中之重。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,816评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,729评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,300评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,780评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,890评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,084评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,151评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,912评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,355评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,666评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,809评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,504评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,150评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,121评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,628评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,724评论 2 351

推荐阅读更多精彩内容