pysparkStreaming模拟生产环境收集日志进行SparkStreamin实时流处理

项目架构

概述:
    本项目是模拟通过Log4j日志生成,配置输出数据源,将日志输出到flume日志收集系统,然后在flume中配置输出流到kafka,
    最后启动SparkStreamin项目,配置Kafka为数据输入源,进行实时单词频次统计,使用redis保存offset偏移量,防止出现数据丢失
    情框

Java端配置log4j日志输出

log4j日志生成端:
    package kafkaDemo;
    import org.apache.log4j.*;
    /**
     * 该类是用来模拟生成日志记录
     */
    public class logDemo {
        // 获取Loggger构建器
        private static Logger logger = Logger.getLogger(logDemo.class.getName());
    
        public static void main(String[] args) {
            for (int i = 0; i <= 9; i++) {
                logger.info("hello " + "spark" + " demo" + " shanghai " + i);
            }
        }
    }    
log4j.properties
    log4j.rootLogger                              =INFO,stdout,flume
    log4j.appender.stdout                         = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.target                  = System.out
    log4j.appender.stdout.layout                  =org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n        
    
    log4j.appender.flume                          = org.apache.flume.clients.log4jappender.Log4jAppender
    log4j.appender.flume.Hostname                 = 10.10.2.155
    log4j.appender.flume.Port                     = 44444
    log4j.appender.flume.UnsafeMode               = true
所需jars:可以使用Maven
    flume-avro-source-1.6.0.jar
    flume-dataset-sink-1.6.0.jar
    flume-file-channel-1.6.0.jar
    flume-hdfs-sink-1.6.0.jar
    flume-hive-sink-1.6.0.jar
    flume-irc-sink-1.6.0.jar
    flume-jdbc-channel-1.6.0.jar
    flume-jms-source-1.6.0.jar
    flume-kafka-channel-1.6.0.jar
    flume-kafka-source-1.6.0.jar
    flume-ng-auth-1.6.0.jar
    flume-ng-configuration-1.6.0.jar
    flume-ng-core-1.6.0.jar
    flume-ng-elasticsearch-sink-1.6.0.jar
    flume-ng-embedded-agent-1.6.0.jar
    flume-ng-hbase-sink-1.6.0.jar
    flume-ng-kafka-sink-1.6.0.jar
    flume-ng-log4jappender-1.6.0.jar
    flume-ng-morphline-solr-sink-1.6.0.jar
    flume-ng-node-1.6.0.jar
    flume-ng-sdk-1.6.0.jar
    flume-scribe-source-1.6.0.jar
    flume-spillable-memory-channel-1.6.0.jar
    flume-thrift-source-1.6.0.jar
    flume-tools-1.6.0.jar
    flume-twitter-source-1.6.0.jar

python: 需要将spark-streaming-kafka jar包引入

概述:
    jar包啥的就不说了,网上都可以百度到,直接上代码。
code:
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition 
    from pyspark.sql import SparkSession
    from redisDemo.redisDemo import conn
    import redis
    import os
    
    
    def getOrCreate(num):
        """获取重复次数,没有数据给0"""
        if num is None:
            return 0
        else:
            return num
    
    
    def setOffset(num):
        """将SparkStreaming消费Kafka的offset保存在redis中"""
        # 这样保证数据不丢失,失败重启根据redis中保存的offset当做起始点
        conn = redis.Redis(host="10.10.1.186", port=6379, password="")
        conn.set("op_consumer", num)
    
    
    def printOffsetRanges(rdd):
        """打印offsetRanges"""
        for o in offsetRanges:
            print("%s %s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset, o.untilOffset - o.fromOffset))
    
    
    def storeOffsetRanges(rdd):
        """获取当前消费到的Kafka offset偏移量;"""
        # 为保证Exactly once消费语义,可以使用Synchronize添加偏移量
        global offsetRanges
        offsetRanges = rdd.offsetRanges()
        for o in offsetRanges:
            setOffset(o.untilOffset)
        printOffsetRanges(rdd)
        return rdd
    
    
    # 这里需要设置Pyspark运行的环境,尤其在Python2和python3同时在一起的情况下
    os.environ["PYSPARK_PYTHON"] = "/usr/local/bin/python3"
    # 指定SparkStreaming消费的主题
    topic = 'op_consumer'
    # 指定Zookeeper集群
    zkQuorum = "odata-slave2:2181,odata-salve1:2181"
    # 指定topic的分区
    partition = 0
    brokers = "odata-slave2:9092"
    # 初始化SparkContext
    sc = SparkSession.builder.master("local[2]").appName("streaminDemo").getOrCreate().sparkContext
    sc.setLogLevel("ERROR")
    # 初始化StreamingContext对象
    ssc = StreamingContext(sc, 5)
    # 设置SparkStreaming还原点
    ssc.checkpoint("/home/soft/log/")
    # 这里表示把检查点放入本地F盘系统中
    # 将topic转换为hashMap形式,而python中字典就是一种hashmap
    
    # 获取上次消费到的offset,我们从redis中拿到的是二进制数据,需要转换为int类型的数据
    start = int(conn.get("op_consumer"))
    # 将topic和paritition当做参数初始化TopicAndPartition对象
    topicPartion = TopicAndPartition(topic, partition)
    # 指定SparkStreamin消费Kakfa的起始offset
    fromOffset = {topicPartion: start}
    # lines = KafkaUtils.createStream(ssc, zkQuorum, 'test-consumer-group', topicMap)
    lines = KafkaUtils.createDirectStream(ssc, ["op_consumer"],
                                          kafkaParams={"metadata.broker.list": "odata-slave2:9092,"},
                                          fromOffsets=fromOffset
                                          )
    # 注意, 取tuple下的第二个即为接收到的Kafka流
    # 获取这次消费的Offset
    words = lines.foreachRDD(storeOffsetRanges)
    # 对获取到的Kafka数据进行单词统计
    words = lines.map(lambda x: x[1]).flatMap(lambda x: x.split(" "))
    
    result = words.map(lambda x: (x, 1)).updateStateByKey(lambda x, y: int(sum(x)) + getOrCreate(y))
    result.pprint()
    # 将结果保存到本地文件中
    # result.saveAsTextFiles("/home/soft/1.txt")
    
    ssc.start()
    ssc.awaitTermination()
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352

推荐阅读更多精彩内容