The Receiver-based approach
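This approach uses a Receiver to fetch data. The Receiver is implemented with Kafka's high-level consumer API. Data that the Receiver pulls from Kafka is stored in the memory of the Spark executors, and the jobs launched by Spark Streaming then process that data.
With the default configuration, however, this approach can lose data when an underlying failure occurs. To get a reliable setup with zero data loss, you must enable Spark Streaming's write-ahead log (WAL). The WAL synchronously writes the data received from Kafka to a log on a distributed file system such as HDFS, so the data can be recovered from the log even if an underlying node fails.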
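A minimal sketch of turning the write-ahead log on from Java, assuming Spark 1.5.x. The property name spark.streaming.receiver.writeAheadLog.enable and the checkpoint() call are part of Spark Streaming; the class name, application name, and HDFS path are hypothetical placeholders.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class WalEnabledContext {

    // Builds a streaming context whose receivers write received data to the WAL.
    static JavaStreamingContext createContext() {
        SparkConf conf = new SparkConf()
                .setAppName("KafkaReceiverWithWal")   // hypothetical name
                .setMaster("local[2]")
                // Enable the receiver write-ahead log (off by default).
                .set("spark.streaming.receiver.writeAheadLog.enable", "true");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        // The log files are stored under the checkpoint directory, so it
        // should point at a fault-tolerant file system. Hypothetical path.
        jssc.checkpoint("hdfs://namenode:8020/spark/checkpoints/kafka-wordcount");
        return jssc;
    }
}
```

The input DStreams and transformations are then defined on the returned context as usual. Recovering after a driver restart also requires rebuilding the context from the checkpoint directory, which this sketch does not show.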
How to connect to a Kafka data source
Add the dependency in Maven
groupId = org.apache.spark
artifactId = spark-streaming-kafka_2.10
version = 1.5.1
Create the input DStream with the third-party utility class KafkaUtils
JavaPairReceiverInputDStream<String, String> kafkaStream =
KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);
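Points to note
- The partitions of a Kafka topic are unrelated to the partitions of the RDDs in Spark. Raising the per-topic partition count passed to KafkaUtils.createStream() therefore only increases the number of threads that read partitions inside a single Receiver. It does not increase the parallelism with which Spark processes the data.
- You can create multiple Kafka input DStreams with different consumer groups and topics, so that several receivers receive data in parallel.
- If the write-ahead log is enabled on a fault-tolerant file system such as HDFS, every received record is already copied into the log. In that case, set the storage level in KafkaUtils.createStream() to StorageLevel.MEMORY_AND_DISK_SER, which is not replicated, instead of the default replicated level StorageLevel.MEMORY_AND_DISK_SER_2.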
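The three points above can be sketched together in Java, assuming the Spark 1.5.x API: one receiver per topic, an explicit non-replicated storage level, a union of the resulting streams, and a repartition to raise processing parallelism. The class name, method name, and parameters are hypothetical; KafkaUtils.createStream(), JavaStreamingContext.union(), and repartition() are the Spark calls being illustrated.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class ParallelKafkaReceivers {

    // Creates one receiver per topic and merges them into a single DStream.
    static JavaPairDStream<String, String> createUnifiedStream(
            JavaStreamingContext jssc, String zkQuorum, String groupId,
            List<String> topics, int processingPartitions) {

        List<JavaPairDStream<String, String>> streams =
                new ArrayList<JavaPairDStream<String, String>>(topics.size());

        for (String topic : topics) {
            // One consumer thread per topic inside each receiver.
            Map<String, Integer> topicThreadMap = new HashMap<String, Integer>();
            topicThreadMap.put(topic, 1);

            // Non-replicated storage level; appropriate when the WAL is enabled.
            streams.add(KafkaUtils.createStream(
                    jssc, zkQuorum, groupId, topicThreadMap,
                    StorageLevel.MEMORY_AND_DISK_SER()));
        }

        // Merge the per-receiver streams, then spread each batch over more
        // partitions so that downstream processing runs with more tasks.
        JavaPairDStream<String, String> unified =
                jssc.union(streams.get(0), streams.subList(1, streams.size()));
        return unified.repartition(processingPartitions);
    }
}
```

Each receiver occupies one core for as long as the application runs, so the master needs more cores than there are receivers; with local[2], for example, only one receiver leaves a core free for processing.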
Kafka commands
bin/kafka-topics.sh --zookeeper hadoop-100:2181,hadoop-101:2181,hadoop-102:2181 --topic WordCount --replication-factor 1 --partitions 1 --create
bin/kafka-console-producer.sh --broker-list hadoop-100:9092,hadoop-101:9092,hadoop-102:9092 --topic WordCount
Example
Java version
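import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaReceiverWordCount {
    public static void main(String[] args) {
        // local[2]: one core for the receiver, one for processing
        SparkConf conf = new SparkConf().setAppName("KafkaReceiverWordCountJava").setMaster("local[2]");
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(10));
        // Create the Kafka input stream with KafkaUtils.createStream()
        Map<String, Integer> topicThreadMap = new HashMap<String, Integer>();
        // Number of threads used to pull data from the topic
        topicThreadMap.put("WordCount", 1);
        JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(javaStreamingContext,
                "hadoop-100:2181,hadoop-101:2181,hadoop-102:2181",
                "DefaultConsumerGroup",
                topicThreadMap);
        // Each record is a (key, message) pair; split the message into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
            @Override
            public Iterable<String> call(Tuple2<String, String> stringStringTuple2) throws Exception {
                return Arrays.asList(stringStringTuple2._2.split(" "));
            }
        });
        // Map each word to (word, 1)
        JavaPairDStream<String, Integer> wordsNum = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });
        // Sum the counts per word within each batch
        JavaPairDStream<String, Integer> result = wordsNum.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        result.print();
        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
        javaStreamingContext.close();
    }
}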
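The job above counts words separately within each 10-second batch; it does not keep a running total across batches. As a rough illustration of what one batch produces, the following plain-Java sketch applies the same split-and-sum logic to a list of messages without Spark or Kafka. The class and method names are hypothetical, and the sorted TreeMap is used only to make the printed order predictable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class BatchWordCountSketch {

    // Counts the words in the messages of a single batch, mirroring the
    // flatMap -> mapToPair -> reduceByKey steps of the streaming job.
    static Map<String, Integer> countWords(List<String> messages) {
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (String message : messages) {
            for (String word : message.split(" ")) {
                Integer current = counts.get(word);
                counts.put(word, current == null ? 1 : current + 1);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two messages arriving in the same batch.
        List<String> batch = Arrays.asList("hello spark", "hello kafka");
        System.out.println(countWords(batch)); // prints {hello=2, kafka=1, spark=1}
    }
}
```

Typing the same two lines into the console producer within one batch interval should lead result.print() to show the same three pairs, although Spark does not guarantee their order.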
Scala version
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaReceiverWordCount {
  def main(args: Array[String]): Unit = {
    // local[2]: one core for the receiver, one for processing
    val conf = new SparkConf().setAppName("KafkaReceiverWordCountScala").setMaster("local[2]")
    val streamingContext = new StreamingContext(conf, Seconds(10))
    val zkQuorum = "hadoop-100:2181,hadoop-101:2181,hadoop-102:2181"
    val group = "DefaultConsumerGroup"
    val topics = "WordCount"
    // Number of threads used to pull data from each topic
    val numThreads = 1
    // Map every comma-separated topic to its thread count, e.g. Map("WordCount" -> 1)
    val topicMap = topics.split(",").map((_, numThreads)).toMap
    // Each record is a (key, message) pair; keep only the message
    val lines = KafkaUtils.createStream(streamingContext, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(line => line.split(" "))
    val wordsNumber = words.map(word => (word, 1))
    // Sum the counts per word within each batch
    val result = wordsNumber.reduceByKey(_ + _)
    result.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}