Project Architecture
Overview:
This project simulates log generation with Log4j. The Log4j appender is configured as an output data source that ships the logs to the Flume log-collection system, Flume is configured to forward the stream on to Kafka, and finally a Spark Streaming job is started with Kafka as its input source to do real-time word-frequency counting. The Kafka offsets are saved in Redis so that no data is lost when the job fails and restarts.
Configuring Log4j output on the Java side
Log4j log generator:
package kafkaDemo;

import org.apache.log4j.Logger;

/**
 * Simulates generating log records.
 */
public class logDemo {
    // Obtain the Logger for this class
    private static Logger logger = Logger.getLogger(logDemo.class.getName());

    public static void main(String[] args) {
        for (int i = 0; i <= 9; i++) {
            logger.info("hello " + "spark" + " demo" + " shanghai " + i);
        }
    }
}
log4j.properties
log4j.rootLogger =INFO,stdout,flume
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout =org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 10.10.2.155
log4j.appender.flume.Port = 44444
log4j.appender.flume.UnsafeMode = true
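The log4j.properties above only pushes events to a Flume Avro source listening on 10.10.2.155:44444; the Flume agent itself still needs a configuration that forwards the stream to Kafka. A minimal sketch, assuming an agent named a1, a memory channel, and the op_consumer topic used later in this document (Flume 1.6 KafkaSink property names):
flume-kafka.conf:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Avro source that receives the events sent by the Log4jAppender
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
# Buffer events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
# Kafka sink (Flume 1.6 style)
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = op_consumer
a1.sinks.k1.brokerList = odata-slave2:9092
# Wire source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
The agent can then be started with something like: flume-ng agent --conf conf --conf-file flume-kafka.conf --name a1 -Dflume.root.logger=INFO,console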
Required jars (they can also be pulled in with Maven; a sketch follows the list):
flume-avro-source-1.6.0.jar
flume-dataset-sink-1.6.0.jar
flume-file-channel-1.6.0.jar
flume-hdfs-sink-1.6.0.jar
flume-hive-sink-1.6.0.jar
flume-irc-sink-1.6.0.jar
flume-jdbc-channel-1.6.0.jar
flume-jms-source-1.6.0.jar
flume-kafka-channel-1.6.0.jar
flume-kafka-source-1.6.0.jar
flume-ng-auth-1.6.0.jar
flume-ng-configuration-1.6.0.jar
flume-ng-core-1.6.0.jar
flume-ng-elasticsearch-sink-1.6.0.jar
flume-ng-embedded-agent-1.6.0.jar
flume-ng-hbase-sink-1.6.0.jar
flume-ng-kafka-sink-1.6.0.jar
flume-ng-log4jappender-1.6.0.jar
flume-ng-morphline-solr-sink-1.6.0.jar
flume-ng-node-1.6.0.jar
flume-ng-sdk-1.6.0.jar
flume-scribe-source-1.6.0.jar
flume-spillable-memory-channel-1.6.0.jar
flume-thrift-source-1.6.0.jar
flume-tools-1.6.0.jar
flume-twitter-source-1.6.0.jar
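If Maven is preferred over copying jars, the client side only really needs the Log4j appender artifact (it should pull in flume-ng-sdk transitively); a sketch of the coordinates, with 1.6.0 matching the versions listed above:
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.6.0</version>
</dependency>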
Python side: the spark-streaming-kafka jar has to be made available to the job
Overview:
The jar dependencies themselves are easy to find online, so let's go straight to the code.
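Which jar exactly depends on the Spark and Scala versions on your cluster; one common way is to let spark-submit resolve it from Maven. A sketch (the version numbers and the script name your_streaming_script.py are placeholders):
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 your_streaming_script.py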
code:
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.sql import SparkSession
from redisDemo.redisDemo import conn  # Redis connection defined in the project's redisDemo module
import redis
import os
def getOrCreate(num):
    """Return the previous word count, or 0 when there is no state yet."""
    if num is None:
        return 0
    else:
        return num

def setOffset(num):
    """Save the Kafka offset consumed by Spark Streaming into Redis."""
    # This prevents data loss: after a failure/restart the offset stored in
    # Redis is used as the starting point.
    conn = redis.Redis(host="10.10.1.186", port=6379, password="")
    conn.set("op_consumer", num)

def printOffsetRanges(rdd):
    """Print the offsetRanges of the current micro-batch."""
    for o in offsetRanges:
        print("%s %s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset, o.untilOffset - o.fromOffset))

def storeOffsetRanges(rdd):
    """Record the Kafka offsets this batch has consumed up to."""
    # For true exactly-once semantics the offset update should be committed
    # atomically (synchronized) together with the output.
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    for o in offsetRanges:
        setOffset(o.untilOffset)
    printOffsetRanges(rdd)
    return rdd
# PySpark needs to be pointed at the right interpreter, especially when
# Python 2 and Python 3 are installed side by side.
os.environ["PYSPARK_PYTHON"] = "/usr/local/bin/python3"
# Topic that Spark Streaming consumes
topic = 'op_consumer'
# Zookeeper quorum (only needed by the receiver-based createStream API)
zkQuorum = "odata-slave2:2181,odata-slave1:2181"
# Partition of the topic to resume from
partition = 0
# Kafka broker list
brokers = "odata-slave2:9092"
# Initialize the SparkContext through a SparkSession
sc = SparkSession.builder.master("local[2]").appName("streaminDemo").getOrCreate().sparkContext
sc.setLogLevel("ERROR")
# Initialize the StreamingContext with a 5-second batch interval
ssc = StreamingContext(sc, 5)
# Set the Spark Streaming checkpoint directory (required by updateStateByKey);
# here it sits on the local filesystem.
ssc.checkpoint("/home/soft/log/")
# (With createStream the topics would be passed as a hashMap; a Python dict is
# exactly that — see the commented-out call below.)
# Read the offset consumed last time. Redis returns bytes, so convert to int;
# on the very first run the key does not exist yet, so fall back to 0.
raw = conn.get("op_consumer")
start = int(raw) if raw is not None else 0
# Wrap the topic and partition into a TopicAndPartition object
topicPartion = TopicAndPartition(topic, partition)
# Starting offset from which Spark Streaming consumes Kafka
fromOffset = {topicPartion: start}
# lines = KafkaUtils.createStream(ssc, zkQuorum, 'test-consumer-group', topicMap)
lines = KafkaUtils.createDirectStream(ssc, ["op_consumer"],
                                      kafkaParams={"metadata.broker.list": brokers},
                                      fromOffsets=fromOffset
                                      )
# Record the offsets this batch consumed up to (written to Redis by storeOffsetRanges)
lines.foreachRDD(storeOffsetRanges)
# Note: the second element of each (key, value) tuple is the Kafka message body.
# Split the received Kafka messages into words and keep a running count.
words = lines.map(lambda x: x[1]).flatMap(lambda x: x.split(" "))
result = words.map(lambda x: (x, 1)).updateStateByKey(lambda x, y: sum(x) + getOrCreate(y))
result.pprint()
# To save the results to a local file instead:
# result.saveAsTextFiles("/home/soft/1.txt")
ssc.start()
ssc.awaitTermination()
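On the very first run the op_consumer key does not exist in Redis yet (the script above falls back to offset 0 in that case). To inspect or seed the stored offset by hand, a small sketch using the same connection settings as above:
import redis

conn = redis.Redis(host="10.10.1.186", port=6379, password="")
conn.set("op_consumer", 0)      # seed the starting offset before the first run
print(conn.get("op_consumer"))  # Redis returns bytes, e.g. b'0'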