一、需求及步骤解析
1、需求
利用MR对日志进行清洗后交由Hive统计分析
2、步骤解析
1、自己造一份日志,包含(cdn、region、level、time、ip、domain、url、traffic)8个字段,其中time、ip、domain、traffic的取值随机变化,文件大小在50M到100M之间
2、编写MR程序对日志进行清洗
3、清洗完后的日志移动到Hive外表的location上
4、刷新Hive分区信息
5、查询每个domain的traffic的总和
6、利用Shell封装整个运行过程
二、利用日志生成器生成日志并上传至HDFS
日志生成器
package com.ruoze.hadoop.utils;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
/**
 * Generates a synthetic CDN access log named {@code access.log} in the working directory.
 *
 * <p>Every line holds eight tab-separated fields (cdn, region, level, time, ip, domain, url,
 * traffic) followed by a trailing tab and a newline. The cdn, region, level and url fields are
 * constant; time, ip, domain and traffic are randomized per line.
 */
public class GenerateLogUtils {

    /** Number of log lines appended per run. */
    private static final int LINE_COUNT = 1000000;

    /** Candidate values for the domain field. */
    private static final String[] DOMAINS = {
        "v1.go2yd.com",
        "v2.go2yd.com",
        "v3.go2yd.com",
        "v4.go2yd.com",
        "v5.go2yd.com",
    };

    /** Candidate values for the traffic field; the last entry is blank. */
    private static final String[] TRAFFIC_VALUES = {
        "136662",
        "785966",
        "987422",
        "975578",
        "154851",
        ""
    };

    /** Inclusive [low, high] IPv4 ranges, each address encoded as a signed 32-bit int. */
    private static final int[][] IP_RANGES = {
        {607649792, 608174079},     // 36.56.0.0-36.63.255.255
        {1038614528, 1039007743},   // 61.232.0.0-61.237.255.255
        {1783627776, 1784676351},   // 106.80.0.0-106.95.255.255
        {2035023872, 2035154943},   // 121.76.0.0-121.77.255.255
        {2078801920, 2079064063},   // 123.232.0.0-123.235.255.255
        {-1950089216, -1948778497}, // 139.196.0.0-139.215.255.255
        {-1425539072, -1425014785}, // 171.8.0.0-171.15.255.255
        {-1236271104, -1235419137}, // 182.80.0.0-182.92.255.255
        {-770113536, -768606209},   // 210.25.0.0-210.47.255.255
        {-569376768, -564133889},   // 222.16.0.0-222.95.255.255
    };

    /** Single shared generator instead of a new one per line / per call. */
    private static final Random RANDOM = new Random();

    public static void main(String[] args) {
        generateLog();
    }

    /**
     * Appends {@link #LINE_COUNT} random log lines to {@code access.log}.
     *
     * @return always the empty string
     */
    private static String generateLog() {
        File file = new File("access.log");
        SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // Open the writer once, in append mode, and let try-with-resources close it even when a
        // write fails; reopening the file for every line is slow and leaks the handle on error.
        try (FileWriter fileWriter = new FileWriter(file.getName(), true)) {
            for (int i = 0; i < LINE_COUNT; i++) {
                Date date = randomDate("2019-01-01", "2019-01-31");
                // nextInt's bound is exclusive, so the full length makes every domain reachable.
                String domain = DOMAINS[RANDOM.nextInt(DOMAINS.length)];
                // NOTE(review): the bound is still length - 1, so the blank last entry is never
                // selected. A line with blank traffic splits into only seven fields; enable it
                // only once every consumer of this log tolerates short lines.
                String traffic = TRAFFIC_VALUES[RANDOM.nextInt(TRAFFIC_VALUES.length - 1)];
                StringBuilder builder = new StringBuilder();
                builder
                    .append("baidu").append("\t")
                    .append("CN").append("\t")
                    .append("2").append("\t")
                    .append(timeFormat.format(date)).append("\t")
                    .append(getRandomIp()).append("\t")
                    .append(domain).append("\t")
                    .append("http://v1.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4").append("\t")
                    .append(traffic).append("\t")
                    .append("\n");
                fileWriter.write(builder.toString());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return "";
    }

    /**
     * Returns a random instant strictly between two days.
     *
     * @param beginDate lower bound (exclusive), formatted as {@code yyyy-MM-dd}
     * @param endDate upper bound (exclusive), formatted as {@code yyyy-MM-dd}
     * @return a random date inside the open interval
     * @throws IllegalArgumentException if a bound cannot be parsed or the interval is empty
     */
    private static Date randomDate(String beginDate, String endDate) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        Date start;
        Date end;
        try {
            start = format.parse(beginDate);
            end = format.parse(endDate);
        } catch (java.text.ParseException e) {
            throw new IllegalArgumentException(
                "Dates must be formatted as yyyy-MM-dd: " + beginDate + ", " + endDate, e);
        }
        // Fail fast instead of returning null, which would surface later as an NPE in format().
        if (start.getTime() >= end.getTime()) {
            throw new IllegalArgumentException(
                "beginDate must be before endDate: " + beginDate + ", " + endDate);
        }
        return new Date(random(start.getTime(), end.getTime()));
    }

    /**
     * Returns a uniformly distributed value strictly between {@code begin} and {@code end}.
     *
     * @throws IllegalArgumentException if no value lies strictly between the two bounds
     */
    private static long random(long begin, long end) {
        if (end - begin < 2) {
            throw new IllegalArgumentException(
                "No value strictly between " + begin + " and " + end);
        }
        // Draw directly from [begin + 1, end - 1] rather than retrying whenever begin is hit.
        return begin + 1 + (long) (Math.random() * (end - begin - 1));
    }

    /**
     * Picks a random IPv4 address from one of the predefined ranges.
     *
     * @return the address in dotted-decimal notation
     */
    public static String getRandomIp() {
        int[] bounds = IP_RANGES[RANDOM.nextInt(IP_RANGES.length)];
        // +1 because nextInt's bound is exclusive and the ranges are inclusive.
        int offset = RANDOM.nextInt(bounds[1] - bounds[0] + 1);
        return num2ip(bounds[0] + offset);
    }

    /**
     * Converts a 32-bit integer into dotted-decimal IPv4 notation.
     *
     * @param ip the address, most significant byte first
     * @return for example {@code "36.56.0.0"} for {@code 607649792}
     */
    public static String num2ip(int ip) {
        return ((ip >> 24) & 0xff) + "."
            + ((ip >> 16) & 0xff) + "."
            + ((ip >> 8) & 0xff) + "."
            + (ip & 0xff);
    }
}
将access.log上传至HDFS路径
hadoop fs -put access.log /g6/hadoop/accesslog/20190402/
三、MR清洗
1、编写清洗日志的LogUtils类
package com.ruoze.hadoop.utils;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
/**
 * Cleans one raw access-log line.
 *
 * <p>Not thread-safe: the {@link SimpleDateFormat} instances are shared by all calls on the
 * same {@code LogUtils} object.
 */
public class LogUtils {

    /** Number of tab-separated fields a valid line must provide. */
    private static final int FIELD_COUNT = 8;

    DateFormat sourceFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss",Locale.ENGLISH);
    DateFormat targetFormat = new SimpleDateFormat("yyyyMMddHHmmss");

    /**
     * Parses a tab-separated log line and rewrites its time field from
     * {@code yyyy-MM-dd HH:mm:ss} to {@code yyyyMMddHHmmss}; the other fields are copied as-is.
     *
     * @param log the raw line: cdn, region, level, time, ip, domain, url, traffic
     * @return the cleaned tab-separated line, or the empty string when the line is null, has
     *         fewer than eight fields, or carries an unparseable time
     */
    public String parse(String log) {
        if (log == null) {
            return "";
        }
        String[] splits = log.split("\t");
        // split() drops trailing empty fields, so a line whose last field is blank arrives here
        // short; reject it instead of failing with ArrayIndexOutOfBoundsException.
        if (splits.length < FIELD_COUNT) {
            return "";
        }
        try {
            String cdn = splits[0];
            String region = splits[1];
            String level = splits[2];
            String time = targetFormat.format(sourceFormat.parse(splits[3]));
            String ip = splits[4];
            String domain = splits[5];
            String url = splits[6];
            String traffic = splits[7];
            StringBuilder builder = new StringBuilder();
            builder.append(cdn).append("\t")
                .append(region).append("\t")
                .append(level).append("\t")
                .append(time).append("\t")
                .append(ip).append("\t")
                .append(domain).append("\t")
                .append(url).append("\t")
                .append(traffic);
            return builder.toString();
        } catch (ParseException e) {
            // A bad timestamp marks the line as dirty; report it and reject the line.
            e.printStackTrace();
            return "";
        }
    }
}
2、LogUtils的单元测试
package com.ruoze.hadoop.utils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
 * Unit test for {@link LogUtils#parse(String)}.
 */
public class LogUtilsTest {

    private LogUtils utils;

    /** Creates a fresh parser before each test. */
    @Before
    public void setUp() {
        utils = new LogUtils();
    }

    /** Checks that a well-formed line is cleaned and its time field is reformatted. */
    @Test
    public void LogUtilsTest() {
        String log = "baidu\tCN\t2\t2019-01-10 16:02:54\t121.77.143.199\tv2.go2yd.com\thttp://v3.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4\t97557845";
        String expected = "baidu\tCN\t2\t20190110160254\t121.77.143.199\tv2.go2yd.com\thttp://v3.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4\t97557845";
        String result = utils.parse(log);
        System.out.println(result);
        // Fail the test on a mismatch; printing alone would pass even with a wrong result.
        if (!expected.equals(result)) {
            throw new AssertionError("expected <" + expected + "> but was <" + result + ">");
        }
    }

    /** Releases the parser after each test. */
    @After
    public void trarDown() {
        utils = null;
    }
}
测试结果如图:
3、Mapper
package com.ruoze.hadoop.mapreduce;
import com.ruoze.hadoop.utils.LogUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Map-side log cleaning: each input line is validated, cleaned by {@link LogUtils} and, when
 * valid, written out with a {@link NullWritable} key.
 */
public class LogETLMapper extends Mapper<LongWritable,Text,NullWritable,Text>{

    /** Number of tab-separated fields a valid log line carries. */
    private static final int FIELD_COUNT = 8;

    // One parser per mapper instead of one per record.
    // NOTE(review): assumes map() is never invoked concurrently on the same Mapper instance,
    // because the SimpleDateFormat objects inside LogUtils are not thread-safe.
    private final LogUtils utils = new LogUtils();

    /**
     * Cleans a single log line and emits it; malformed lines are dropped silently.
     *
     * @param key byte offset of the line in the input split (unused)
     * @param value the raw log line
     * @param context sink for the cleaned line
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");
        // Check the field count BEFORE reading the traffic field: a short line would otherwise
        // throw ArrayIndexOutOfBoundsException and fail the task.
        if (fields.length != FIELD_COUNT || StringUtils.isBlank(fields[FIELD_COUNT - 1])) {
            return;
        }
        String result = utils.parse(line);
        if (StringUtils.isNotBlank(result)) {
            context.write(NullWritable.get(), new Text(result));
        }
    }
}
4、Job
package com.ruoze.hadoop.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the log-cleaning job: configures {@link LogETLMapper}, clears a stale output
 * directory and reports the job outcome through the process exit code.
 */
public class LogETLDriver {

    /**
     * Runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if the file system or the job submission fails
     */
    public static void main(String[] args) throws Exception{
        if (args.length != 2) {
            System.err.println("please input 2 params: input output");
            // Non-zero so that a calling script can tell a usage error from success.
            System.exit(1);
        }
        String input = args[0];
        String output = args[1];

        Configuration configuration = new Configuration();

        // The output directory must not exist when the job starts, so remove any leftover
        // from a previous run.
        FileSystem fileSystem = FileSystem.get(configuration);
        Path outputPath = new Path(output);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(configuration);
        job.setJarByClass(LogETLDriver.class);
        job.setMapperClass(LogETLMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        // Declare the final output types too, rather than relying on the framework defaults.
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, outputPath);

        // Propagate the job result; ignoring it would make a failed job look successful.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
以上程序编写后打包上传至服务器:
[hadoop@hadoop000 lib]$ ll
total 12
-rw-r--r-- 1 hadoop hadoop 8754 Mar 29 22:38 hadoop-1.0.jar
在HDFS上创建MR程序的输出路径:
hadoop fs -mkdir -p /g6/hadoop/access/output/day=20190402
四、创建Hive外表
-- External, day-partitioned table over the cleaned access logs (tab-separated text files).
create external table if not exists g6_access (
cdn string,
region string,
level string,
time string,
ip string,
domain string,
url string,
traffic bigint
) partitioned by (day string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/g6/hadoop/access/clear';
因为MR程序每次运行时会删除输出路径,所以Hive的location不要指向输出路径,等MR跑完后将数据移动到location下。
五、运行Hadoop MR程序进行测试
1、运行MR
hadoop jar /home/hadoop/lib/hadoop-1.0.jar com.ruoze.hadoop.mapreduce.LogETLDriver /g6/hadoop/accesslog/20190402/ /g6/hadoop/access/output/day=20190402
2、将输出结果移动到Location下
hadoop fs -mv /g6/hadoop/access/output/day=20190402 /g6/hadoop/access/clear
3、刷新Hive分区(不刷新Hive是查询不到数据的)
alter table g6_access add if not exists partition(day=20190402);
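4、Hive统计分析每个domain的traffic的总和(注意:下面示例的查询及输出实际统计的是每个domain的记录数count(*);若要得到traffic的总和,应使用 select domain,sum(traffic) from g6_access group by domain;)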
hive (g6_hadoop)> select domain,count(*) from g6_access group by domain;
Query ID = hadoop_20190402232525_4b5c6115-d9a4-4dbd-8cbd-768f298decb4
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1554215624276_0002, Tracking URL = http://hadoop000:8088/proxy/application_1554215624276_0002/
Kill Command = /home/hadoop/soul/app/hadoop-2.6.0-cdh5.7.0/bin/hadoop job -kill job_1554215624276_0002
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2019-04-02 23:30:45,007 Stage-1 map = 0%, reduce = 0%
2019-04-02 23:30:51,476 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.73 sec
2019-04-02 23:30:57,940 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 2.77 sec
MapReduce Total cumulative CPU time: 2 seconds 770 msec
Ended Job = job_1554215624276_0002
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 2.77 sec HDFS Read: 44772154 HDFS Write: 76 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 770 msec
OK
domain _c1
v1.go2yd.com 74908
v2.go2yd.com 74795
v3.go2yd.com 75075
v4.go2yd.com 75222
Time taken: 21.612 seconds, Fetched: 4 row(s)
六、shell封装整个流程
g6_mr_etl.sh
#!/bin/bash
# g6_mr_etl.sh - runs the whole pipeline for one day of access logs:
#   step1: clean the raw log with the MapReduce job
#   step2: move the cleaned output under the Hive external table location
#   step3: register the new partition in the Hive metastore
# Usage: g6_mr_etl.sh <dateString>   e.g. g6_mr_etl.sh 20190402
# Every step aborts the script on failure so later steps never run on missing data.

source ~/.bash_profile

if [ $# -ne 1 ]; then
    echo "Usage: g6_mr_etl.sh <dateString>"
    echo "E.g.: g6_mr_etl.sh 20190402"
    exit 1
fi

process_date=$1
# Single database for both the table definition and the partition registration.
database=g6_hadoop
input_dir=/g6/hadoop/accesslog/${process_date}/
output_dir=/g6/hadoop/access/output/day=${process_date}
warehouse_dir=/g6/hadoop/access/clear

echo -e "\033[36m###### step1:MR ETL ######\033[0m"
hadoop jar /home/hadoop/lib/hadoop-1.0.jar com.ruoze.hadoop.mapreduce.LogETLDriver "${input_dir}" "${output_dir}" \
    || { echo "step1 failed: MR ETL" >&2; exit 1; }

hive -e "use ${database};
create external table if not exists g6_access (
cdn string,
region string,
level string,
time string,
ip string,
domain string,
url string,
traffic bigint
) partitioned by (day string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '${warehouse_dir}' ;" \
    || { echo "failed to create table g6_access" >&2; exit 1; }

echo -e "\033[36m###### step2:Mv Data to DW ###### \033[0m"
# Make sure the destination directory exists; otherwise -mv would rename the partition
# directory to the destination path itself instead of moving it inside.
hadoop fs -mkdir -p "${warehouse_dir}" \
    || { echo "step2 failed: cannot create ${warehouse_dir}" >&2; exit 1; }
hadoop fs -mv "${output_dir}" "${warehouse_dir}" \
    || { echo "step2 failed: move to ${warehouse_dir}" >&2; exit 1; }

echo -e "\033[36m###### step3:Alter metadata ######\033[0m"
hive -e "use ${database}; alter table g6_access add if not exists partition(day='${process_date}');" \
    || { echo "step3 failed: add partition" >&2; exit 1; }