Spark Streaming =>很火,在流处理中得到了广泛的应用。TensorFlow=>很火,由Google大神开源,目前已经在深度学习领域展现了超高的流行潜质。那么如何在Spark Streaming中调用TensorFlow?笔者此文尝试使用了简单粗暴的方式在Spark Streaming中调用TensorFlow.
1. 需求和目标
笔者已经实现了一个基于Spark Streaming的流处理平台,能够对Kafka输入的流数据进行运算,并且可以通过Sql的方式对数据进行过滤和逻辑判断。
笔者玩了一段时间的TensorFlow,一直在琢磨在机器学习的Model Serving阶段,能否由Kafka喂数据,将TF融入到已有的流处理中,流入的每条数据都可以进行predict,处理结果还能通过Spark DataFrame进行SQL配置过滤。
2. 第一次尝试,失败。
因为在已有的Spark Streaming代码框架中,有一个自定义的函数接口,能够依次处理RDD的每一个record,所以刚开始考虑在此函数接口中通过调用Python脚本实现调用TensorFlow。
步骤如下:
- 在Spark executor 执行的机器上安装TensorFlow环境
- 在每个Record的执行逻辑中import sys.process._ ,并调用外部TF的Python脚本。伪代码如下:
//加入依赖库
import sys.process._
// 执行TF hello world脚本
val result = "sudo python /data/tf/hello.py".!!
// 把执行脚本的结果输出到Map,最终在另外的地方输出到Kafka中
labelMap += (isAILabel -> result.toString)
hello.py脚本非常简单,就是简单的调用TensorFlow的hello world.
import tensorflow as tf
hello = tf.constant('Hello, TensorFlow!')
sess = tf.Session()
print sess.run(hello)
- 看起来很简单很粗暴,最简单的方法而已。运行的结果能正常调用起来,但是因为sys.process._是fork出进程去执行脚本命令,在流处理过程中,如果每条数据去fork新的进程,会导致执行效率非常底下。因此,在实际测试中,一台机器每个macro batch 700 - 800 这样极少的数据下,执行一个批次竟然都需要消耗5分钟以上。
因此,这种方法虽然看似简单,但无法使用。
3. 第二种简单粗暴方式 RDD pipe.
对每一条数据执行TF脚本不可取,那么换个思路,针对整个RDD进行处理,尝试使用了RDD pipe的方法进行处理。在测试中性能比上一种方法有了极大的提升,基本上在几秒内处理完成。
方法如下:
- 在Spark executor 执行的机器上安装TensorFlow环境
- 在RDD处理中,使用rdd的pipe方法,执行python TF的脚本,然后生成新的pipedRDD。
val piped = labelRDD.pipe("sudo python /data/tf/hello.py", Map("SEPARATOR" -> ","))
- 因为在笔者环境中,原始的rdd是json类型的,需要使用zip方法,将pipedRDD合并到原始的labelRDD中。这里用了自定义function的zipPartitions的方法。
val aiRDD = labelRDD.zipPartitions(piped){
(rdd1Iter,rdd2Iter) => {
var result = List[String]()
while(rdd1Iter.hasNext && rdd2Iter.hasNext) {
val firstStr = rdd1Iter.next()
result::=(firstStr.substring(0, firstStr.length - 1) + "," + rdd2Iter.next() + "}")
}
result.iterator
}
}
- hello.py 稍加改造,解析RDD的json类型,然后如果某一字段值为f1,则输出Hello world. 这里输出的结果为json串,以方便和RDD中的json合并。
import sys
import json
import tensorflow as tf
hello = tf.constant('Hello, TensorFlow!')
sess = tf.Session()
while True:
line = sys.stdin.readline()
if not line:
break
data = json.loads(line)
if (data['f1'] == 'f1'):
print '"aiout":"' + sess.run(hello) + '"'
else:
print '"aiout":"No"'
- 这样简单的TF hello world程序就可以在原有的Spark Streaming中被launch 起来。
NOTE: 本文只是简单的在Spark streaming中拉起Python调用TensorFlow,后续还需要从API层面看如何能够更好的集成。