本来Streaming整合kafka是由两种方式的,第一种是Receiver DStream,第二种是Direct DStream
但是由于目前kafka版本升级到2.0以上了,并且我用的kafka版本是/usr/local/kafka_2.11-2.1.1,我们就不再介绍Receiver DStream这种方式了。
为什么要使用Direct DStream方式尼?他又三个大的优点,如下:
1、简化的并行性:使用DirectStream,Spark流将创建与要使用的Kafka分区一样多的RDD分区,这些分区都将并行地从Kafka读取数据。所以在Kafka和RDD分区之间有一对一的映射,这更容易理解和调整
2、高效率:在第一种方法中实现零数据丢失需要将数据存储在提前写入日志中,从而进一步复制数据。这实际上是低效的,因为数据有效地被复制了两次—一次是由Kafka复制的,第二次是由提前写入日志复制的。第二种方法消除了问题,因为没有接收器,因此不需要提前写入日志。只要您有足够的卡夫卡保留,就可以从卡夫卡恢复消息。
3、恰好一次语义:第一种方法使用Kafka的高级API将消耗的偏移存储在ZooKeeper中。这是传统上使用卡夫卡数据的方法。虽然这种方法(与提前写入日志结合)可以确保零数据丢失(即至少一次语义),但在某些故障下,某些记录可能会被消耗两次的可能性很小。这是因为火花流可靠接收的数据与ZooKeeper跟踪的偏移量不一致。因此,在第二种方法中,我们使用不使用ZooKeeper的简单Kafka API。偏移量由检查点内的火花流跟踪。这消除了Spark流和ZooKeeper/Kafka之间的不一致性,因此尽管失败,Spark流有效地接收到每个记录一次。为了实现结果输出的一次性语义,将数据保存到外部数据存储的输出操作必须是等幂的,或者是保存结果和偏移量的原子事务
但是这种方式有一个缺点:这种方式不能在zk中更新偏移量,因此也不能通过zk的监控工具看到运行进度,因此需要你自己处理这个偏移量,
下面是我写的整合步骤样例:、
第一步、启动zk和kafka、创建生产者和消费者
启动zk
./zkServer.sh start
启动kafka
./kafka-server-start.sh ${KAFKA_HOME}/config/server.properties
创建topic
./kafka-topics.sh --create --zookeeper 10.101.3.3:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming
查看是否创建成功
./kafka-topics.sh --list --zookeeper 10.101.3.3:2181
创建topic的对应的生产者
./kafka-console-producer.sh --broker-list 10.101.3.3:9092 --topic kafka_streaming
创建topic对应的消费者
./kafka-console-consumer.sh --bootstrap-server 10.101.3.3:9092 --topic kafka_streaming --from-beginning
检验下是否能够正常通讯:
第二步、添加spark-streaming-kafka依赖jar包
第三步、使用jdk1.8和kafkka0.10_2.11和spark streaming2.11版本来编写整合程序。
首先我们先解释下用到的两个特殊类。
LocationStrategies
新的Kafka使用者API将预先获取消息到缓冲区。因此,出于性能原因,Spark集成将缓存的消费者保留在执行程序上(而不是为每个批处理重新创建它们),并且更喜欢在具有适当使用者的主机位置上安排分区,这一点很重要。
在大多数情况下,您应该使用LocationStrategies.PreferConsistent,如上所示。这将在可用执行程序之间均匀分配分区。如果您的执行程序与Kafka代理在同一主机上,请使用PreferBrokers,它更愿意为该分区安排Kafka领导者的分区。最后,如果分区之间的负载有明显偏差,请使用PreferFixed。这允许您指定分区到主机的显式映射(任何未指定的分区将使用一致的位置)。
消费者的缓存的默认最大大小为64.如果您希望处理超过(64 *个执行程序数)Kafka分区,则可以通过spark.streaming.kafka.consumer.cache.maxCapacity更改此设置。
如果要禁用Kafka使用者的缓存,可以将spark.streaming.kafka.consumer.cache.enabled设置为false。可能需要禁用缓存来解决SPARK-19185中描述的问题。一旦SPARK-19185解决,可以在Spark的更高版本中删除此属性。
缓存由topicpartition和group.id键入,因此每次调用createDirectStream时都要使用单独的group.id。
ConsumerStrategies
新的Kafka消费者API有许多不同的方法来指定主题,其中一些需要相当大的后对象实例化设置。 ConsumerStrategies提供了一种抽象,即使从检查点重新启动后,Spark也可以获得正确配置的消费者。
ConsumerStrategies.Subscribe,如上所示,允许您订阅固定的主题集合。 SubscribePattern允许您使用正则表达式来指定感兴趣的主题。请注意,与0.8集成不同,使用Subscribe或SubscribePattern应响应在正在运行的流期间添加分区。最后,Assign允许您指定固定的分区集合。所有这三个策略都重载了构造函数,允许您指定特定分区的起始偏移量。
如果您具有上述选项无法满足的特定使用者设置需求,则ConsumerStrategy是您可以扩展的公共类。
程序:
package com.liushun;/*
@auth:liushun
@date:2019-06-20 16:03
Spark Streaming整合kafka,使用Driect DStream方式
*/
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;
import java.util.*;
import java.util.regex.Pattern;
public class KaFKaUnionStreamingWordCnt {
private static final PatternSPACE = Pattern.compile(" ");
public static void main(String[] args) {
SparkConf sc=new SparkConf().setMaster("local[2]").setAppName("KaFKaUnionStreamingWordCnt");
JavaStreamingContext jssc=new JavaStreamingContext(sc, Durations.seconds(5));//5秒一次
jssc.checkpoint(".");//设置上一个批次的值存在的目录,在生产环境中,放在hdfs某个文件下,相对安全些
// 首先要创建一份kafka参数map
Map kafkaParams =new HashMap();
// 这里是不需要zookeeper节点,所以这里放broker.list
String brokerslist="10.101.3.3:9092";
String topics ="kafka_streaming";
//Kafka服务监听端口
kafkaParams.put("bootstrap.servers",brokerslist);
//指定kafka输出key的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
kafkaParams.put("key.deserializer", StringDeserializer.class);
//指定kafka输出value的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
kafkaParams.put("value.deserializer", StringDeserializer.class);
//消费者ID,随意指定
kafkaParams.put("group.id","jis");
//指定从latest(最新,其他版本的是largest这里不行)还是smallest(最早)处开始读取数据
kafkaParams.put("auto.offset.reset","latest");
//如果true,consumer定期地往zookeeper写入每个分区的offset
kafkaParams.put("enable.auto.commit",false);
//Topic分区
Map offsets =new HashMap<>();
offsets.put(new TopicPartition(topics,0),0L);
Collection topicsSet =new HashSet<>(Arrays.asList(topics.split(",")));
//通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定
try {
JavaInputDStream> kafkaStream = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topicsSet, kafkaParams, offsets)
//也可以用不指定offset的构造函数替换上部的订阅
//ConsumerStrategies.Subscribe(topicsSet, kafkaParams)
);
//使用map先对InputDStream进行取值操作
JavaDStream lines=kafkaStream.map(new Function, String>() {
@Override
public String call(ConsumerRecord consumerRecord)throws Exception {
return consumerRecord.value();
}
});
//再使用flatMap进行行记录的空格操作
JavaDStream words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
JavaPairDStream pairs = words.mapToPair(s ->new Tuple2<>(s,1));
//状态保留
JavaPairDStream runningCounts = pairs.updateStateByKey(
new Function2, Optional, Optional>() {
@Override
public Optional call(List values, Optional state)throws Exception {
Integer updateValue =0;
if (state.isPresent()) {//是否为空
// 如果有值就获取
updateValue = state.get();
}
// 累加
for (Integer value : values) {
updateValue += value;
}
return Optional.of(updateValue);
}
}
);
runningCounts.print();
jssc.start();
jssc.awaitTermination();
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
打包提交到spark集群中
./spark-submit --class com.liushun.KaFKaUnionStreamingWordCnt --master yarn /usr/local/spark-2.1.1-bin-hadoop2.6/bin/SparkStreamTest-1.1-SNAPSHOT.jar
验证正确性: