Spark的零散笔记(二)

1. 减少client模式下的日志输出

import org.apache.log4j.{ Level, Logger }
Logger.getLogger("org").setLevel(Level.ERROR)//或者WARN

2. 判断字符串是否为整数或小数

和spark没关系,是个scala语法问题。
实际项目中涉及过滤数据质量的问题,因此综合网上看到的例子整理出一个能同时适配整数和小数的方法。

  var pattern = """^[0-9]+([.]{1}[0-9]+){0,1}""".r
  def isNumber(s: String) = {
    s match {
      case null             => false
      case this.pattern(_*) => true
      case _                => false
    }
  }

3. spark传递对象的序列化的问题

原因是写了一个读取配置文件的config类,在构造函数中读取文件,然后赋值。报错:

object not serializable

原因:spark任务是由driver分配给executor的。在driver中建立的class(基本上就是各种rdd算子之外定义的class)必须通过序列化方式下发,否则就要回避这种用法:

//定义
class config extends java.io.Serializable {
    def this(path: String) {
        this() //调用主构造函数
      //自定义构造方法
    }
}
//使用
val taskconf =  new config(args(0))

其他的方法(不建议用,参见后续分析):
使用object或case object,此时不能定义构造函数,但是可以自己写一个load函数。使用的时候也不用new:

//定义
object config {
    def load(path: String) {

    }
}   
//使用
val taskconf =  config
taskconf.load(args(0))
//或者直接用config

相当于实现了一个静态类,而且感觉作为一个只读的配置类,这样搞就够了。

ps. 经过后续测试,object对象往executor里面传好像还是有问题,这个很奇怪:
我的obj中有个load函数,读配置文件然后给object中的变量赋值,此外变量自身有默认值。在开发用的虚机上,object应该是正确而传到executor了,里面的配置值是正确的。但是在实际服务器上,发现传过去的是初始值,load函数中读取的东西没有传过去,目前没有找出两个环境的关键差别(差别肯定是有的,但不知道哪些方面起的作用。)

4. scala的class和object

目前只遇到了case class和class的序列化问题,其他的没有深究

  • object
    • scala没有静态的概念,object可以模拟静态方法,
    • object不可用构造函数
    • main函数必须在object里面
    • 可以通过定义伴生对象,可以实现既有实例成员又有静态成员的类的功能(可互相访问private field)
    • object not serializable
  • case class和case object
    • 使用case 关键字定义类不需要创建对象直接调用
    • case class:样例类,常被用于模式匹配,比如定义rdd转df的表结构,感觉类似结构体(struct)。
    • case class支持序列化,初始化可以不用new,一般是只读的(val),系统自动创建伴生对象
    • case class必须有参数列表,否则报错,如果没有参数,可以用case object

5. scala读properties文件

之前没用过,也不太理解这个Properties,格式和读写都很好用,但遇到的主要问题是路径问题,网上很多教程是搞WEB开发的人写的,路径的使用上存在一些差异。
最终的解决:

import java.util.Properties
import java.io.{ FileInputStream }
//调用
val ins = this.getClass().getResourceAsStream(path);
properties.load(new FileInputStream(path))
this.mathod = properties.getProperty("mathod", "0").trim()

直接用properties.load方法,path中即便有绝对路径也不认,但加入第一句就没问题了。如果没有第一句,则要求properties文件在当前执行路径下(不是jar包位置)。

6. 累加器

  • 累加器在Driver端定义赋初始值,并只能由Driver端读取,Excutor端负责持续写入。注意,Spark2语法,和1的时候不太一样。
  • 一般建议用到foreach等action里,因为只会执行一次。transform中使用累加器有坑,例如可以把累加器放到map当中,如果读早了(在action之前)就没有值,如果后续有多个读取该rdd的操作,则tranform中的累加器会重复累加。此外还要考虑容错恢复(tranform也可能执行多遍)等情况。
  • 在简单计数时它和count()效果相同,但是count需要自己进行独立的遍历,累加器可以塞到其他的分析过程里(比如map)
  • 如果是dataframe计数,貌似不是很方便了,不如直接用sql。
  • 可能累加器在流分析等场景更有用一些。

对比一下:
方法1:map/reduce会遍历一遍数据,然后的count或foreach会再遍历一遍数据。

//先建立SparkSession:spark
val accum = spark.sparkContext.longAccumulator("My Accumulator")
rdd1.map().reduce()
rdd1.count())
//或者
rdd1.foreach( x => accum.add(1))

方法2:rdd1只被遍历的一遍,即完成map/reduce和计数过程。

