本节来介绍Spark实战案例:将某网站的访问日志,按照页面名称来分区存放。
本节用到的Windows版的Hadoop(需要用到其中的winutils.exe文件):
hadoop-2.4.1.zip 提取码:80c4
winutils.exe 提取码:npmm
access_log.txt 提取码:xxjc
本节用到的日志文件格式如下所示:
192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:53 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] "GET /MyDemoWeb/mysql.jsp HTTP/1.1" 200 241
1.下载Spark安装目录下的所有Jar包
使用WinSCP工具将$SPARK_HOME/jars目录下的所有Jar包下载到本地目录如E:/sparklib中。
2.使用IDEA搭建Spark程序开发环境
- 打开IDEA开发工具,选择“Create New Project”,选择Scala下面的sbt工程,Next
- 工程名字:WebLogPartitioner,选择一个保存位置,比如:E:/WebLogPartitioner,Java版本和Scala版本需要和Spark集群环境中的保持一致,sbt版本保持默认,Finish
- 在WebLogPartitioner工程上右键单击,New,Directory,输入目录名:lib,OK
- 将刚才下载好的所有Jar包,复制粘贴到lib文件夹,在弹出的确认对话框中单击OK
- 选中lib目录下的所有的Jar包,右键,选中“Add as Libirary”,在弹出的对话框中输入库的名字,比如:spark_jars,OK
注意:IDEA构建工程的过程比较慢,请耐心等待。只有等工程构建完成后,工程的结构才会完整,写代码时才会有智能提示。可以通过更改sbt源的方式来提高构建速度,这里不作介绍。
3.编写WebLogPartitioner程序
- 在WebLogPartitioner工程下的src/main/scala目录上右键,New,Package:demo,OK
- 在demo下面New,Scala Class:WebLogPartitioner.scala,种类选择:Object,OK
- 编写WebLogPartitioner代码如下:
package demo
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import scala.collection.mutable
object WebLogPartitioner {
def main(args: Array[String]): Unit = {
//指定Hadoop家目录:在Windows下执行需要用到其中的winutils.exe文件
System.setProperty("hadoop.home.dir","E:\\hadoop-2.4.1\\hadoop-2.4.1")
//创建SparkConf对象
val conf = new SparkConf().setAppName("WebLogPartitioner").setMaster("local")
//创建SparkContext对象
val sc = new SparkContext(conf)
//读入日志数据,如:192.168.88.1 - - [30/Jul/2017:12:54:41 +0800]
//"GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
val rdd1 = sc.textFile("d:\\temp\\access_log.txt").map(
line => {
//解析字符串,返回结果Map(JSP名字,日志信息),如:(hadoop.jsp, 192.168.88.1 - -
//[30/Jul/2017:12:54:41 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242)
//解析命令字符串,如:GET /MyDemoWeb/hadoop.jsp HTTP/1.1
val index1 = line.indexOf("\"") //找到左双引号的位置
val index2 = line.lastIndexOf("\"") //找到右双引号的位置
val command = line.substring(index1+1,index2)
//解析jps文件路径字符串,如:/MyDemoWeb/hadoop.jsp
val index3 = command.indexOf(" ") //第一个空格
val index4 = command.lastIndexOf(" ") //最后一个空格
val path = command.substring(index3+1,index4)
//解析jsp文件名,如:hadoop.jsp
val jspName = path.substring(path.lastIndexOf("/")+1)
// 返回Map(JSP名字,日志信息)
(jspName,line)
}
)
//得到jsp文件名的数据集,去掉重复的记录
val rdd2 = rdd1.map(_._1).distinct().collect()
//创建分区规则对象
val myPartioner = new WebLogPartitionerRule(rdd2)
//根据分区规则开始建立分区
val rdd3 = rdd1.partitionBy(myPartioner)
//将完整的日志文件按照分区输出到指定目录
rdd3.saveAsTextFile("d:\\temp\\out1")
//停止SparkContext对象
sc.stop()
}
}
//定义分区规则的类
class WebLogPartitionerRule(allJSPName:Array[String]) extends Partitioner {
//定义一个Map,将jsp文件名映射到一个整数
val partitionMap = new mutable.HashMap[String,Int]()
var partID = 0 //分区号从0开始
for(name <- allJSPName){
//为每个jsp文件名建立一个分区号
partitionMap.put(name,partID)
//分区号自增
partID += 1
}
//重写变量numPartitions:分区总数
override def numPartitions = partitionMap.size
//重写方法getPartition:根据jsp名字,查询对应的分区号
override def getPartition(key: Any) = {
partitionMap.getOrElse(key.toString,0)
}
}
3.运行结果
按Ctrl + Shift + F10或者在代码上右键,选择Run “WebLogPartitioner”运行,结果如下:
image
从输出的结果中可以看出,将原来的日志按照页面分为8个分区,相同页面的日志划分到同一个分区中。就像下面这样:
image
image