1、SparkConsumer
import java.text.SimpleDateFormat
import java.util.Calendar
import com.alibaba.fastjson.{JSON, TypeReference}
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import utils.{PropertyUtil, RedisUtil}
/**
 * Spark Streaming entry point that consumes a Kafka topic through a direct stream and keeps
 * only the `value` part of each record.
 *
 * The broker list and the topic name are read from `kafka.properties` via `PropertyUtil`.
 */
object SparkConsumer {

  /**
   * Reads a mandatory configuration value.
   *
   * `PropertyUtil.getProperty` returns `null` for an absent key; failing here gives a clear
   * message instead of a `null` silently ending up in the Kafka parameters.
   *
   * @param key property name to look up
   * @return the non-empty, trimmed property value
   * @throws IllegalArgumentException if the key is absent or blank
   */
  private def requiredProperty(key: String): String =
    Option(PropertyUtil.getProperty(key))
      .map(_.trim)
      .filter(_.nonEmpty)
      .getOrElse(
        throw new IllegalArgumentException(s"Missing required property '$key' in kafka.properties")
      )

  /**
   * Builds the streaming job, starts it and blocks until the streaming context terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Resolve the configuration first so a broken setup fails before Spark is started.
    // Kafka parameters: broker list for the direct stream.
    val kafkaParams = Map("metadata.broker.list" -> requiredProperty("metadata.broker.list"))
    // Topic(s) to consume.
    val topics = Set(requiredProperty("kafka.topics"))

    // Initialise Spark with 5-second micro-batches.
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("TrafficStreaming")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("./ssc/checkpoint")

    // Read the records from Kafka and keep only the value of each (key, value) pair.
    val kafkaLineDStream = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
      .map(_._2)

    // A streaming job needs at least one output operation; without one, start() is rejected
    // with "No output operations registered, so nothing to execute".
    kafkaLineDStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
2、kafka.properties
# Kafka client connection: broker addresses and String deserializers for keys and values.
bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Delivery settings. The Kafka config key is "retries" (the misspelling "retires" is not a
# recognised setting).
acks=all
retries=0
# Broker list looked up by SparkConsumer under this key for the direct stream.
metadata.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092
# Consumer group and offset auto-commit.
# NOTE(review): "g_graffic1" looks like a misspelling of "g_traffic1"; left unchanged because
# renaming a consumer group changes which committed offsets are used.
group.id=g_graffic1
enable.auto.commit=true
auto.commit.interval.ms=30000
# Topic looked up by SparkConsumer.
# NOTE(review): "traffc" looks like a misspelling of "traffic"; left unchanged because it must
# match the topic that actually exists on the brokers - confirm before renaming.
kafka.topics=traffc
# NOTE(review): the keys below are not read anywhere in the visible code; confirm they are
# still needed by whatever consumes this file.
zookeeper.sync.time.ms=250
num.io.threads=12
batch.size=65536
buffer.memory=524288
log.retention.hours=5
3、PropertyUtil
import java.util.Properties
/**
 * Loads `kafka.properties` from the system classpath once, when this object is first accessed,
 * and exposes the values by key.
 *
 * Loading is best-effort: if the resource is missing or cannot be read, the problem is reported
 * on standard error and `properties` stays empty, so every lookup returns `null`.
 */
object PropertyUtil {
  import scala.util.control.NonFatal

  private val ResourceName = "kafka.properties"

  /** Properties read from `kafka.properties`; empty when loading failed. */
  val properties: Properties = new Properties()

  // getSystemResourceAsStream returns null when the resource is not on the classpath.
  Option(ClassLoader.getSystemResourceAsStream(ResourceName)) match {
    case None =>
      System.err.println(
        s"PropertyUtil: resource '$ResourceName' not found on the classpath; no properties loaded"
      )
    case Some(inputStream) =>
      try {
        properties.load(inputStream)
      } catch {
        case NonFatal(e) =>
          System.err.println(s"PropertyUtil: failed to load '$ResourceName': ${e.getMessage}")
          e.printStackTrace()
      } finally {
        // A failure while closing must not abort initialisation of this object.
        try inputStream.close()
        catch { case NonFatal(_) => () }
      }
  }

  /**
   * Looks up a configuration value.
   *
   * @param key property name
   * @return the value for `key`, or `null` when the key is not present
   */
  def getProperty(key: String): String = properties.getProperty(key)
}
4、pom.xml
<dependencies>
  <!-- Kafka client library. -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.2</version>
  </dependency>
  <!-- Spark core and Spark Streaming; keep all Spark artifacts on the same version. -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.1</version>
  </dependency>
  <!--
    Kafka 0.8 direct-stream connector (package org.apache.spark.streaming.kafka).
    The Spark 1.6 artifact spark-streaming-kafka_2.11:1.6.3 references
    org.apache.spark.Logging, which no longer exists in Spark 2.x, so the connector
    built for Spark 2.1.1 is used instead.
  -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
  </dependency>
</dependencies>