PySpark Streaming数据生产

简介

Spark Streaming是对核心Spark API的一个扩展,它能够实现对实时数据流的流式处理,并具有很好的可扩展性、高吞吐量和容错性。Spark Streaming支持从多种数据源提取数据,如:Kafka、Flume、Twitter、ZeroMQ、Kinesis以及TCP套接字,并且可以提供一些高级API来表达复杂的处理算法,如:map、reduce、join和window等,最后会通过DStream输出算子将数据发送到下一个环节。

ETL架构

本次案例来源于一个及时任务,需要接收对接几条kafka流进行处理,结果数据发送到业务方。根据以前做实时数仓的经验,还是打算分层处理,首先对原始数据进行清洗和打宽(补充字段),然后第二步再进行业务相关的处理,本次只针对第一步的数据ETL进行讨论。

框架

由于此次需求时间紧任务重、开发人员紧缺,所以采用python进行开发,具体就是pyspark streaming了。虽然笔者也是最近才开始使用python进行开发,但是已经深刻的体会到了它的灵活性以及敏捷性,原本可能用java写flink需要花费2周的任务,现在3天就能搞定。

考虑到python开发的灵活性,我们在数据处理方面可以根据业务的规律性定义各种规则,从而可以用少量代码去适配大多数业务,然后通过各种配置实现处理逻辑。

由于此次处理的是医学数据,简单的将处理逻辑归纳为三部分:

  • cpp_schema,数据结构化,将外部数据统一模型。
  • cpp_filter,医学方面较纳排,我们可以将其理解为筛选业务需要的数据。
  • cpp_table,定义输出字段,不同业务有不同的处理方法。

业务方面需要考虑的问题

  • 自动维护kafka offset消费位点,采用redis存储offset数据
  • 回写kafka的时候需要考虑分区键,写kafka是采用用户唯一标识unique_id作为key

代码

  1. 项目结构


    工程结构

github : https://github.com/dongpengfei2/spark-streaming-to-kafka

  • bin目录下存放部署脚步,我们采用Azkaban进行统一部署
  • jars目录下存放需要的java依赖包
  • redis、rediscluster是python依赖库,由于yarn集群中可能有版本冲突,所以我们直接把依赖包打入程序中
  • scripts目录下存放主程序代码
  • utils目录下存放工具类
  • shell脚本用于启动程序
  1. 本地Idea开发环境,建议采用python2.7版本
