1. 减少client模式下的日志输出
import org.apache.log4j.{ Level, Logger }
Logger.getLogger("org").setLevel(Level.ERROR)//或者WARN
2. 判断字符串是否为整数或小数
和spark没关系,是个scala语法问题。
实际项目中涉及过滤数据质量的问题,因此综合网上看到的例子整理出一个能同时适配整数和小数的方法。
var pattern = """^[0-9]+([.]{1}[0-9]+){0,1}""".r
def isNumber(s: String) = {
s match {
case null => false
case this.pattern(_*) => true
case _ => false
}
}
3. spark传递对象的序列化的问题
原因是写了一个读取配置文件的config类,在构造函数中读取文件,然后赋值。报错:
object not serializable
原因:spark任务是由driver分配给executor的。在driver中建立的class(基本上就是各种rdd算子之外定义的class)必须通过序列化方式下发,否则就要回避这种用法:
//定义
class config extends java.io.Serializable {
def this(path: String) {
this() //调用主构造函数
//自定义构造方法
}
}
//使用
val taskconf = new config(args(0))
其他的方法(不建议用,参见后续分析):
使用object或case object,此时不能定义构造函数,但是可以自己写一个load函数。使用的时候也不用new:
//定义
object config {
def load(path: String) {
}
}
//使用
val taskconf = config
taskconf.load(args(0))
//或者直接用config
相当于实现了一个静态类,而且感觉作为一个只读的配置类,这样搞就够了。
ps. 经过后续测试,object对象往executor里面传好像还是有问题,这个很奇怪:
我的obj中有个load函数,读配置文件然后给object中的变量赋值,此外变量自身有默认值。在开发用的虚机上,object应该是正确而传到executor了,里面的配置值是正确的。但是在实际服务器上,发现传过去的是初始值,load函数中读取的东西没有传过去,目前没有找出两个环境的关键差别(差别肯定是有的,但不知道哪些方面起的作用。)
4. scala的class和object
目前只遇到了case class和class的序列化问题,其他的没有深究
- object
- scala没有静态的概念,object可以模拟静态方法,
- object不可用构造函数
- main函数必须在object里面
- 可以通过定义伴生对象,可以实现既有实例成员又有静态成员的类的功能(可互相访问private field)
- object not serializable
- case class和case object
- 使用case 关键字定义类不需要创建对象直接调用
- case class:样例类,常被用于模式匹配,比如定义rdd转df的表结构,感觉类似结构体(struct)。
- case class支持序列化,初始化可以不用new,一般是只读的(val),系统自动创建伴生对象
- case class必须有参数列表,否则报错,如果没有参数,可以用case object
5. scala读properties文件
之前没用过,也不太理解这个Properties,格式和读写都很好用,但遇到的主要问题是路径问题,网上很多教程是搞WEB开发的人写的,路径的使用上存在一些差异。
最终的解决:
import java.util.Properties
import java.io.{ FileInputStream }
//调用
val ins = this.getClass().getResourceAsStream(path);
properties.load(new FileInputStream(path))
this.mathod = properties.getProperty("mathod", "0").trim()
直接用properties.load方法,path中即便有绝对路径也不认,但加入第一句就没问题了。如果没有第一句,则要求properties文件在当前执行路径下(不是jar包位置)。
6. 累加器
- 累加器在Driver端定义赋初始值,并只能由Driver端读取,Excutor端负责持续写入。注意,Spark2语法,和1的时候不太一样。
- 一般建议用到foreach等action里,因为只会执行一次。transform中使用累加器有坑,例如可以把累加器放到map当中,如果读早了(在action之前)就没有值,如果后续有多个读取该rdd的操作,则tranform中的累加器会重复累加。此外还要考虑容错恢复(tranform也可能执行多遍)等情况。
- 在简单计数时它和count()效果相同,但是count需要自己进行独立的遍历,累加器可以塞到其他的分析过程里(比如map)
- 如果是dataframe计数,貌似不是很方便了,不如直接用sql。
- 可能累加器在流分析等场景更有用一些。
对比一下:
方法1:map/reduce会遍历一遍数据,然后的count或foreach会再遍历一遍数据。
//先建立SparkSession:spark
val accum = spark.sparkContext.longAccumulator("My Accumulator")
rdd1.map().reduce()
rdd1.count())
//或者
rdd1.foreach( x => accum.add(1))
方法2:rdd1只被遍历的一遍,即完成map/reduce和计数过程。
//先建立SparkSession:spark
val accum = spark.sparkContext.longAccumulator("My Accumulator")
//"My Accumulator"这个名字会在spark的web ui中显示出来
rdd1.map( x => {accum.add(1);
x //这里指map的真正输出
}).reduce()
println(accum.value)
方法2中,如果:
rdd2 = rdd1.map( x => {accum.add(1);
x //这里指map的真正输出
})
rdd2.reduce()
rdd2.reducebykey()
println(accum.value)
rdd2之后经历了两个action,则计数器所在的map会被执行两遍。此时累加器的value会翻倍。
(注意rdd2的定义实际是惰性的,因为没有action)
数组累加器:
val a1 = spark.sparkContext.collectionAccumulator[String]("a1")
//在算子中使用的时候
a1.add("some string")
//driver端读取的时候
a1.value.toArray().mkString("\n") //有点土
如果将累加器作为参数传递(或者其他场景需要引用头文件):
import org.apache.spark.util.{ LongAccumulator, CollectionAccumulator }
8. 广播变量
//先建立SparkSession:spark
spark.sparkContext.broadcast(prop)
广播变量允许程序员在每台机器上缓存一个只读变量(RDD数据也行),而不是将变量与任务一起发送。
目前看到obj是无法被广播的,因为无法序列化。
在join、跨stage、多task等场景可以优化。
还有就是官网当中rdd、累加器和共享变量是放在一个标题下的,不知道官方如何定位dataframe中的累加器和共享变量。
补充:后续广播了一个dataframe,但想在另一个dataframe之中嵌套使用时出错,这个问题暂时没解决。
9. 有关scala中字符串split和array长度
遇到一个小坑,应该是我孤陋寡闻了:
val s = "0,0,0,0,0,,0,,,,,,,,"
val a = s.split(",")
println(a.length)
出来的长度是7,也就是最后的空值没有计算在内,中间的空值是考虑的
但如果:
val s = "0,0,0,0,0,,0,,,,,,,,"
val a = s.split(",",-1)
println(a.length)
则输出长度是15,此时举例来说,a(8)=""(empty),不是null。
需要注意字符串isEmpty和null的区别
10. 有关history-server和eventlog,以及通过rest接口监控
- 其实通过网页监控任务,比通过rest接口要方便。
- 可参考的官网信息,比如rest接口和history-server配置方法等:http://spark.apache.org/docs/latest/monitoring.html
网上有很多教程,但有些不是很全,我的体会如下:
- spark的web监控界面,默认是8080,但这个主要看的是集群,当然也能看任务。spark提交的任务详情,转到默认4040查看(注意如果有多个任务上下文,则依次是4041、4042),但如果是完成的任务就需要通过history-server(默认18080端口查看了)。注意:4040端口只在程序运行时有响应,但程序运行完毕之后,4040端口立马不响应了。但是18080是可以查看所有完成或未完成任务细节的(我还没有对比显示细节是否有差异)
- 如果想通过rest接口查看任务状态(获得json数据),通过8080是查不到的(我做错了吗?),只能通过4040+18080查看,目前看来18080更方便,但前提是配置并启动history-server。
- 提交任务之后,是不知道任务id的,这时候(如果通过rest方式)只能通过 http://master:18080/api/v1/applications?status=[completed|running] 先获得任务列表,再来找任务,感觉有点不爽,不知道还有什么更好的办法没有。
- 首先配置history-server:
4.1. 在conf/spark-env.sh中添加
export SPARK_HISTORY_OPTS=" -Dspark.history.fs.logDirectory=hdfs://node1:9000/history/"
spark.history.fs.logDirectory这一项是必须配置的,也可以用“file:///”指定本地目录。但路径必须是已经存在的。
还可以利用-D添加更多的指令,在上面官网的“Spark History Server Configuration Options”一节中记录了很多可以修改的配置,常见的比如重新指定端口(spark.history.ui.port),以及cache的任务数量(spark.history.retainedApplications)。
之后启动服务:start-history-server.sh(sbin下面)
4.2. 如果不做4.1的配置,则需要启动history-server的时候使用:start-history-server.sh dfs://node1:9000/history/
4.3.在提交的任务中配置:
spark.eventLog.enabled true
spark.eventLog.dir hdfs://node1:9000/history
spark.eventLog.compress true (可选)
注意:spark.eventLog.dir是针对任务的,spark.history.fs.logDirectory是针对history server的,这两项很容易迷惑,它们的配置文件不同,但内容(路径)必须是一致的,且必须提前建立的!否则history server找不到日志的。
另外,理论上任务的配置,直接在spark-defaults.conf中配就好了,submit的时候,会自动加载这些默认参数,但是在我的项目中不生效,不知道什么原因(现在只能理解我的spark-defaults.conf格式有错),于是我把上述配置内容写入代码,就没问题了。
:
5