Spark Sql日志分析项目实战

项目简介
  • 统计主站最受欢迎的课程Top N 访问次数
  • 按地市统计主站最受欢迎的Top N 课程
  • 按流量统计主站最受欢迎的Top N 课程
环境安装

CDH相关软件下载地址

Spark环境搭建

1、官网下载相应版本源码包
参考编译过程
2、spark源码编译中的坑
pom.xml添加

<repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>

设置内存

export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"

选择scala版本

./dev/change-scala-version.sh 2.10
  • 环境搭建
    设置环境变量

local模式启动:spark-shell --master local[2]

standalone模式:

  • 修改spark-env.sh配置文件
    SPARK_MASTER_HOST=master
    SPARK_WORKER_CORES=2
    SPARK_WORKER_CORES=2g
    SPARK_WORKER_INSTANCES=1
  • 启动
    sbin/start-all.sh:启动slaves配置的所有节点的worker
    spark-shell --master spark://master:7077:启动spark
    spark-shell --help 可以查看启动参数
    --total-executor-cores 1 指定core总数量

Spark on Yarn
spark-env.sh 添加 Hadoop conf 的目录

  • [Client]
    Driver运行在Client端
    Client会和请求到的Container进行通信来完成作业的调度和执行,Client不能退出
    日志信息会在控制台输出,方便调试
  • [Cluster]
    Driver运行在ApplicationMaster中
    Client只要提交完作业之后就可以关闭
    日志在终端看不到,可以通过yarn logs -applicationId <app ID>查看日志
    ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ [--deploy-mode cluster \] //默认client模式 --executor-memory 1G \ --num-executors 1 \ /home/kang/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.1.0.jar \ 4
Spark SQL 框架介绍

Spark SQL is Apache Spark's module for working with structured(结构化) data.

  • Integrated(集成)
    Seamlessly mix(无缝混合) SQL queries with Spark programs.
    Spark SQL lets you query structured data inside Spark programs, using either SQL or a familiar DataFrame API. Usable in Java, Scala, Python and R.
  • Uniform Data Access(统一的数据访问)
    Connect to any data source the same way.
    DataFrames and SQL provide a common way to access a variety of data sources, including Hive, Avro, Parquet, ORC, JSON, and JDBC. You can even join data across these sources.
  • Hive Integration(Hive集成)
    Run SQL or HiveQL queries on existing warehouses
    Spark SQL supports the HiveQL syntax as well as Hive SerDes and UDFs, allowing you to access existing Hive warehouses.
  • Standard Connectivity(标准连接)
    Connect through JDBC or ODBC.
    A server mode provides industry standard JDBC and ODBC connectivity for business intelligence tools.
从Hive平滑过渡到Spark SQL
  • Spark1.x中Spark SQL的入口点:SQLContext
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

本地可直接运行

package com.test

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

/**
  * SQLContext的使用
  */
object SQLContextApp {
  def main(args: Array[String]): Unit = {

    //创建相应的Context
    val sparkConf = new SparkConf()

    //在测试和或者生产中,参数一般通过脚本进行指定
    sparkConf.setAppName("SQLContext").setMaster("spark://192.168.247.100:7077")//测试通常采用本地模式“local[2]”
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    //相应的处理:json
    val path = args(0)
    val people = sqlContext.read.format("json").load(path)
    people.printSchema()
    people.show()

    //关闭资源
    sc.stop()
  }
}

打包上服务器运行

spark-submit \
  --name SQLContext \
  --class com.test.SQLContextApp \
  --master spark://192.168.247.100:7077 \
  /home/kang/lib/SparkTest-1.0.jar \
  /home/kang/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json
  • Spark1.x中Spark SQL的入口点:HiveContext
    要获取hive中的元数据信息,需把hive-site.xml配置文件复制到spark的/conf目录下
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
package com.test

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * HiveContext的使用
  */
object HiveContextApp {
  def main(args: Array[String]): Unit = {

    //创建相应的Context
    val sparkConf = new SparkConf()

    //在测试和或者生产中,参数一般通过脚本进行指定
    sparkConf.setAppName("HiveContext").setMaster("spark://192.168.247.100:7077")//测试通常采用本地模式“local[2]”
    val sc = new SparkContext(sparkConf)
    val hiveContext = new HiveContext(sc)

    //相应的处理:json
    hiveContext.table("test").show

    //关闭资源
    sc.stop()
  }
}

打包上传提交时要添加MySQL连接包

spark-submit \
  --name HiveContext \
  --class com.test.HiveContextApp \
  --master spark://master:7077\
  --jars /home/kang/lib/mysql-connector-java-5.1.34.jar \
  /home/kang/lib/HiveContext.jar \
  • Spark2.x中Spark SQL的入口点:SparkSession
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()
spark-shell/spark-sql的使用
  • 添加hive-site.xml配置文件
  • -- jars传递mysql驱动包
  • spark-shell --master local[2] --jars ~/lib/mysql-connector-java-5.1.34.jar
    spark.sql("show tables").show
  • spark-sql --master local[2] --jars ~/lib/mysql-connector-java-5.1.34.jar
    直接输入sql语句
    explain extended + sql(查看详细执行计划)
