Spark Streaming整合kafka

本来Streaming整合kafka是由两种方式的,第一种是Receiver DStream,第二种是Direct DStream

但是由于目前kafka版本升级到2.0以上了,并且我用的kafka版本是/usr/local/kafka_2.11-2.1.1,我们就不再介绍Receiver DStream这种方式了。

为什么要使用Direct DStream方式尼?他又三个大的优点,如下:

1、简化的并行性:使用DirectStream,Spark流将创建与要使用的Kafka分区一样多的RDD分区,这些分区都将并行地从Kafka读取数据。所以在Kafka和RDD分区之间有一对一的映射,这更容易理解和调整

简化的并行性

2、高效率:在第一种方法中实现零数据丢失需要将数据存储在提前写入日志中,从而进一步复制数据。这实际上是低效的,因为数据有效地被复制了两次—一次是由Kafka复制的,第二次是由提前写入日志复制的。第二种方法消除了问题,因为没有接收器,因此不需要提前写入日志。只要您有足够的卡夫卡保留,就可以从卡夫卡恢复消息。

高效性

3、恰好一次语义:第一种方法使用Kafka的高级API将消耗的偏移存储在ZooKeeper中。这是传统上使用卡夫卡数据的方法。虽然这种方法(与提前写入日志结合)可以确保零数据丢失(即至少一次语义),但在某些故障下,某些记录可能会被消耗两次的可能性很小。这是因为火花流可靠接收的数据与ZooKeeper跟踪的偏移量不一致。因此,在第二种方法中,我们使用不使用ZooKeeper的简单Kafka API。偏移量由检查点内的火花流跟踪。这消除了Spark流和ZooKeeper/Kafka之间的不一致性,因此尽管失败,Spark流有效地接收到每个记录一次。为了实现结果输出的一次性语义,将数据保存到外部数据存储的输出操作必须是等幂的,或者是保存结果和偏移量的原子事务

但是这种方式有一个缺点:这种方式不能在zk中更新偏移量,因此也不能通过zk的监控工具看到运行进度,因此需要你自己处理这个偏移量,

下面是我写的整合步骤样例:、

第一步、启动zk和kafka、创建生产者和消费者

启动zk 

./zkServer.sh start

启动kafka

./kafka-server-start.sh ${KAFKA_HOME}/config/server.properties

创建topic

./kafka-topics.sh --create --zookeeper 10.101.3.3:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming

查看是否创建成功

./kafka-topics.sh --list --zookeeper 10.101.3.3:2181

创建topic的对应的生产者

./kafka-console-producer.sh --broker-list 10.101.3.3:9092 --topic kafka_streaming

创建topic对应的消费者

./kafka-console-consumer.sh --bootstrap-server 10.101.3.3:9092 --topic kafka_streaming --from-beginning

检验下是否能够正常通讯:

第二步、添加spark-streaming-kafka依赖jar包

第三步、使用jdk1.8和kafkka0.10_2.11和spark streaming2.11版本来编写整合程序。

首先我们先解释下用到的两个特殊类。

LocationStrategies

新的Kafka使用者API将预先获取消息到缓冲区。因此,出于性能原因,Spark集成将缓存的消费者保留在执行程序上(而不是为每个批处理重新创建它们),并且更喜欢在具有适当使用者的主机位置上安排分区,这一点很重要。

在大多数情况下,您应该使用LocationStrategies.PreferConsistent,如上所示。这将在可用执行程序之间均匀分配分区。如果您的执行程序与Kafka代理在同一主机上,请使用PreferBrokers,它更愿意为该分区安排Kafka领导者的分区。最后,如果分区之间的负载有明显偏差,请使用PreferFixed。这允许您指定分区到主机的显式映射(任何未指定的分区将使用一致的位置)。

消费者的缓存的默认最大大小为64.如果您希望处理超过(64 *个执行程序数)Kafka分区,则可以通过spark.streaming.kafka.consumer.cache.maxCapacity更改此设置。

如果要禁用Kafka使用者的缓存,可以将spark.streaming.kafka.consumer.cache.enabled设置为false。可能需要禁用缓存来解决SPARK-19185中描述的问题。一旦SPARK-19185解决,可以在Spark的更高版本中删除此属性。

缓存由topicpartition和group.id键入,因此每次调用createDirectStream时都要使用单独的group.id。 

ConsumerStrategies

新的Kafka消费者API有许多不同的方法来指定主题,其中一些需要相当大的后对象实例化设置。 ConsumerStrategies提供了一种抽象,即使从检查点重新启动后,Spark也可以获得正确配置的消费者。

