的确太容易懂了,就不解释了。
的确我们在用Spark做了几件事:数据处理与分析,特征工程和模型结果的预测与分析以及正在进行的爬虫结果分析(主要用来分析个人或企业的互联网舆情帮助我们对欺诈及综合信用状况给出判定)。就像我对自己技术团队要求的一样,基本的Spark提供的模型算法库里面的LR, SVM, 决策树,随机森林都是技术团队必须掌握的(不一定对,根据各自的业务来决定,我们做互联网信贷风控,掌握这些基本差不多了)。而且希望我自有技术团队的学习路径也是从http://spark.apache.org/examples.html 开始。然后深入学习MLLib和ML里面的基本特性,再然后就是了解我们信贷风控在系统建设过程中匹配的业务场景以及各家银行客户的偏好程度,对应的金融产品状况并深入到项目中进行具体的开发工作。实际的开发工作也会从认识银行数据开始,不要盲目的一上来就进行编码工作,比如:我曾经扔给团队一个图:
Spark跟基础数据源的对接基本靠Spark Streaming,比如:Kafka (Scala Code)
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
比如:MapR (Scala Code)
import com.mapr.db.spark.sql._
import org.apache.spark.sql.SparkSession
val dataFromMapR = spark.loadFromMapRDB("/apps/auction_json_table")
比如:ES (Scala Code)
val conf = ...
val sc = new SparkContext(conf)
val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
sc.makeRDD.saveToEs("spark/docs")
实际工作的时候,我们的大多数项目的初始数据来源基本都是DB和一些导出的CSV文件。Spark在数据分析和建模方面给我们的工作提供的极大的便利,下面也会给出两个MLLib的例子来说明一下Spark的简洁之美。(不是说别的不好比如Storm的Mahout,比如Flink有自己的ML库,的确我们在用,特别是在我们的业务场景下学习门槛并不高)
1、协同过滤【在真实的资产推介场景下,可以使用协同过滤的推荐算法】不得不说的三种分类
a)、基于用户:根据相似用户的偏好信息产生的对目标用户的推荐(9年前在一家创业公司利用MongoDB和对用户的标签化做过这样的尝试)
b)、基于项目: 根据用户对相似项目的评分数据来预测目标项目的额评分,但是它的假设前提条件如下,如果大部分用户对某些项目的打分比较接近,那么新近用户对这些项目的打分也会比较接近。说的直白点,就是根据最近邻居查询进行推荐。
c)、基于模型:这也是现在用的比较多的一个方式,需要计算用户之间或项目之间的相似度然后利用机器学习的方法对user-item交互关系进行学习得到一个模型,也是基于前两种推荐算法上的个性化推荐方式(我们做贷超也在用类似的算法)当然最差的情况时间复杂度为O(m*n)
这张图刚好说明了该推荐模型。当然常用的系统过滤矩阵分解算法包括了:奇异值分解法(SVD,这种方法常常需要配合数据整理和清洗补足空数据来进行,但是的确比较简单,我们也拿它来举例)正则化矩阵分解和带偏置的矩阵分解。Spark很容易实现了这种基于奇异值分解的协同过滤算法
import org.apache.spark.mllib.linalg.distributed._
val data = sc.textFile("my.data")
val parsedData = data.map(_.split(',') match{
case Array(user, item, rate) => MatrixEntry(user.toLong -1, item.toLong - 1, rate.toDouble)
})
val ratings = new CoordinateMatrix(parsedData)
//转换的完成,并且调用自带的ComputeSVD函数
val matrix = ratings.toRowMatrix
val sad = matrix.computeSVD(2, true)
val score = (0 to 1).map(t => sad.U.rows.toArray()(0)(t) * *svd.V.transpose.toArray(t))
//那么 score.sum*svd.s(0) 就是用户1 对项目1的评分
2、逻辑回归,我们用的最多解释性最好的算法模型之一。Spark给我们提供了很多代码样例和数据,
运行下面的代码需要自己安装NumPy (pip install numpy --save),由于我们自己的开发环境也以Python为主,建议可以使用pySpark来入门,反正以后的很多数据清洗工作也都是用python在做,好好学习python没坏处。
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
def parsePoint(line):
"""
Parse a line of text into an MLlib LabeledPoint object.
"""
values = [float(s) for s in line.split(' ')]
if values[0] == -1: # Convert -1 labels to 0 for MLlib
values[0] = 0
return LabeledPoint(values[0], values[1:])
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: logistic_regression <file> <iterations>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonLR")
points = sc.textFile(sys.argv[1]).map(parsePoint)
iterations = int(sys.argv[2])
model = LogisticRegressionWithSGD.train(points, iterations)
print("Final weights: " + str(model.weights))
print("Final intercept: " + str(model.intercept))
sc.stop()
关于基础的Spark的工具准备,我们先说这么多,在下面关于个人金融风控的设计中还会提及。
-Leon 2019-2-7