thriftserver/beeline的使用
  • 启动thriftserver
    默认端口是10000,可以修改
./sbin/start-thriftserver.sh --master local[2] \
--jars ~/lib/mysql-connector-java-5.1.34.jar \
--hiveconf hive.server2.thrift.port=10040
  • 启动beeline
./bin/beeline -u jdbc:hive2://localhost:10040 -n kang
  • thriftserver和普通的spark-shell/spark-sql有什么区别?
    spark-shell、spark-sql都是一个spark application
    thriftserver,不管启动多少个客户端(beeline/code),永远只有一个spark application,多个客户端可以共享缓存数据。
  • code连接thriftserver
    添加相关的依赖包
<dependency>
      <groupId>org.spark-project.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>1.2.1.spark2</version>
</dependency>
package com.test

import java.sql.DriverManager

object SparkSQLThriftServerApp {

  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://192.168.247.100:10040","kang","")
    val pstmt = conn.prepareStatement("select * from test")
    val rs = pstmt.executeQuery()
    while (rs.next()){
      println("context:" + rs.getString("context"))
    }
    rs.close()
    pstmt.close()
    conn.close()
  }
}
用户行为日志概述
  • 用户行为日志:用户每次访问网站时所有的行为数据(访问、浏览、搜索、点击)

  • 日志数据内容
    访问的系统属性:操作系统、浏览器等等
    访问特征:点击的url,从哪个url跳转过来、页面工停留的时间等
    访问信息:session_id、访问ip(访问城市)等

  • 分析的意义

离线数据处理架构(流程)
  • 数据采集
    nginx记录日志信息
    Flume:web日志写入HDFS
  • 数据清洗
    spark、Hive、Mapreduce等
  • 使用Spark SQL解析访问日志
  • 解析出课程编号、类型
  • 根据IP解析出城市信息
  • 使用Spark SQL将访问时间按天进行分区输出

输入:访问时间、访问url、耗费的流量、访问的IP信息
输出:URL、cmsType(video/article)、cmsId(编号)、流量、ip、城市信息、访问时间、天

ip地址解析
下载:https://github.com/kangapp/ipdatabase
编译:mvn clean package -DskipTests
jar包入库:mvn install:install-file -Dfile=F:\ipdatabase-master\target\ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar
pom文件引入,resources文件两个表格文件引入

  • 数据处理
    spark、Hive、Mapreduce进行业务统计和分析
    任务调度:Oozie、Azkaban

调优点:
1)控制文件输出大小:coalesce
2)分区字段的数据类型调整
.config("spark.sql.sources.partitionColumnTypeInference.enabled","false")

  • 处理结果入库
    RDBMS、NoSQL

需求一
create table day_video_access_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
times bigint(10) not null,
primary key (day,cms_id)
)

需求二
create table day_video_city_access_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
city varchar(20) not null,
times bigint(10) not null,
times_rank int not null,
primary key (day,city,cms_id)
)
需求三
create table day_video_traffics_access_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
traffics bigint(20) not null,
primary key (day,cms_id)
)

  • 数据可视化
    Echarts、HUE、Zeppelin
项目需求
  • 统计imooc主站最受欢迎的课程/手记Top N访问次数
  • 按地市统计imooc主站最受欢迎Top N课程
    根据IP提取城市信息
    窗口函数在Spark SQL中的使用
  • 按流量统计imocc主站最受欢迎的Top N课程
项目打包

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
mvn assembly:assembly
spark-submit \ --class com.test.SparkStatCleanJobYARN \ --name SparkStatCleanJobYARN \ --master yarn \ --executor-memory 1G \ --num-executors 1 \ --files /home/kang/project/SprakSQL/resource/ipDatabase.csv,/home/kang/project/SprakSQL/resource/ipRegion.xlsx \ /home/kang/project/SprakSQL/lib/sparksql.jar \ hdfs://192.168.247.100:9000/data/spark/output/* hdfs://192.168.247.100:9000/data/spark/partitionByDay

项目性能调优

https://segmentfault.com/a/1190000014876069

代码优化
  • 选用高性能的算子
  • 复用已有的数据
参数优化

并行度:spark.sql.shuffle.partitions
分区字段类型推测:spark.sql.sources.partitionColumnTypeInference.enabled

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,193评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,306评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,130评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,110评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,118评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,085评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,007评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,844评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,283评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,508评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,395评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,985评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,630评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,797评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,653评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,553评论 2 352

推荐阅读更多精彩内容