Heaven and earth in every direction are the jianghu; the clever of this world end up muddled all the same.
Storms rise over the field of fame and fortune; win it all, and you have lost in the end.
---- Ode to Gallantry (侠客行)
First, the Maven dependencies. The Jackson exclusions on the two Kafka artifacts, together with the explicitly pinned jackson-databind, keep a single Jackson 2.6.x on the classpath so the Kafka client does not drag in a newer Jackson that conflicts with the one Spark 2.4 ships.
<properties>
    <fastjson.version>1.2.68</fastjson.version>
    <scala.version>2.11.12</scala.version>
    <spark.version>2.4.0</spark.version>
    <hadoop.version>3.0.0</hadoop.version>
    <zookeeper.version>3.4.5</zookeeper.version>
    <kafka.version>2.1.0</kafka.version>
</properties>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
        <exclusions>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>${kafka.version}</version>
        <exclusions>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.6.6</version>
    </dependency>
    <!-- scala-library is pulled in transitively by the Spark artifacts -->
    <!--
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>${zookeeper.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
Next, the Spark Streaming driver program:
import java.util.Properties

import com.tiens.bigdata.order.insert.tradeParentOrders.analyzed.TradeParentOrderesAna
import com.tiens.bigdata.utils.{KafkaOffset, KafkaSink, PropertiesUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.{Assign, Subscribe}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

// Driver object (the name here follows the appName; adjust it to your own package)
object InsertTradeParentOrders {

  def main(args: Array[String]): Unit = {
    val bootstrapServer = PropertiesUtils.getProperties("kafka.bootstrap.server")
    // Kafka consumer parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> bootstrapServer,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test_insert_trade_parent_orders",
      // "auto.offset.reset" -> "latest"
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean))
    // Initialize the SparkContext; backpressure plus maxRatePerPartition caps the ingest rate per batch
    val sparkConf = new SparkConf()
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.streaming.backpressure.initialRate", "100")
      .set("spark.streaming.kafka.maxRatePerPartition", "100")
      .setMaster(PropertiesUtils.getProperties("model"))
      .setAppName("insertTradeParentOrders")
    val sparkContext = new SparkContext(sparkConf)
    // Set the log level
    sparkContext.setLogLevel("WARN")
    // Topic to consume
    val topics: Array[String] = Array("olap_ods_ordercenter_trade_parent_orders_v3")
    val zkPath = PropertiesUtils.getProperties("kafkaOffsetUrl") + topics(0)
    val zkHosts = PropertiesUtils.getProperties("zkCi")
    // Create the ZooKeeper client
    val zkClient = new ZkClient(zkHosts, 30000, 30000)
    // All the business logic is wired up inside this method
    val ssc = setupEsc(sparkContext, kafkaParams, topics, bootstrapServer, zkClient, zkPath, zkHosts)
    ssc.start()
    ssc.awaitTermination()
  }
  def setupEsc(sparkContext: SparkContext, kafkaParams: Map[String, Object], topics: Array[String],
               bootstrapServer: String, zkClient: ZkClient, zkPath: String, zkHosts: String) = {
    // Create the StreamingContext with a 1-second batch interval
    val streamingContext = new StreamingContext(sparkContext, Seconds(1))
    // Create the Kafka producer config and broadcast it as a KafkaSink so every executor can reuse it
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", bootstrapServer)
        p.setProperty("key.serializer", classOf[StringSerializer].getName)
        p.setProperty("value.serializer", classOf[StringSerializer].getName)
        p
      }
      streamingContext.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
    }
    // Create the direct stream, resuming from the offsets stored in ZooKeeper if there are any
    val stream: InputDStream[ConsumerRecord[String, String]] =
      getOrCreateDirectStream(streamingContext, kafkaParams, topics, zkClient, zkPath, zkHosts)
    stream.foreachRDD { rdd =>
      // Process the batch here (your business logic), e.g. forward records via kafkaProducer.value.send(...)
      // ...
      // Then persist this batch's offsets to ZooKeeper
      KafkaOffset.saveOffsets(zkClient, zkHosts, zkPath, rdd)
    }
    streamingContext
  }
  def getOrCreateDirectStream(streamingContext: StreamingContext, kafkaParams: Map[String, Object], topics: Array[String],
                              zkClient: ZkClient, zkPath: String, zkHosts: String): InputDStream[ConsumerRecord[String, String]] = {
    // Read any previously saved offsets from ZooKeeper
    val offsets: Option[Map[TopicPartition, Long]] = KafkaOffset.readOffsets(zkClient, zkHosts, zkPath, topics(0))
    // Two cases to handle:
    // 1. Nothing in ZooKeeper yet -> subscribe normally and let auto.offset.reset (earliest here) decide where to start
    // 2. Offsets found -> assign the stored partitions and resume from exactly those offsets
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = offsets match {
      case None =>
        KafkaUtils.createDirectStream[String, String](streamingContext, PreferConsistent,
          Subscribe[String, String](topics, kafkaParams))
      case Some(fromOffsets) =>
        KafkaUtils.createDirectStream[String, String](streamingContext, PreferConsistent,
          Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets))
    }
    kafkaDStream
  }
}
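The driver above leans on two helpers from com.tiens.bigdata.utils that the post does not include: PropertiesUtils (a plain properties loader; the driver reads the keys kafka.bootstrap.server, model, kafkaOffsetUrl and zkCi from it) and KafkaSink (a serializable wrapper that lets a KafkaProducer be broadcast to executors). Here is a minimal sketch of what they typically look like; the config.properties file name is an assumption, and only the signatures actually called above (getProperties, apply, send) are taken from the driver code:

import java.util.Properties
import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

// Loads config.properties from the classpath once and exposes simple key lookups.
// (The file name is an assumption; use whatever your project actually reads.)
object PropertiesUtils {
  private val props = new Properties()
  props.load(getClass.getClassLoader.getResourceAsStream("config.properties"))
  def getProperties(key: String): String = props.getProperty(key)
}

// Serializable wrapper around KafkaProducer: the producer itself is not serializable,
// so we broadcast a factory function and create the producer lazily on each executor.
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  lazy val producer: KafkaProducer[K, V] = createProducer()
  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))
  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))
}

object KafkaSink {
  def apply[K, V](config: Properties): KafkaSink[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      // Flush buffered records when the executor JVM exits
      sys.addShutdownHook { producer.close() }
      producer
    }
    new KafkaSink(createProducerFunc)
  }
}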
And now the ZooKeeper offset utility class:
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
import org.apache.kafka.common.TopicPartition
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010.HasOffsetRanges
import org.apache.zookeeper.data.Stat
object KafkaOffset {

  val logger = Logger.getLogger("KafkaOffset")

  /**
   * Read data from the given path in ZooKeeper.
   * @param client ZooKeeper client
   * @param path   znode path
   * @return the data as an Option, plus the node's Stat
   */
  def readDataMaybeNull(client: ZkClient, path: String): (Option[String], Stat) = {
    val stat: Stat = new Stat()
    val dataAndStat = try {
      (Some(client.readData[String](path, stat)), stat)
    } catch {
      case e: ZkNoNodeException =>
        (None, stat)
      case e2: Throwable => throw e2
    }
    dataAndStat
  }
  /**
   * Write data to the given path in ZooKeeper; if the parent path does not exist yet, create it first.
   * @param client ZooKeeper client
   * @param path   znode path
   * @param data   data to write
   */
  def updatePersistentPath(client: ZkClient, path: String, data: String) = {
    try {
      client.writeData(path, data)
    } catch {
      case e: ZkNoNodeException => {
        createParentPath(client, path)
        try {
          client.createPersistent(path, data)
        } catch {
          case e: ZkNodeExistsException =>
            client.writeData(path, data)
          case e2: Throwable => throw e2
        }
      }
      case e2: Throwable => throw e2
    }
  }
  /**
   * Create the parent path of a znode (recursively).
   * @param client ZooKeeper client
   * @param path   znode path
   */
  private def createParentPath(client: ZkClient, path: String): Unit = {
    val parentDir = path.substring(0, path.lastIndexOf('/'))
    if (parentDir.length != 0)
      client.createPersistent(parentDir, true)
  }
  /**
   * Read the previously saved Kafka offsets from ZooKeeper.
   * @param zkClient ZooKeeper client
   * @param zkHosts  ZooKeeper host string
   * @param zkPath   znode path where the offsets are stored
   * @param topic    Kafka topic
   * @return the stored offsets, if any
   */
  def readOffsets(zkClient: ZkClient,
                  zkHosts: String,
                  zkPath: String,
                  topic: String): Option[Map[TopicPartition, Long]] = {
    logger.info("Reading offsets from Zookeeper")
    val stopwatch = new Stopwatch()
    val (offsetsRangesStrOpt, _) = readDataMaybeNull(zkClient, zkPath)
    offsetsRangesStrOpt match {
      case Some(offsetsRangesStr) =>
        logger.info(s"Read offset ranges: ${offsetsRangesStr}")
        val offsets = offsetsRangesStr.split(",")
          .map(s => s.split(":"))
          .map { case Array(partitionStr, offsetStr) => new TopicPartition(topic, partitionStr.toInt) -> offsetStr.toLong }
          .toMap
        logger.info("Done reading offsets from Zookeeper. Took " + stopwatch)
        Some(offsets)
      case None =>
        logger.info("No offsets found in Zookeeper. Took " + stopwatch)
        None
    }
  }
  /**
   * Save the current batch's offsets to ZooKeeper and log the write.
   * @param zkClient ZooKeeper client
   * @param zkHosts  ZooKeeper host string
   * @param zkPath   znode path where the offsets are stored
   * @param rdd      the batch RDD (must come straight from the direct stream so it carries offset ranges)
   */
  def saveOffsets(zkClient: ZkClient,
                  zkHosts: String,
                  zkPath: String,
                  rdd: RDD[_]): Unit = {
    logger.info("Saving offsets to Zookeeper")
    val stopwatch = new Stopwatch()
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // Serialize all partitions into one "partition:offset,partition:offset,..." string
    // and write the znode once per batch.
    val offsetsRangesStr = offsetRanges.map(offsetRange => offsetRange.partition + ":" + offsetRange.fromOffset)
      .mkString(",")
    logger.info("Writing offsets to Zookeeper zkClient=" + zkClient + " zkHosts=" + zkHosts + " zkPath=" + zkPath + " offsetsRangesStr:" + offsetsRangesStr)
    updatePersistentPath(zkClient, zkPath, offsetsRangesStr)
    logger.info("Done updating offsets in Zookeeper. Took " + stopwatch)
  }
  /**
   * Simple stopwatch for logging how long an operation took.
   */
  class Stopwatch {
    private val start = System.currentTimeMillis()
    override def toString() = (System.currentTimeMillis() - start) + " ms"
  }
}
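For reference, the znode written by saveOffsets holds comma-separated partition:offset pairs. A small illustration of how readOffsets interprets such a string (the offset values below are made up):

import org.apache.kafka.common.TopicPartition

// Example znode content for a 3-partition topic (values are hypothetical)
val offsetsRangesStr = "0:1250,1:1310,2:1287"
// readOffsets turns this into the map that getOrCreateDirectStream hands to Assign(...)
val fromOffsets: Map[TopicPartition, Long] = offsetsRangesStr.split(",")
  .map(_.split(":"))
  .map { case Array(p, o) => new TopicPartition("olap_ods_ordercenter_trade_parent_orders_v3", p.toInt) -> o.toLong }
  .toMap

Because each range's fromOffset is stored rather than untilOffset, a restart re-reads the most recent batch, so the pipeline is at-least-once and the downstream writes should tolerate duplicates.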
No other motive here; I just want to see how many people this approach has helped. If it helped you, give it a like.