Spark Streaming整合kafka

本来Streaming整合kafka是由两种方式的,第一种是Receiver DStream,第二种是Direct DStream

但是由于目前kafka版本升级到2.0以上了,并且我用的kafka版本是/usr/local/kafka_2.11-2.1.1,我们就不再介绍Receiver DStream这种方式了。

为什么要使用Direct DStream方式尼?他又三个大的优点,如下:

1、简化的并行性:使用DirectStream,Spark流将创建与要使用的Kafka分区一样多的RDD分区,这些分区都将并行地从Kafka读取数据。所以在Kafka和RDD分区之间有一对一的映射,这更容易理解和调整

简化的并行性

2、高效率:在第一种方法中实现零数据丢失需要将数据存储在提前写入日志中,从而进一步复制数据。这实际上是低效的,因为数据有效地被复制了两次—一次是由Kafka复制的,第二次是由提前写入日志复制的。第二种方法消除了问题,因为没有接收器,因此不需要提前写入日志。只要您有足够的卡夫卡保留,就可以从卡夫卡恢复消息。

高效性

3、恰好一次语义:第一种方法使用Kafka的高级API将消耗的偏移存储在ZooKeeper中。这是传统上使用卡夫卡数据的方法。虽然这种方法(与提前写入日志结合)可以确保零数据丢失(即至少一次语义),但在某些故障下,某些记录可能会被消耗两次的可能性很小。这是因为火花流可靠接收的数据与ZooKeeper跟踪的偏移量不一致。因此,在第二种方法中,我们使用不使用ZooKeeper的简单Kafka API。偏移量由检查点内的火花流跟踪。这消除了Spark流和ZooKeeper/Kafka之间的不一致性,因此尽管失败,Spark流有效地接收到每个记录一次。为了实现结果输出的一次性语义,将数据保存到外部数据存储的输出操作必须是等幂的,或者是保存结果和偏移量的原子事务

但是这种方式有一个缺点:这种方式不能在zk中更新偏移量,因此也不能通过zk的监控工具看到运行进度,因此需要你自己处理这个偏移量,

下面是我写的整合步骤样例:、

第一步、启动zk和kafka、创建生产者和消费者

启动zk 

./zkServer.sh start

启动kafka

./kafka-server-start.sh ${KAFKA_HOME}/config/server.properties

创建topic

./kafka-topics.sh --create --zookeeper 10.101.3.3:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming

查看是否创建成功

./kafka-topics.sh --list --zookeeper 10.101.3.3:2181

创建topic的对应的生产者

./kafka-console-producer.sh --broker-list 10.101.3.3:9092 --topic kafka_streaming

创建topic对应的消费者

./kafka-console-consumer.sh --bootstrap-server 10.101.3.3:9092 --topic kafka_streaming --from-beginning

检验下是否能够正常通讯:

第二步、添加spark-streaming-kafka依赖jar包

第三步、使用jdk1.8和kafkka0.10_2.11和spark streaming2.11版本来编写整合程序。

首先我们先解释下用到的两个特殊类。

LocationStrategies

新的Kafka使用者API将预先获取消息到缓冲区。因此,出于性能原因,Spark集成将缓存的消费者保留在执行程序上(而不是为每个批处理重新创建它们),并且更喜欢在具有适当使用者的主机位置上安排分区,这一点很重要。

在大多数情况下,您应该使用LocationStrategies.PreferConsistent,如上所示。这将在可用执行程序之间均匀分配分区。如果您的执行程序与Kafka代理在同一主机上,请使用PreferBrokers,它更愿意为该分区安排Kafka领导者的分区。最后,如果分区之间的负载有明显偏差,请使用PreferFixed。这允许您指定分区到主机的显式映射(任何未指定的分区将使用一致的位置)。

消费者的缓存的默认最大大小为64.如果您希望处理超过(64 *个执行程序数)Kafka分区,则可以通过spark.streaming.kafka.consumer.cache.maxCapacity更改此设置。

如果要禁用Kafka使用者的缓存,可以将spark.streaming.kafka.consumer.cache.enabled设置为false。可能需要禁用缓存来解决SPARK-19185中描述的问题。一旦SPARK-19185解决,可以在Spark的更高版本中删除此属性。

缓存由topicpartition和group.id键入,因此每次调用createDirectStream时都要使用单独的group.id。 

ConsumerStrategies

新的Kafka消费者API有许多不同的方法来指定主题,其中一些需要相当大的后对象实例化设置。 ConsumerStrategies提供了一种抽象,即使从检查点重新启动后,Spark也可以获得正确配置的消费者。

