Hadoop MR ETL Offline Project

I. Requirements and Step Breakdown

1. Requirements

Use MapReduce to clean the raw logs, then hand the cleaned data to Hive for statistical analysis.

2. Step Breakdown

1. Generate a log file of your own with the fields (cdn, region, level, time, ip, domain, url, traffic); time, ip, domain and traffic must vary, and the file should be roughly 50M to 100M in size
2. Write a MapReduce program that cleans the log
3. Move the cleaned log to the location of the Hive external table
4. Refresh the Hive partition metadata
5. Query the total traffic per domain
6. Wrap the whole workflow in a shell script

II. Generate the Log with a Log Generator and Upload It to HDFS

The log generator:

package com.ruoze.hadoop.utils;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

public class GenerateLogUtils {
    public static void main(String[] args) {
        generateLog();
    }

    private static String generateLog() {
        try {
            // Create the file only once; this must not go inside the for loop, or performance suffers badly
            File file = new File("access.log");
            if (!file.exists()) {
                file.createNewFile();
            }

            // Likewise, open the writer and build the lookup arrays once instead of once per line
            FileWriter fileWriter = new FileWriter(file.getName(), true);
            Random rd = new Random();

            String[] domainStr = new String[]{
                    "v1.go2yd.com",
                    "v2.go2yd.com",
                    "v3.go2yd.com",
                    "v4.go2yd.com",
                    "v5.go2yd.com",
            };

            // Candidate traffic values; the trailing empty string stands for a dirty record
            String[] trafficStr = new String[]{
                    "136662",
                    "785966",
                    "987422",
                    "975578",
                    "154851",
                    ""
            };

            for (int i = 0; i < 1000000; i++) {
                Date date = randomDate("2019-01-01", "2019-01-31");

                // Note: nextInt(length - 1) never returns the last index, so v5.go2yd.com
                // and the empty traffic value are in fact never emitted
                int domainNum = rd.nextInt(domainStr.length - 1);
                int trafficNum = rd.nextInt(trafficStr.length - 1);

                StringBuilder builder = new StringBuilder();
                builder
                        .append("baidu").append("\t")
                        .append("CN").append("\t")
                        .append("2").append("\t")
                        .append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date)).append("\t")
                        .append(getRandomIp()).append("\t")
                        .append(domainStr[domainNum]).append("\t")
                        .append("http://v1.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4").append("\t")
                        .append(trafficStr[trafficNum]).append("\t");

                fileWriter.write(builder.toString() + "\n");
            }

            fileWriter.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return "";
    }


    /**
     * Generate a random Date between beginDate and endDate.
     *
     * @param beginDate start of the range, in yyyy-MM-dd format
     * @param endDate   end of the range, in yyyy-MM-dd format
     * @return a random Date inside the range, or null if the range is invalid
     */
    private static Date randomDate(String beginDate, String endDate) {
        try {
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
            Date start = format.parse(beginDate);
            Date end = format.parse(endDate);

            if (start.getTime() >= end.getTime()) {
                return null;
            }
            long date = random(start.getTime(), end.getTime());
            return new Date(date);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    private static long random(long begin, long end) {
        long rtn = begin + (long) (Math.random() * (end - begin));
        if (rtn == begin || rtn == end) {
            return random(begin, end);
        }
        return rtn;
    }


    /**
     * Generate a random IP address from a set of public IP ranges.
     *
     * @return a dotted-decimal IP string
     */
    public static String getRandomIp() {

        // IP ranges (stored as signed 32-bit integers)
        int[][] range = {{607649792, 608174079},// 36.56.0.0-36.63.255.255
                {1038614528, 1039007743},// 61.232.0.0-61.237.255.255
                {1783627776, 1784676351},// 106.80.0.0-106.95.255.255
                {2035023872, 2035154943},// 121.76.0.0-121.77.255.255
                {2078801920, 2079064063},// 123.232.0.0-123.235.255.255
                {-1950089216, -1948778497},// 139.196.0.0-139.215.255.255
                {-1425539072, -1425014785},// 171.8.0.0-171.15.255.255
                {-1236271104, -1235419137},// 182.80.0.0-182.92.255.255
                {-770113536, -768606209},// 210.25.0.0-210.47.255.255
                {-569376768, -564133889}, // 222.16.0.0-222.95.255.255
        };

        Random rdint = new Random();
        int index = rdint.nextInt(10);
        String ip = num2ip(range[index][0] + new Random().nextInt(range[index][1] - range[index][0]));
        return ip;
    }

    /*
     * Convert a 32-bit integer into a dotted-decimal IP address string
     */
    public static String num2ip(int ip) {
        int[] b = new int[4];
        String x = "";

        b[0] = (int) ((ip >> 24) & 0xff);
        b[1] = (int) ((ip >> 16) & 0xff);
        b[2] = (int) ((ip >> 8) & 0xff);
        b[3] = (int) (ip & 0xff);
        x = Integer.toString(b[0]) + "." + Integer.toString(b[1]) + "." + Integer.toString(b[2]) + "." + Integer.toString(b[3]);

        return x;
    }


}

Upload access.log to the HDFS path:

 hadoop fs -put access.log  /g6/hadoop/accesslog/20190402/
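If the target directory does not exist yet, create it first; after the upload, the file size can be checked against the 50M-100M target (a small sketch using the same path as above):

 hadoop fs -mkdir -p /g6/hadoop/accesslog/20190402/
 hadoop fs -du -s -h /g6/hadoop/accesslog/20190402/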

III. Cleaning the Log with MapReduce

1. The LogUtils class that cleans a log line

package com.ruoze.hadoop.utils;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;

public class LogUtils {
    DateFormat sourceFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ENGLISH);
    DateFormat targetFormat = new SimpleDateFormat("yyyyMMddHHmmss");

    /**
     * Parse one log line and normalize its fields.
     * The line is split on \t.
     */
    public String parse(String log) {
        String result = "";
        try {
            String[] splits = log.split("\t");
            String cdn = splits[0];
            String region = splits[1];
            String level = splits[2];
            String timeStr = splits[3];
            // Convert the timestamp from "yyyy-MM-dd HH:mm:ss" to "yyyyMMddHHmmss"
            String time = targetFormat.format(sourceFormat.parse(timeStr));

            String ip = splits[4];
            String domain = splits[5];
            String url = splits[6];
            String traffic = splits[7];

            StringBuilder builder = new StringBuilder("");
            builder.append(cdn).append("\t")
                    .append(region).append("\t")
                    .append(level).append("\t")
                    .append(time).append("\t")
                    .append(ip).append("\t")
                    .append(domain).append("\t")
                    .append(url).append("\t")
                    .append(traffic);

            result = builder.toString();
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return result;
    }
}

2. Unit test for LogUtils

package com.ruoze.hadoop.utils;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class LogUtilsTest {
    private LogUtils utils;

    @Test
    public void testParse() {

        String log = "baidu\tCN\t2\t2019-01-10 16:02:54\t121.77.143.199\tv2.go2yd.com\thttp://v3.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4\t97557845";
        String result = utils.parse(log);
        System.out.println(result);
    }

    @Before
    public void setUp() {

        utils = new LogUtils();
    }

    @After
    public void tearDown() {
        utils = null;
    }
}

Running the test prints the cleaned record, with the timestamp converted from 2019-01-10 16:02:54 to 20190110160254.


3. Mapper

package com.ruoze.hadoop.mapreduce;

import com.ruoze.hadoop.utils.LogUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogETLMapper  extends Mapper<LongWritable,Text,NullWritable,Text>{
    /**
     * Clean the data in the MapReduce map phase:
     * each incoming record is parsed with our rules and the cleaned line is written out.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split once and check the field count before touching index 7,
        // otherwise a short (dirty) record would throw ArrayIndexOutOfBoundsException
        String[] splits = value.toString().split("\t");
        if (splits.length == 8 && StringUtils.isNotBlank(splits[7])) {

            LogUtils utils = new LogUtils();
            String result = utils.parse(value.toString());
            if(StringUtils.isNotBlank(result)) {
                context.write(NullWritable.get(), new Text(result));
            }
        }
    }
}
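As a possible refinement (not part of the original code), Hadoop counters make it easy to see how many records survive the cleaning. A minimal sketch of the same map() body, with hypothetical counter group/name strings:

        String[] splits = value.toString().split("\t");
        if (splits.length == 8 && StringUtils.isNotBlank(splits[7])) {
            String result = new LogUtils().parse(value.toString());
            if (StringUtils.isNotBlank(result)) {
                context.getCounter("ETL", "valid").increment(1);   // shown in the job summary after the run
                context.write(NullWritable.get(), new Text(result));
                return;
            }
        }
        context.getCounter("ETL", "dirty").increment(1);           // records dropped by the cleaning rules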

4. Job

package com.ruoze.hadoop.mapreduce;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogETLDriver {
    public static void main(String[] args) throws Exception{
        if (args.length != 2) {
            System.err.println("Usage: LogETLDriver <input path> <output path>");
            System.exit(1);
        }

        String input = args[0];
        String output = args[1];  

        // For local runs on Windows, point hadoop.home.dir at a local Hadoop installation:
        //System.setProperty("hadoop.home.dir", "D:\\Hadoop\\hadoop-2.6.0-cdh5.7.0");

        Configuration configuration = new Configuration();

        // MapReduce refuses to start if the output path already exists, so delete it first
        FileSystem fileSystem = FileSystem.get(configuration);
        Path outputPath = new Path(output);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(configuration);
        job.setJarByClass(LogETLDriver.class);
        job.setMapperClass(LogETLMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }

}

After the program above is written, package it and upload the jar to the server:

[hadoop@hadoop000 lib]$ ll
total 12
-rw-r--r-- 1 hadoop hadoop 8754 Mar 29 22:38 hadoop-1.0.jar
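A sketch of how the build and upload might look with Maven and scp (the artifact name is assumed from the listing above):

mvn clean package -DskipTests
scp target/hadoop-1.0.jar hadoop@hadoop000:/home/hadoop/lib/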

Create the output path for the MR job on HDFS:

hadoop fs -mkdir -p /g6/hadoop/access/output/day=20190402

IV. Create the Hive External Table

create external table g6_access (
cdn string,
region string,
level string,
time string,
ip string,
domain string,
url string,
traffic bigint
) partitioned by (day string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/g6/hadoop/access/clear';

Because the MR program deletes its output path on every run, the Hive table's location must not point at that output path; instead, move the data under the location after the MR job has finished.

V. Run the Hadoop MR Job and Test It

1. Run the MR job

hadoop jar /home/hadoop/lib/hadoop-1.0.jar com.ruoze.hadoop.mapreduce.LogETLDriver /g6/hadoop/accesslog/20190402/ /g6/hadoop/access/output/day=20190402
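Before moving the data, it is worth checking that the job actually produced output (optional verification; the part file names may differ):

hadoop fs -ls /g6/hadoop/access/output/day=20190402
hadoop fs -text /g6/hadoop/access/output/day=20190402/part* | head -5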

2. Move the output under the table location

hadoop fs -mv /g6/hadoop/access/output/day=20190402 /g6/hadoop/access/clear
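After the move, the cleaned files sit in a day=20190402 subdirectory under the table location, which is exactly the partition directory Hive expects (expected layout; the part file name may differ):

/g6/hadoop/access/clear/day=20190402/part-m-00000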

3. Refresh the Hive partition (without this, Hive cannot see the data)

alter table g6_access add if not exists partition(day=20190402);
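If several day=... directories were moved under the location at once, Hive can also discover them in one go instead of adding partitions one by one (an alternative, not used in the original flow):

msck repair table g6_access;
show partitions g6_access;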

4. Use Hive to analyze the total traffic per domain

hive (g6_hadoop)> select domain,count(*) from g6_access group by domain;
Query ID = hadoop_20190402232525_4b5c6115-d9a4-4dbd-8cbd-768f298decb4
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1554215624276_0002, Tracking URL = http://hadoop000:8088/proxy/application_1554215624276_0002/
Kill Command = /home/hadoop/soul/app/hadoop-2.6.0-cdh5.7.0/bin/hadoop job  -kill job_1554215624276_0002
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2019-04-02 23:30:45,007 Stage-1 map = 0%,  reduce = 0%
2019-04-02 23:30:51,476 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.73 sec
2019-04-02 23:30:57,940 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 2.77 sec
MapReduce Total cumulative CPU time: 2 seconds 770 msec
Ended Job = job_1554215624276_0002
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 2.77 sec   HDFS Read: 44772154 HDFS Write: 76 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 770 msec
OK
domain  _c1
v1.go2yd.com    74908
v2.go2yd.com    74795
v3.go2yd.com    75075
v4.go2yd.com    75222
Time taken: 21.612 seconds, Fetched: 4 row(s)
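Note that the query above uses count(*), i.e. the number of records per domain. The requirement in step 5 was the total traffic per domain; with the table defined above, that query would be (not run here, so no output is shown):

select domain, sum(traffic) as total_traffic
from g6_access
where day = '20190402'
group by domain;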

VI. Wrap the Whole Workflow in a Shell Script

g6_mr_etl.sh

#!/bin/bash

source ~/.bash_profile

if [ $# != 1 ]; then
  echo "Usage: g6_mr_etl.sh <dateString>"
  echo "E.g.: g6_mr_etl.sh 20190402"
  exit 1
fi


process_date=$1 

echo -e "\033[36m###### step1:MR ETL ######\033[0m"  
hadoop jar /home/hadoop/lib/hadoop-1.0.jar com.ruoze.hadoop.mapreduce.LogETLDriver /g6/hadoop/accesslog/$process_date/ /g6/hadoop/access/output/day=$process_date



hive -e "use hive;
create external table if  not exists g6_access (
cdn string,
region string,
level string,
time string,
ip string,
domain string,
url string,
traffic bigint
) partitioned by (day string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/g6/hadoop/access/clear' ;"

echo -e "\033[36m###### step2:Mv Data to DW ###### \033[0m"  
hadoop fs -mv /g6/hadoop/access/output/day=$process_date /g6/hadoop/access/clear


echo -e "\033[36m###### step3:Alter metadata ######\033[0m"  
database=g6_hadoop
hive -e "use ${database}; alter table g6_access add if not exists partition(day=$process_date);"