//先建立SparkSession:spark
val accum = spark.sparkContext.longAccumulator("My Accumulator")
//"My Accumulator"这个名字会在spark的web ui中显示出来
rdd1.map( x => {accum.add(1);
x //这里指map的真正输出
}).reduce()
println(accum.value)

方法2中,如果:

rdd2 = rdd1.map( x => {accum.add(1);
x //这里指map的真正输出
})
rdd2.reduce()
rdd2.reducebykey()
println(accum.value)

rdd2之后经历了两个action,则计数器所在的map会被执行两遍。此时累加器的value会翻倍。
(注意rdd2的定义实际是惰性的,因为没有action)

数组累加器:

val a1 = spark.sparkContext.collectionAccumulator[String]("a1")
//在算子中使用的时候
a1.add("some string")
//driver端读取的时候
a1.value.toArray().mkString("\n") //有点土

如果将累加器作为参数传递(或者其他场景需要引用头文件):

import org.apache.spark.util.{ LongAccumulator, CollectionAccumulator }

8. 广播变量

//先建立SparkSession:spark
spark.sparkContext.broadcast(prop)

广播变量允许程序员在每台机器上缓存一个只读变量(RDD数据也行),而不是将变量与任务一起发送。
目前看到obj是无法被广播的,因为无法序列化。
在join、跨stage、多task等场景可以优化。

还有就是官网当中rdd、累加器和共享变量是放在一个标题下的,不知道官方如何定位dataframe中的累加器和共享变量。
补充:后续广播了一个dataframe,但想在另一个dataframe之中嵌套使用时出错,这个问题暂时没解决。

9. 有关scala中字符串split和array长度

遇到一个小坑,应该是我孤陋寡闻了:

      val s  = "0,0,0,0,0,,0,,,,,,,,"
      val a = s.split(",")
      println(a.length)

出来的长度是7,也就是最后的空值没有计算在内,中间的空值是考虑的
但如果:

      val s  = "0,0,0,0,0,,0,,,,,,,,"
      val a = s.split(",",-1)
      println(a.length)

则输出长度是15,此时举例来说,a(8)=""(empty),不是null。
需要注意字符串isEmpty和null的区别

10. 有关history-server和eventlog,以及通过rest接口监控

网上有很多教程,但有些不是很全,我的体会如下:

  1. spark的web监控界面,默认是8080,但这个主要看的是集群,当然也能看任务。spark提交的任务详情,转到默认4040查看(注意如果有多个任务上下文,则依次是4041、4042),但如果是完成的任务就需要通过history-server(默认18080端口查看了)。注意:4040端口只在程序运行时有响应,但程序运行完毕之后,4040端口立马不响应了。但是18080是可以查看所有完成或未完成任务细节的(我还没有对比显示细节是否有差异)
  2. 如果想通过rest接口查看任务状态(获得json数据),通过8080是查不到的(我做错了吗?),只能通过4040+18080查看,目前看来18080更方便,但前提是配置并启动history-server。
  3. 提交任务之后,是不知道任务id的,这时候(如果通过rest方式)只能通过 http://master:18080/api/v1/applications?status=[completed|running] 先获得任务列表,再来找任务,感觉有点不爽,不知道还有什么更好的办法没有。
  4. 首先配置history-server:
    4.1. 在conf/spark-env.sh中添加

export SPARK_HISTORY_OPTS=" -Dspark.history.fs.logDirectory=hdfs://node1:9000/history/"

spark.history.fs.logDirectory这一项是必须配置的,也可以用“file:///”指定本地目录。但路径必须是已经存在的。
还可以利用-D添加更多的指令,在上面官网的“Spark History Server Configuration Options”一节中记录了很多可以修改的配置,常见的比如重新指定端口(spark.history.ui.port),以及cache的任务数量(spark.history.retainedApplications)。
之后启动服务:start-history-server.sh(sbin下面)
4.2. 如果不做4.1的配置,则需要启动history-server的时候使用:start-history-server.sh dfs://node1:9000/history/
4.3.在提交的任务中配置:

spark.eventLog.enabled true
spark.eventLog.dir hdfs://node1:9000/history
spark.eventLog.compress true (可选)

注意:spark.eventLog.dir是针对任务的,spark.history.fs.logDirectory是针对history server的,这两项很容易迷惑,它们的配置文件不同,但内容(路径)必须是一致的,且必须提前建立的!否则history server找不到日志的。
另外,理论上任务的配置,直接在spark-defaults.conf中配就好了,submit的时候,会自动加载这些默认参数,但是在我的项目中不生效,不知道什么原因(现在只能理解我的spark-defaults.conf格式有错),于是我把上述配置内容写入代码,就没问题了。

5

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容