//安装合适的pyspark版本,如果本地已经安装了最新的版本,可以强制覆盖
sudo -H pip install --force-reinstall pyspark==2.4.6 -i [http://pypi.douban.com/simple/](http://pypi.douban.com/simple/) --trusted-host pypi.douban.com
//查看python工作目录,笔者本地为/Users/dongpengfei/PycharmProjects/Venv2.7/lib/python2.7/site-packages
pip show pyspark
//把java依赖包拷贝到python环境的jar目录下
cp jars/* /Users/dongpengfei/PycharmProjects/Venv2.7/lib/python2.7/site-packages/pyspark/jars/
  1. 主程序逻辑
from __future__ import print_function
import json
import os
import sys

sys.path.append(".")

from kafka import KafkaProducer
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark import SparkConf, SparkContext

cur_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
sys.path.append(cur_dir + '/../scripts')
sys.path.append(cur_dir + '/../utils')

from scripts.norm_udf import run_cpp_func, mapping_unique_id, send_kafka
from utils.offset_manage import OffsetManage
from utils.redis_client import RedisClient


if __name__ == "__main__":

    source_brokers = sys.argv[1]
    source_topic = sys.argv[2]
    group_id = 'offline'

    target_brokers = sys.argv[3]
    target_topic = sys.argv[4]

    checkpoint_dir = sys.argv[5]

    print("source_brokers : %s" % source_brokers)
    print("source_topic : %s" % source_topic)
    print("target_brokers : %s" % target_brokers)
    print("target_topic : %s" % target_topic)
    print("checkpoint_dir : %s" % checkpoint_dir)

    app_name = 'spark-streaming-kafka[%s]-to-kafka[%s]' % (source_topic, target_topic)

    offsetRanges = []


    def storeOffsetRanges(rdd):
        global offsetRanges
        offsetRanges = rdd.offsetRanges()
        return rdd


    def foreach_partition_func(partitions):
        producer = KafkaProducer(bootstrap_servers=target_brokers)
        partition_conn = RedisClient()
        for message in partitions:
            line = mapping_unique_id(partition_conn, message)
            key = line["unique_id"]
            send_kafka(producer, target_topic, key, line)
        producer.close()
        partition_conn.close()


    def foreach_rdd_func(rdds):
        rdds.foreachPartition(lambda partitions: foreach_partition_func(partitions))
        last_offset = {}
        for o in offsetRanges:
            last_offset[o.partition] = o.untilOffset

        if len(last_offset) > 0:
            rdd_conn = RedisClient()
            rdd_om = OffsetManage(rdd_conn)
            rdd_om.set_last_offset(source_topic, group_id, last_offset)
            rdd_conn.close()


    conf = SparkConf() \
        .set('spark.io.compression.codec', "snappy")

    sc = SparkContext(appName=app_name, conf=conf)
    sc.setLogLevel("WARN")
    sc.setCheckpointDir(checkpoint_dir)
    ssc = StreamingContext(sc, 10)

    kafkaParams = {"metadata.broker.list": source_brokers, "group.id": group_id, "compression.codec": "snappy",
                   "auto.offset.reset": "largest"}

    fromOffsets = {}
    conn = RedisClient()
    om = OffsetManage(conn)
    offset_dict = om.get_last_offset(source_topic, group_id)
    for partition in offset_dict:
        fromOffsets[TopicAndPartition(topic=source_topic, partition=int(partition))] = long(offset_dict[partition])
    conn.close()

    print("offset_dict : %s" % offset_dict)

    kafka_streaming_rdd = KafkaUtils.createDirectStream(ssc, [source_topic],
                                                        kafkaParams=kafkaParams, fromOffsets=fromOffsets)
    lines_rdd = kafka_streaming_rdd.transform(lambda line: storeOffsetRanges(line)).checkpoint(300) \
        .map(lambda x: json.loads(x[1]))
    schema_rdd = lines_rdd.map(lambda line: run_cpp_func(source_topic, "cpp_schema", line))
    filter_rdd = schema_rdd.filter(lambda line: run_cpp_func(source_topic, "cpp_filter", line))
    normal_rdd = filter_rdd.map(lambda line: run_cpp_func(source_topic, "cpp_table", line))
    normal_rdd.foreachRDD(lambda rdd: foreach_rdd_func(rdd))

    ssc.start()
    ssc.awaitTermination()

主程序很简单,首先读取kafka的数据,然后根据逻辑的抽象,依次执行spark的map、filter、map算子,udf算子是根据配置获取,入口函数为run_cpp_func,具体函数在norm_udf.py文件中

# coding=utf8
import importlib
import json
import sys
import time

reload(sys)
sys.path.append('.')
sys.setdefaultencoding('utf-8')
sys.path.append('.')


def pcr_cpp_schema(line):
    return line["after"]


def pcr_cpp_filter(line):
    return "TEST_CODE" in line and line["TEST_CODE"] in ["NCOV", "NCOV2"]


def pcr_cpp_table(line):
    norm_dict = {"event": "art", "event_time": line["check_time"]}

    return norm_dict


def art_cpp_schema(line):
    return line["data"]


def art_cpp_filter(line):
    return "result" in line and line["result"] in ["Positive", "Negative"]


def art_cpp_table(line):
    norm_dict = {"event": "art", "event_time": line["check_time"]}

    return norm_dict


handler_func_dict = {
    "dwd_pcr": {"cpp_schema": "pcr_cpp_schema",
                "cpp_filter": "pcr_cpp_filter",
                "cpp_table": "pcr_cpp_table"}
    , "dwd_art": {"cpp_schema": "art_cpp_schema",
                  "cpp_filter": "art_cpp_filter",
                  "cpp_table": "art_cpp_table"}}
module = "scripts.norm_udf"


def run_cpp_func(topic, op_type, line):
    handler_func = handler_func_dict[topic][op_type]
    model_m = importlib.import_module(module)
    func_run = getattr(model_m, handler_func)
    return func_run(line)


def send_kafka(producer, topic, key, message_body):
    timestamp = int(time.time())
    message = {
        "id": "%s-%s" % (key, timestamp),
        "occur_time": timestamp,
        "priority": "5",
        "from": "data",
        "type": "object",
        "data": message_body
    }
    future = producer.send(topic=topic, key=key, value=json.dumps(message))
    future.get(timeout=10)


def mapping_unique_id(redis_conn, message):
    unique_key = message["key"]
    if unique_key:
        unique_id = redis_conn.hget("DATA:UNIQUE:KEY", unique_key)
        if unique_id:
            message["unique_id"] = unique_id
        else:
            message["unique_id"] = ""

    return message
  1. 工具类。首先是OffsetManage,用于手动管理kafka偏移量
import sys

sys.path.append(".")

profile_key = "data_kafka_offset_%s_%s"


class OffsetManage:

    def __init__(self, redis_client=None):
        self.redis_client = redis_client

    def get_last_offset(self, topic=None, group_id=None):
        redis_key = profile_key % (topic, group_id)
        return self.redis_client.hgetall(redis_key)

    def set_last_offset(self, topic=None, group_id=None, partition_dict=None):
        if partition_dict:
            redis_key = profile_key % (topic, group_id)
            self.redis_client.hmset(redis_key, partition_dict)

偏移量的数据存储在redis中,工具类如下

import sys
# import redis
from rediscluster import RedisCluster

sys.path.append(".")

cluster_nodes = [
    {'host': '127.0.0.1', 'port': '6080'},
    {'host': '127.0.0.1', 'port': '6081'},
    {'host': '127.0.0.1', 'port': '6082'}
]

cluster_password = "123456"

host = "localhost"


class RedisClient:
    def __init__(self):
        self.conn = RedisCluster(startup_nodes=cluster_nodes, password=cluster_password)
        # self.conn = redis.Redis(host=host)

    def hget(self, name, key):
        return self.conn.hget(name, key)

    def hgetall(self, name):
        return self.conn.hgetall(name)

    def hmset(self, name, mapping):
        self.conn.hmset(name, mapping)

    def close(self):
        self.conn.close()

工具类可以用来连接单机版的redis,也可以连redis集群

  1. 启动脚本
#!/usr/bin/env bash

export LANG="en_US.UTF-8"
export PYTHONIOENCODING=utf8

source_brokers="127.0.0.1:9092"
source_topic="dwd_pcr"

target_brokers="127.0.0.1:9092"
target_topic="dws_event"

checkpoint_dir="hdfs:///user/spark_streaming/checkpoint/pcr_report"

spark-submit \
--master yarn \
--queue t0 \
--num-executors 6 \
--driver-memory 2g \
--executor-memory 2g \
--executor-cores 2 \
--conf "spark.driver.extraClassPath=lz4-1.2.0.jar" \
--conf "spark.executor.extraClassPath=lz4-1.2.0.jar" \
--jars jars/lz4-1.2.0.jar,jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar \
--py-files ./spark-streaming-to-kafka.zip \
scripts/main.py $source_brokers $source_topic $target_brokers $target_topic $checkpoint_dir > /dev/null 2>&1 &
  1. 调优

a) 队列堵塞问题,Scheduling Delay过长导致消息处理延时


阻塞

调查后发现一方面是资源配置过大,而且任务提交到yarn的t0队列Preemption:enabled,表示资源可以被抢占。由于平常会有很多hive查询任务抢占资源,所以调小资源配置,改用不能被抢占资源的t1队列后解决了该问题


正常

b) python依赖包版本冲突问题

目前是直接在https://pypi.org/下载相应的包,直接打包到项目中,yarn在调度时优先使用项目中的依赖包。

开发过程中特别需要注意版本问题,python版本太高可能导致pyspark安装出现问题,后续我会讲解kafka的自动offset管理和delay监控问题。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,287评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,346评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,277评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,132评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,147评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,106评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,019评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,862评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,301评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,521评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,682评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,405评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,996评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,651评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,803评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,674评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,563评论 2 352

推荐阅读更多精彩内容