引言
在很多领域,如股市走向分析, 气象数据测控,网站用户行为分析等,具有数据产生快,实时性强,数据量大的特点。Spark Streaming 在数据流动的过程中实时地进行捕捉和处理,并根据业务需求对数据进行计算分析。本文将介绍如何使用 Apache Kafka 和 Spark Streaming 模块构建一个实时的数据处理系统。
Spark Streaming 结合 kafka 介绍
kafka 简介
Zookeeper 在 Kafka 中的作用
流数据产生系统(网页、APP等用户操作数据)作为 Kafka 的 producer 将数据流分发给 Kafka 的 Topic,流数据计算系统 (Storm,Spark Streaming 等) 实时消费并计算数据。这也是本文将要介绍的应用场景。
关于 Spark Steaming
Kafka 集群搭建步骤
1. 机器准备
本文中,我们将准备三台机器搭建 Kafka 集群,IP 地址分别是 192.168.1.1,192.168.1.2,192.168.1.3,并且三台机器网络互通。
2. 下载并安装 kafka_2.10-0.8.2.1
下载地址: https://kafka.apache.org/downloads.html
下载完成后,上传到目标机器中的一个,如 192.168.1.1 , 使用以下命令解压:
tar –xvf kafka_2.10-0.8.2.1
安装完成。
3. 创建 zookeeper 数据目录并设定服务器编号
在所有三台服务器上执行下面操作。
切换到当前用户工作目录,如/home/fams , 创建 zookeeper 保存数据的目录, 然后在这个目录下新建服务器编号文件。
mkdir zk_data
// 创建 myid 文本,把 N 输入到 myid
cat N > myid
注意需要保证 N 在三台服务器上取不同值,如分别取 1,2,3。
4. 编辑 zookeeper 配置文件
Kafka 安装包中内置 zookeeper 服务。进入 Kafka 安装目录, 如/home/fams/kafka_2.10-0.8.2.1, 编辑 config/zookeeper.properties 文件,增加以下配置:
tickTime=2000
dataDir=/home/fams/zk_data/
clientPort=2181
initLimit=5
syncLimit=2
server.1=192.168.1.1:2888:3888
server.2=192.168.1.2:2888:3888
server.3=192.168.1.3:2888:3888
这些配置项的解释如下:
- tickTime:zookeeper 服务器之间的心跳时间间隔,以毫秒为单位。
- dataDir:zookeeper 的数据保存目录,我们也把 zookeeper 服务器的 ID 文件保存到这个目录下,下文会介绍。
- clientPort:zookeeper 服务器会监听这个端口,然后等待客户端连接。
- initLimit:zookeeper 集群中 follower 服务器和 leader 服务器之间建立初始连接时所能容忍的心跳次数的极限值。
- syncLimit:zookeeper 集群中 follower 服务器和 leader 服务器之间请求和应答过程中所能容忍的心跳次数的极限值。
- server.N:N 代表的是 zookeeper 集群服务器的编号。对于配置值,以 192.168.1.1:2888:3888 为例,192.168.1.1 表示该服务器的 IP 地址,2888 端口表示该服务器与 leader 服务器的数据交换端口,3888 表示选举新的 leader 服务器时候用到的通信端口。
5. 编辑 Kafka 配置文件
a. 编辑 config/server.properties 文件
broker.id=0
port=9092
host.name=192.168.1.1
zookeeper.contact=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
log.dirs=/home/fams/kafka-logs
这些配置项解释如下:
- broker.id:Kafka broker 的唯一标识,集群中不能重复。
- port: Broker 的监听端口,用于监听 Producer 或者 Consumer 的连接。
- host.name:当前 Broker 服务器的 IP 地址或者机器名。
- zookeeper.contact:Broker 作为 zookeeper 的 client,可以连接的 zookeeper 的地址信息。
- log.dirs:日志保存目录。
b. 编辑 config/producer.properties 文件
添加或者修改以下配置:
broker.list=192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092
producer.type=async
这些配置项解释如下:
- broker.list:集群中 Broker 地址列表。
- producer.type: Producer 类型,async 异步生产者,sync 同步生产者。
c. 编辑 config/consumer.properties 文件
zookeeper.contact=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
配置项解释如下:
- zookeeper.contact: Consumer 可以连接的 zookeeper 服务器地址列表。
6. 上传修改好的安装包到其他机器
至此,我们已经在 192.168.1.1 机器上修改好了所有需要的配置文件,那么接下来请用以下命令打包该 Kafka 安装包,并上传至 192.168.1.2 和 192.168.1.3 两台机器上。
tar –cvf kafka_2.10-0.8.2.1.tar ./kafka_2.10-0.8.2.1
scp ./kafka_2.10-0.8.2.1.tar <a href="mailto:fams@192.168.1.2:/home/fams"><code>fams@192.168.1.2:/home/fams</code></a>
scp ./kafka_2.10-0.8.2.1.tar <a href="mailto:fams@192.168.1.3:/home/fams"><code>fams@192.168.1.3:/home/fams</code></a>
上传完成后,我们需要到 192.168.1.2 和 192.168.1.3 两台机器上解压刚才上传的 tar 包,命令如清单一。之后需要分别在两台机器上修改 config/server.properties 文件中的 broker.id 和 host.name. broker.id,可以分别复制 1 和 2,host.name 需要改成当前机器的 IP。
7. 启动 zookeeper 和 Kafka 服务
分别在三台机器上运行下面命令启动 zookeeper 和 Kafka 服务。
//启动 zookeeper 服务
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
//启动 kafka 服务
nohup bin/kafka-server-start.sh config/server.properties &
8. 验证安装
我们的验证步骤有两个。
第一步,分别在三台机器上使用下面命令查看是否有 Kafka 和 zookeeper 相关服务进程。
查看 Kafka 和 zookeeper 服务进程
ps –ef | grep kafka
第二步,创建消息主题,并通过 console producer 和 console consumer 验证消息可以被正常的生产和消费。
创建消息主题
bin/kafka-topics.sh --create \
--replication-factor 3 \
--partition 3 \
--topic user-behavior-topic \
--zookeeper 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
运行下面命令打开打开 console producer。
启动 Console Producer
bin/kafka-console-producer.sh --broker-list 192.168.1.1:9092 --topic user-behavior-topic
在另一台机器打开 console consumer。
启动 Console Consumer
./kafka-console-consumer.sh --zookeeper 192.168.1.2:2181 --topic user-behavior-topic --from-beginning
然后如果在 producer console 输入一条消息,能从 consumer console 看到这条消息就代表安装是成功的。
其他一些操作命令:
2.config/server.properties
添加zk地址:zookeeper.connect=node-1:2181,node-2:2181,node-3:2181
修改broker.id(唯一的):broker.id=0
3.启动
/bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
4.创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
5.列出所有topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
6.向topic中写入数据
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
7.消费数据
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
8.查看指定topic的详情
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
案例介绍与编程实现
1. 案例介绍
该案例中,我们假设某论坛需要根据用户对站内网页的点击量,停留时间,以及是否点赞,来近实时的计算网页热度,进而动态的更新网站的今日热点模块,把最热话题的链接显示其中。
2. 案例分析
对于某一个访问论坛的用户,我们需要对他的行为数据做一个抽象,以便于解释网页话题热度的计算过程。
首先,我们通过一个向量来定义用户对于某个网页的行为即点击的网页,停留时间,以及是否点赞,可以表示如下:
(page001.html, 1, 0.5, 1)
向量的第一项表示网页的 ID,第二项表示从进入网站到离开对该网页的点击次数,第三项表示停留时间,以分钟为单位,第四项是代表是否点赞,1 为赞,-1 表示踩,0 表示中立。
其次,我们再按照各个行为对计算网页话题热度的贡献,给其设定一个权重,在本文中,我们假设点击次数权重是 0.8,因为用户可能是由于没有其他更好的话题,所以再次浏览这个话题。停留时间权重是 0.8,因为用户可能同时打开多个 tab 页,但他真正关注的只是其中一个话题。是否点赞权重是 1,因为这一般表示用户对该网页的话题很有兴趣。
最后,我们定义用下列公式计算某条行为数据对于该网页热度的贡献值。
f(x,y,z)=0.8x+0.8y+z
那么对于上面的行为数据 (page001.html, 1, 0.5, 1),利用公式可得:
H(page001)=f(x,y,z)= 0.8x+0.8y+z=0.81+0.80.5+1*1=2.2
读者可以留意到,在这个过程中,我们忽略了用户本身,也就是说我们不关注用户是谁,而只关注它对于网页热度所做的贡献。
3. 生产 行为数据 消息
在本案例中我们将使用一段程序来模拟用户行为,该程序每隔 5 秒钟会随机的向 user-behavior-topic 主题推送 0 到 50 条行为数据消息,显然,这个程序扮演消息生产者的角色,在实际应用中,这个功能一般会由一个系统来提供。为了简化消息处理,我们定义消息的格式如下:
网页 ID|点击次数|停留时间 (分钟)|是否点赞
并假设该网站只有 100 个网页。以下是该类的 Scala 实现源码。
UserBehaviorMsgProducer 类源码
import scala.util.Random
import java.util.Properties
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig
import kafka.producer.Producer
class UserBehaviorMsgProducer(brokers: String, topic: String) extends Runnable {
private val brokerList = brokers
private val targetTopic = topic
private val props = new Properties()
props.put("metadata.broker.list", this.brokerList)
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("producer.type", "async")
private val config = new ProducerConfig(this.props)
private val producer = new Producer[String, String](this.config)
private val PAGE_NUM = 100
private val MAX_MSG_NUM = 3
private val MAX_CLICK_TIME = 5
private val MAX_STAY_TIME = 10
private val LIKE_OR_NOT = Array[Int](1, 0, -1)
def run(): Unit = {
val rand = new Random()
while (true) {
val msgNum = rand.nextInt(MAX_MSG_NUM) + 1
for (i <- 0 to msgNum) {
var msg = new StringBuilder()
msg.append("page" + (rand.nextInt(PAGE_NUM) + 1))
msg.append("|")
msg.append(rand.nextInt(MAX_CLICK_TIME) + 1)
msg.append("|")
msg.append(rand.nextInt(MAX_CLICK_TIME) + rand.nextFloat())
msg.append("|")
msg.append(LIKE_OR_NOT(rand.nextInt(3)))
println(msg.toString())
sendMessage(msg.toString())
}
println("%d user behavior messages produced.".format(msgNum+1))
Thread.sleep(5000)
}
}
def sendMessage(message: String) = {
val data = new KeyedMessage[String, String](this.topic, message);
producer.send(data);
}
}
object UserBehaviorMsgProducerClient {
def main(args: Array[String]) {
new Thread(new UserBehaviorMsgProducer(args(0), args(1))).start()
}
}
4. 编写 Spark Streaming 程序 消费消息
在弄清楚了要解决的问题之后,就可以开始编码实现了。对于本案例中的问题,在实现上的基本步骤如下:
- 构建 Spark 的 StreamingContext 实例,并且开启 checkpoint 功能。因为我们需要使用 updateStateByKey 原语去累计的更新网页话题的热度值。
- 利用 Spark 提供的 KafkaUtils.createStream 方法消费消息主题,这个方法会返回 ReceiverInputDStream 对象实例。
- 对于每一条消息,利用上文的公式计算网页话题的热度值。
- 定义一个匿名函数去把网页热度上一次的计算结果值和新计算的值相加,得到最新的热度值。
- 调用 updateStateByKey 原语并传入上面定义的匿名函数更新网页热度值。
- 最后得到最新结果后,需要对结果进行排序,最后打印热度值最高的 10 个网页。
源代码如下。
WebPagePopularityValueCalculator 类源码
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.Duration
object WebPagePopularityValueCalculator {
private val checkpointDir = "popularity-data-checkpoint"
private val msgConsumerGroup = "user-behavior-topic-message-consumer-group"
def main(args: Array[String]) {
val Array(zkServers, processingInterval) = args
val conf = new SparkConf().setAppName("Web Page Popularity Value Calculator")
val ssc = new StreamingContext(conf, Seconds(processingInterval.toInt))
ssc.checkpoint(checkpointDir)
val kafkaStream = KafkaUtils.createStream(ssc, zkServers, msgConsumerGroup,
//Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread
Map("user-behavior-topic" -> 3))
val popularityData = kafkaStream.map(_._2).map {
msgLine => {
val dataArr: Array[String] = msgLine.split("\\|")
val pageID = dataArr(0)
val popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1
(pageID, popValue)
}
}
//sum the previous popularity value and current value
val updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => {
iterator.flatMap(t => {
val newValue:Double = t._2.sum
val stateValue:Double = t._3.getOrElse(0);
Some(newValue + stateValue)
}.map(sumedValue => (t._1, sumedValue)))
}
val initialRDD = ssc.sparkContext.parallelize(List(("page1", 0.00)))
val stateDstream = popularityData.updateStateByKey[Double](updatePopularityValue,
new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
//set the checkpoint interval to avoid too frequently data checkpoint which may
//may significantly reduce operation throughput
stateDstream.checkpoint(Duration(8*processingInterval.toInt*1000))
//after calculation, we need to sort the result and only show the top 10 hot pages
stateDstream.foreachRDD {
rdd => {
val sortedData = rdd.map{ case (k,v) => (v,k) }.sortByKey(false)
val topKData = sortedData.take(10).map{ case (v,k) => (k,v) }
topKData.foreach(println(_))
}
}
ssc.start()
ssc.awaitTermination()
}
}
部署和测试
读者可以参考以下步骤部署并测试本案例提供的示例程序。
第一步,启动行为消息生产者程序, 可以直接在 Scala IDE 中启动,不过需要添加启动参数,第一个是 Kafka Broker 地址,第二个是目标消息主题的名称。
图 1. UserBehaviorMsgProducer 类启动参数
启动后,可以看到控制台有行为消息数据生成。
图 2. 生成的行为消息数据预览
第二步,启动作为行为消息消费者的 Spark Streaming 程序,需要在 Spark 集群环境中启动,命令如下:
清单 16. WebPagePopularityValueCalculator 类启动命令
bin/spark-submit \
--jars $SPARK_HOME/lib/spark-streaming-kafka_2.10-1.3.1.jar, \
$SPARK_HOME/lib/spark-streaming-kafka-assembly_2.10-1.3.1.jar, \
$SPARK_HOME/lib/kafka_2.10-0.8.2.1.jar, \
$SPARK_HOME/lib/kafka-clients-0.8.2.1.jar \
--class com.ibm.spark.exercise.streaming.WebPagePopularityValueCalculator
--master spark://<spark_master_ip>:7077 \
--num-executors 4 \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181 2
由于程序中我们要用到或者间接调用 Kafka 的 API,并且需要调用 Spark Streaming 集成 Kafka 的 API(KafkaUtils.createStream), 所以需要提前将启动命令中的 jar 包上传到 Spark 集群的每个机器上 (本例中我们将它们上传到 Spark 安装目录的 lib 目录下,即$SPARK_HOME/lib),并在启动命令中引用它们。
启动后,我们可以看到命令行 console 下面有消息打印出来,即计算的热度值最高的 10 个网页。
图 3. 网页话题热度当前排序预览
我们也可以到 Spark Web Console 上去查看当前 Spark 程序的运行状态, 默认地址为: http://spark_master_ip:8080。
图 4. 查看 Spark Streaming 程序的运行状态
注意事项
利用 Spark Streaming 构建一个高效健壮的流数据计算系统,我们还需要注意以下方面。
需要合理的设置数据处理的间隔,即需要保证每一批数据的处理时间必须小于处理间隔,保证在处理下一批数据的时候,前一批已经处理完毕。显然这需要由您的 Spark 集群的计算能力还有 input 数据的量决定。
需要尽可能的提升读取 input 数据的能力。在 Spark Streaming 与外部系统如 Kafka,Flume 等集成时,为了避免接收数据环节成为系统的瓶颈,我们可以启动多个 ReceiverInputDStream 对象实例。
虽然本文案例中,我们只是把 (近) 实时计算结果打印出来,但是实际上很多时候这些结果会被保存到 数据库,HDFS, 或者发送回 Kafka, 以供其他系统利用这些数据做进一步的业务处理。
由于流计算对实时性要求很高,所以任何由于 JVM Full GC 引起的系统暂停都是不可接受的。除了在程序中合理使用内存,并且定期清理不需要的缓存数据外,CMS(Concurrent Mark and Sweep) GC 也是被 Spark 官方推荐的 GC 方式,它能有效的把由于 GC 引起的暂停维持在一个在很低的水平。我们可以在使用 spark-submit 命令时通过增加 --driver-java-options 选项来添加 CMS GC 相关的参数。
在 Spark 官方提供关于集成 Kafka 和 Spark Streaming 的指导文档中,提到了两种方式,第一种是 Receiver Based Approach,即通过在 Receiver 里实现 Kafka consumer 的功能来接收消息数据; 第二种是 Direct Approach, 即不通过 Receiver,而是周期性的主动查询 Kafka 消息分区中的最新 offset 值,进而去定义在每个 batch 中需要处理的消息的 offset 范围。本文采用的是第一种方式,因为目前第二种方式还处于试验阶段。
如果采用 Receiver Based Approach 集成 Kafka 和 Spark Streaming,就需要考虑到由于 Driver 或者 Worker 节点宕机而造成的数据丢失的情况,在默认配置下,是有可能造成数据丢失的,除非我们开启 Write Ahead Log(WAL) 功能。在这种情况下,从 Kafka 接收到的消息数据会同步的被写入到 WAL 并保存到可靠的分布式文件系统上,如 HDFS。可以通过在 Spark 配置文件中 (conf/spark-defaults.conf) 把 spark.streaming.receiver.writeAheadLog.enable 配置项设置成 true 开启这个功能。当然在开启 WAL 的情况下,会造成单个 Receiver 吞吐量下降,这时候,我们可能需要并行的运行多个 Receiver 来改善这种情况。
由于 updateStateByKey 操作需要开启 checkpoint 功能,但是频繁的 checkpoint 会造成程序处理时间增长,也会造成吞吐量下降。默认情况下,checkpoint 时间间隔会取 steaming 程序数据处理间隔或者 10 秒两者中较大的那个。官方推荐的间隔是 streaming 程序数据处理间隔的 5-10 倍。可以通过 dsteam.checkpoint(checkpointInterval) 来设置,参数需要用样本类 Duration 包装下,单位是毫秒。