ConsumerStrategies.Subscribe,如上所示,允许您订阅固定的主题集合。 SubscribePattern允许您使用正则表达式来指定感兴趣的主题。请注意,与0.8集成不同,使用Subscribe或SubscribePattern应响应在正在运行的流期间添加分区。最后,Assign允许您指定固定的分区集合。所有这三个策略都重载了构造函数,允许您指定特定分区的起始偏移量。

如果您具有上述选项无法满足的特定使用者设置需求,则ConsumerStrategy是您可以扩展的公共类。

程序:

package com.liushun;/*

@auth:liushun

@date:2019-06-20 16:03

Spark Streaming整合kafka,使用Driect DStream方式

*/

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.common.TopicPartition;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.Optional;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.streaming.Durations;

import org.apache.spark.streaming.api.java.JavaDStream;

import org.apache.spark.streaming.api.java.JavaInputDStream;

import org.apache.spark.streaming.api.java.JavaPairDStream;

import org.apache.spark.streaming.api.java.JavaStreamingContext;

import org.apache.spark.streaming.kafka010.ConsumerStrategies;

import org.apache.spark.streaming.kafka010.KafkaUtils;

import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

import java.util.*;

import java.util.regex.Pattern;

public class KaFKaUnionStreamingWordCnt {

private static final PatternSPACE = Pattern.compile(" ");

public static void main(String[] args) {

SparkConf sc=new SparkConf().setMaster("local[2]").setAppName("KaFKaUnionStreamingWordCnt");

JavaStreamingContext jssc=new JavaStreamingContext(sc, Durations.seconds(5));//5秒一次

        jssc.checkpoint(".");//设置上一个批次的值存在的目录,在生产环境中,放在hdfs某个文件下,相对安全些

// 首先要创建一份kafka参数map

        Map kafkaParams =new HashMap();

// 这里是不需要zookeeper节点,所以这里放broker.list

        String brokerslist="10.101.3.3:9092";

String topics ="kafka_streaming";

//Kafka服务监听端口

        kafkaParams.put("bootstrap.servers",brokerslist);

//指定kafka输出key的数据类型及编码格式(默认为字符串类型编码格式为uft-8)

        kafkaParams.put("key.deserializer", StringDeserializer.class);

//指定kafka输出value的数据类型及编码格式(默认为字符串类型编码格式为uft-8)

        kafkaParams.put("value.deserializer", StringDeserializer.class);

//消费者ID,随意指定

        kafkaParams.put("group.id","jis");

//指定从latest(最新,其他版本的是largest这里不行)还是smallest(最早)处开始读取数据

        kafkaParams.put("auto.offset.reset","latest");

//如果true,consumer定期地往zookeeper写入每个分区的offset

        kafkaParams.put("enable.auto.commit",false);

//Topic分区

        Map offsets =new HashMap<>();

offsets.put(new TopicPartition(topics,0),0L);

Collection topicsSet =new HashSet<>(Arrays.asList(topics.split(",")));

//通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定

        try {

JavaInputDStream> kafkaStream = KafkaUtils.createDirectStream(

jssc,

LocationStrategies.PreferConsistent(),

ConsumerStrategies.Subscribe(topicsSet, kafkaParams, offsets)

//也可以用不指定offset的构造函数替换上部的订阅

//ConsumerStrategies.Subscribe(topicsSet, kafkaParams)

);

//使用map先对InputDStream进行取值操作

            JavaDStream lines=kafkaStream.map(new Function, String>() {

@Override

                public String call(ConsumerRecord consumerRecord)throws Exception {

return consumerRecord.value();

}

});

//再使用flatMap进行行记录的空格操作

            JavaDStream words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());

JavaPairDStream pairs  = words.mapToPair(s ->new Tuple2<>(s,1));

//状态保留

            JavaPairDStream runningCounts = pairs.updateStateByKey(

new Function2, Optional, Optional>() {

@Override

                        public Optional call(List values, Optional state)throws Exception {

Integer updateValue =0;

if (state.isPresent()) {//是否为空

// 如果有值就获取

                                updateValue = state.get();

}

// 累加

                            for (Integer value : values) {

updateValue += value;

}

return Optional.of(updateValue);

}

}

);

runningCounts.print();

jssc.start();

jssc.awaitTermination();

}catch (InterruptedException e) {

e.printStackTrace();

}

}

}

打包提交到spark集群中

./spark-submit --class com.liushun.KaFKaUnionStreamingWordCnt --master yarn /usr/local/spark-2.1.1-bin-hadoop2.6/bin/SparkStreamTest-1.1-SNAPSHOT.jar

验证正确性:

到此,我们就结束了对spark与kafak0.10版本以上的整合实验。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容