版本信息:
spark:2.2.0
kakfa:0.10.1.0
scala:2.11.8
scalikejdbc:3.3.2
Pom文件:
<properties>
<scala.version>2.11.8</scala.version>
<spark.version>2.2.0</spark.version>
<scalikejdbc.version>3.3.2</scalikejdbc.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!--scalikejdbc 依赖 -->
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc_2.11</artifactId>
<version>${scalikejdbc.version}</version>
</dependency>
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc-config_2.11</artifactId>
<version>${scalikejdbc.version}</version>
</dependency>
<!--Spark 依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--mysql 依赖 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.6</version>
</dependency>
</dependencies>
application.conf文件
db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://hadoop000:3306/hadoop_train?characterEncoding=utf-8"
db.default.user="root"
db.default.password="root"
dataSourceClassName=com.mysql.jdbc.jdbc2.optional.MysqlDataSource
#Kafka信息
metadata.broker.list = "192.168.245.100:9092"
#从老数据开始消费
auto.offset.reset = "smallest"
group.id = "baidu_offset_group"
kafka.topics = "baidu"
serializer.class = "kafka.serializer.StringEncoder"
request.required.acks = "1"
ValueUtils
package com.soul.bigdata.spark.streaming01
import com.typesafe.config.ConfigFactory
import org.apache.commons.lang3.StringUtils
object ValueUtils {
val load = ConfigFactory.load()
def getStringValue(key: String, defaultValue: String = "") = {
val value = load.getString(key)
if (StringUtils.isNotEmpty(value)) {
value
} else {
defaultValue
}
}
}
MySQL Offset表
create table baidu_offset(
topic varchar(32),
groupid varchar(50),
partitions int,
fromoffset bigint,
untiloffset bigint,
primary key(topic,groupid,partitions)
);
代码:
package com.soul.bigdata.spark.streaming01
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scalikejdbc.{DB, SQL}
import scalikejdbc.config.DBs
object StreamingOffsetMySQL {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingOffsetMySQL")
val ssc = new StreamingContext(conf, Seconds(10))
//Topic
val topics = ValueUtils.getStringValue("kafka.topics").split(",").toSet
//kafka参数
//这里应用了自定义的ValueUtils工具类,来获取application.conf里的参数,方便后期修改
val kafkaParams = Map[String, String](
"metadata.broker.list" -> ValueUtils.getStringValue("metadata.broker.list"),
"auto.offset.reset" -> ValueUtils.getStringValue("auto.offset.reset"),
"group.id" -> ValueUtils.getStringValue("group.id")
)
//先使用scalikejdbc从MySQL数据库中读取offset信息
//+------------+------------------+------------+------------+-------------+
//| topic | groupid | partitions | fromoffset | untiloffset |
//+------------+------------------+------------+------------+-------------+
//MySQL表结构如上,将“topic”,“partitions”,“untiloffset”列读取出来
//组成 fromOffsets: Map[TopicAndPartition, Long],后面createDirectStream用到
DBs.setup()
val fromOffset = DB.readOnly(implicit session => {
SQL("select * from baidu_offset").map(rs => {
(TopicAndPartition(rs.string("topic"), rs.int("partitions")), rs.long("untiloffset"))
}).list().apply()
}).toMap
//如果MySQL表中没有offset信息,就从0开始消费;如果有,就从已经存在的offset开始消费
val messages = if (fromOffset.isEmpty) {
println("从头开始消费...")
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
} else {
println("从已存在记录开始消费...")
val messageHandler = (mm: MessageAndMetadata[String, String]) => (mm.key(), mm.message())
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffset, messageHandler)
}
messages.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
//输出rdd的数据量
println("数据统计记录为:" + rdd.count())
//官方案例给出的获得rdd offset信息的方法,offsetRanges是由一系列offsetRange组成的数组
// trait HasOffsetRanges {
// def offsetRanges: Array[OffsetRange]
// }
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetRanges.foreach(x => {
//输出每次消费的主题,分区,开始偏移量和结束偏移量
println(s"---${x.topic},${x.partition},${x.fromOffset},${x.untilOffset}---")
//将最新的偏移量信息保存到MySQL表中
DB.autoCommit(implicit session => {
SQL("replace into baidu_offset(topic,groupid,partitions,fromoffset,untiloffset) values (?,?,?,?,?)")
.bind(x.topic, ValueUtils.getStringValue("group.id"), x.partition, x.fromOffset, x.untilOffset)
.update().apply()
})
})
}
})
ssc.start()
ssc.awaitTermination()
}
}
运行
停掉程序,重新运行,开始offset是从411开始消费的就达到了我们的目的