ConsumerStrategies.Subscribe,如上所示,允许您订阅固定的主题集合。 SubscribePattern允许您使用正则表达式来指定感兴趣的主题。请注意,与0.8集成不同,使用Subscribe或SubscribePattern应响应在正在运行的流期间添加分区。最后,Assign允许您指定固定的分区集合。所有这三个策略都重载了构造函数,允许您指定特定分区的起始偏移量。

如果您具有上述选项无法满足的特定使用者设置需求,则ConsumerStrategy是您可以扩展的公共类。

程序:

package com.liushun;/*

@auth:liushun

@date:2019-06-20 16:03

Spark Streaming整合kafka,使用Driect DStream方式

*/

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.common.TopicPartition;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.Optional;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.streaming.Durations;

import org.apache.spark.streaming.api.java.JavaDStream;

import org.apache.spark.streaming.api.java.JavaInputDStream;

import org.apache.spark.streaming.api.java.JavaPairDStream;

import org.apache.spark.streaming.api.java.JavaStreamingContext;

import org.apache.spark.streaming.kafka010.ConsumerStrategies;

import org.apache.spark.streaming.kafka010.KafkaUtils;

import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

import java.util.*;

import java.util.regex.Pattern;

public class KaFKaUnionStreamingWordCnt {

private static final PatternSPACE = Pattern.compile(" ");

public static void main(String[] args) {

SparkConf sc=new SparkConf().setMaster("local[2]").setAppName("KaFKaUnionStreamingWordCnt");

JavaStreamingContext jssc=new JavaStreamingContext(sc, Durations.seconds(5));//5秒一次

        jssc.checkpoint(".");//设置上一个批次的值存在的目录,在生产环境中,放在hdfs某个文件下,相对安全些

// 首先要创建一份kafka参数map

        Map kafkaParams =new HashMap();

// 这里是不需要zookeeper节点,所以这里放broker.list

        String brokerslist="10.101.3.3:9092";

String topics ="kafka_streaming";

//Kafka服务监听端口

        kafkaParams.put("bootstrap.servers",brokerslist);

//指定kafka输出key的数据类型及编码格式(默认为字符串类型编码格式为uft-8)

        kafkaParams.put("key.deserializer", StringDeserializer.class);

//指定kafka输出value的数据类型及编码格式(默认为字符串类型编码格式为uft-8)

        kafkaParams.put("value.deserializer", StringDeserializer.class);

//消费者ID,随意指定

        kafkaParams.put("group.id","jis");

//指定从latest(最新,其他版本的是largest这里不行)还是smallest(最早)处开始读取数据

        kafkaParams.put("auto.offset.reset","latest");

//如果true,consumer定期地往zookeeper写入每个分区的offset

        kafkaParams.put("enable.auto.commit",false);

//Topic分区

        Map offsets =new HashMap<>();

offsets.put(new TopicPartition(topics,0),0L);

Collection topicsSet =new HashSet<>(Arrays.asList(topics.split(",")));

//通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定

        try {

JavaInputDStream> kafkaStream = KafkaUtils.createDirectStream(

jssc,

LocationStrategies.PreferConsistent(),

ConsumerStrategies.Subscribe(topicsSet, kafkaParams, offsets)

//也可以用不指定offset的构造函数替换上部的订阅

//ConsumerStrategies.Subscribe(topicsSet, kafkaParams)

);

//使用map先对InputDStream进行取值操作

            JavaDStream lines=kafkaStream.map(new Function, String>() {

@Override

                public String call(ConsumerRecord consumerRecord)throws Exception {

return consumerRecord.value();

}

});

//再使用flatMap进行行记录的空格操作

            JavaDStream words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());

JavaPairDStream pairs  = words.mapToPair(s ->new Tuple2<>(s,1));

//状态保留

            JavaPairDStream runningCounts = pairs.updateStateByKey(

new Function2, Optional, Optional>() {

@Override

                        public Optional call(List values, Optional state)throws Exception {

Integer updateValue =0;

if (state.isPresent()) {//是否为空

// 如果有值就获取

                                updateValue = state.get();

}

// 累加

                            for (Integer value : values) {

updateValue += value;

}

return Optional.of(updateValue);

}

}

);

runningCounts.print();

jssc.start();

jssc.awaitTermination();

}catch (InterruptedException e) {

e.printStackTrace();

}

}

}

打包提交到spark集群中

./spark-submit --class com.liushun.KaFKaUnionStreamingWordCnt --master yarn /usr/local/spark-2.1.1-bin-hadoop2.6/bin/SparkStreamTest-1.1-SNAPSHOT.jar

验证正确性:

到此,我们就结束了对spark与kafak0.10版本以上的整合实验。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,390评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,821评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,632评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,170评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,033评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,098评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,511评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,204评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,479评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,572评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,341评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,893评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,171评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,486评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,676评论 2 335

推荐阅读更多精彩内容