Spark Streaming + Kafka整合

参考官网
http://spark.apache.org/docs/2.1.0/streaming-kafka-0-8-integration.html

  • 之前先确保以下操作:
    1、先启动ZK:./zkServer.sh start
    2、启动Kafka:./kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
    3、创建topic:
    ./kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic
    ./kafka-topics.sh --list --zookeeper hadoop:2181
    4、通过控制台测试是否能正常生产与消费
    ./kafka-console-producer.sh --broker-list hadoop:9092 --topic kafka_streaming_topic
    ./kafka-console-consumer.sh --zookeeper hadoop:2181 --topic kafka_streaming_topic

Approach 1: Receiver-based Approach

  • Receiver方式的本地环境联调
    1、KafkaUtils.createStream Create an input stream that pulls messages from Kafka Brokers.
import org.apache.spark.streaming.kafka._

 val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

2、引入数组,含四个数->val Array(zkQuorum,group,topics,numThreads) = args

3、判断是否传入四个参数->构建topicMap:
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

4、topicMap带入KafkaUtils参数
5、业务代码:
messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

6、到IDEA的edit configuration编辑以下内容:
hadoop:2181 test kafka_streaming_topic 1

注意:

test:group名
1:线程数
setMaster("local[2]")   一定要大于2

7、run下代码,在kafka 生产者窗口手动输入几个单词,在kafka consumer窗口即时看到单词的产生,在本地代码的console窗口看到单词计数

  • Receiver方式的生产环境联调
    1、在项目根目录下执行编译
    mvn clean package -DskipTests
    2、上传到服务器hadoop的lib目录下,执行:
spark-submit \
--class com.feiyue.bigdata.sparkstreaming.KafkaReceiverWordCount \
--master local[2] \
--name KafkaReceiverWordCount \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
/home/hadoop/lib/spark-1.0-SNAPSHOT.jar hadoop:2181 test kafka_streaming_topic 1

3、运行后看4040端口Spark Streaming的UI界面

可以知道UI页面中,
Receiver是一直都在运作的,
而Direct方式没有此Jobs

Approach 2: Direct Approach (No Receivers)

Note that this feature was introduced in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.

特点:
1、简化了并行度,不需要多个Input Stream,只需要一个DStream
2、加强了性能,真正做到了0数据丢失,而Receiver方式需要写到WAL才可以(即副本存储),Direct方式没有Receiver
3、只执行一次

缺点:
1、基于ZooKeeper的Kafka监控工具,无法展示出来,所以需要周期性地访问offset才能更新到ZooKeeper

  • 怎么做

基于Receiver方式的代码,将createStream改为createDirectStream,其余业务代码都不用改动。

    //kafkaParams: Map[String, String],
    //topics: Set[String]
    val Array(brokers, topics) = args


    //val sparkConf = new SparkConf().setAppName("KafkaDirectWordCount").setMaster("local[2]")
    val sparkConf = new SparkConf()

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicsSet = topics.split(",").toSet

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()

  • Direct生产环境联调
    基于Receiver方式,参数只需要传brokers与topics,注意查看源码与泛型看返回类型并构造出来
spark-submit \
--class com.feiyue.bigdata.sparkstreaming.KafkaDirectWordCount \
--master local[2] \
--name KafkaDirectWordCount \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
/home/hadoop/lib/spark-1.0-SNAPSHOT.jar hadoop:9092  kafka_streaming_topic

3、运行后看4040端口Spark Streaming的UI界面

可以知道UI页面中,Direct方式没有此Jobs
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352

推荐阅读更多精彩内容