Introduction
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources such as Kafka, Flume, Twitter, ZeroMQ, Kinesis and TCP sockets, and processed with high-level operators such as map, reduce, join and window that express complex algorithms; the results are then pushed to the next stage through DStream output operators.
This case comes from a time-sensitive task: consume several Kafka streams, process them, and deliver the results to the business side. Drawing on earlier experience building real-time data warehouses, I chose a layered approach: first clean and widen the raw data (i.e. enrich it with additional fields), then handle the business-specific logic in a second step. This article only covers the first step, the data ETL.
Framework
With a tight schedule and few developers available, we chose Python for this job, concretely PySpark Streaming. Although I only started developing in Python recently, I have already come to appreciate its flexibility and agility: a task that would take about two weeks to implement in Java with Flink can now be finished in three days.
Taking advantage of that flexibility, we define rules around the regular patterns in the business data, so a small amount of code can cover most business cases, with the concrete processing logic driven by configuration.
Since we are processing medical data, the processing logic can be boiled down to three parts (see the sketch after this list):
- cpp_schema: structure the data, mapping external data onto a unified model.
- cpp_filter: in medical terms, inclusion/exclusion criteria; think of it as filtering out only the data the business needs.
- cpp_table: define the output fields; each business has its own handling.
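A minimal sketch of how a single record moves through the three stages; the sample record and the demo_* handlers below are made up for illustration, the real handlers live in norm_udf.py further down:
# Hypothetical record and handlers, for illustration only.
raw = {"data": {"result": "Positive", "check_time": "2022-04-01 10:00:00"}}

def demo_cpp_schema(line):   # structure: unwrap the payload into the unified model
    return line["data"]

def demo_cpp_filter(line):   # filter: keep only the records the business needs
    return line.get("result") in ["Positive", "Negative"]

def demo_cpp_table(line):    # table: project the output fields
    return {"event": "art", "event_time": line["check_time"]}

record = demo_cpp_schema(raw)
if demo_cpp_filter(record):
    print(demo_cpp_table(record))  # {'event': 'art', 'event_time': '2022-04-01 10:00:00'}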
Issues to consider on the business side
- Maintain the Kafka consumer offsets ourselves, storing them in Redis (see the sketch below).
- When writing back to Kafka, think about the partition key; we use the user's unique identifier unique_id as the message key.
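A quick sketch of the offset layout in Redis, matching the key pattern used by the OffsetManage class further below: one hash per (topic, consumer group), with the partition number as the hash field and the next offset to consume as the value. The offset numbers here are made up:
import redis  # standalone Redis, for illustration

r = redis.Redis(host="localhost")
redis_key = "data_kafka_offset_%s_%s" % ("dwd_pcr", "offline")

# commit after each batch: field = partition, value = untilOffset of the last processed batch
r.hmset(redis_key, {0: 1200, 1: 1187, 2: 1251})

# restore on startup: feed this dict into fromOffsets of KafkaUtils.createDirectStream
print(r.hgetall(redis_key))  # {'0': '1200', '1': '1187', '2': '1251'} under Python 2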
Code
- Project structure
GitHub: https://github.com/dongpengfei2/spark-streaming-to-kafka
- The bin directory holds the deployment scripts; we use Azkaban for unified deployment
- The jars directory holds the required Java dependency jars
- redis and rediscluster are Python dependency libraries; because of possible version conflicts on the YARN cluster, we bundle them directly into the project
- The scripts directory holds the main program code
- The utils directory holds the utility classes
- The shell scripts are used to start the program
- For local development in IDEA, Python 2.7 is recommended
# Install a suitable pyspark version; force-reinstall if a newer version is already installed locally
sudo -H pip install --force-reinstall pyspark==2.4.6 -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com
# Check the Python site-packages directory; on my machine it is /Users/dongpengfei/PycharmProjects/Venv2.7/lib/python2.7/site-packages
pip show pyspark
# Copy the Java dependency jars into the pyspark jars directory of that Python environment
cp jars/* /Users/dongpengfei/PycharmProjects/Venv2.7/lib/python2.7/site-packages/pyspark/jars/
- Main program logic
from __future__ import print_function
import json
import os
import sys
sys.path.append(".")
from kafka import KafkaProducer
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark import SparkConf, SparkContext
cur_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
sys.path.append(cur_dir + '/../scripts')
sys.path.append(cur_dir + '/../utils')
from scripts.norm_udf import run_cpp_func, mapping_unique_id, send_kafka
from utils.offset_manage import OffsetManage
from utils.redis_client import RedisClient
if __name__ == "__main__":
    source_brokers = sys.argv[1]
    source_topic = sys.argv[2]
    group_id = 'offline'
    target_brokers = sys.argv[3]
    target_topic = sys.argv[4]
    checkpoint_dir = sys.argv[5]

    print("source_brokers : %s" % source_brokers)
    print("source_topic : %s" % source_topic)
    print("target_brokers : %s" % target_brokers)
    print("target_topic : %s" % target_topic)
    print("checkpoint_dir : %s" % checkpoint_dir)

    app_name = 'spark-streaming-kafka[%s]-to-kafka[%s]' % (source_topic, target_topic)

    offsetRanges = []

    def storeOffsetRanges(rdd):
        # remember the offset ranges of the current batch so they can be committed after output
        global offsetRanges
        offsetRanges = rdd.offsetRanges()
        return rdd

    def foreach_partition_func(partitions):
        # one Kafka producer and one Redis connection per partition
        producer = KafkaProducer(bootstrap_servers=target_brokers)
        partition_conn = RedisClient()
        for message in partitions:
            line = mapping_unique_id(partition_conn, message)
            key = line["unique_id"]
            send_kafka(producer, target_topic, key, line)
        producer.close()
        partition_conn.close()

    def foreach_rdd_func(rdds):
        rdds.foreachPartition(lambda partitions: foreach_partition_func(partitions))
        # only after the batch has been written out do we persist the latest offsets to Redis
        last_offset = {}
        for o in offsetRanges:
            last_offset[o.partition] = o.untilOffset
        if len(last_offset) > 0:
            rdd_conn = RedisClient()
            rdd_om = OffsetManage(rdd_conn)
            rdd_om.set_last_offset(source_topic, group_id, last_offset)
            rdd_conn.close()

    conf = SparkConf() \
        .set('spark.io.compression.codec', "snappy")
    sc = SparkContext(appName=app_name, conf=conf)
    sc.setLogLevel("WARN")
    sc.setCheckpointDir(checkpoint_dir)
    ssc = StreamingContext(sc, 10)

    kafkaParams = {"metadata.broker.list": source_brokers, "group.id": group_id, "compression.codec": "snappy",
                   "auto.offset.reset": "largest"}

    # restore the last committed offsets from Redis so consumption resumes where it left off
    fromOffsets = {}
    conn = RedisClient()
    om = OffsetManage(conn)
    offset_dict = om.get_last_offset(source_topic, group_id)
    for partition in offset_dict:
        fromOffsets[TopicAndPartition(topic=source_topic, partition=int(partition))] = long(offset_dict[partition])
    conn.close()
    print("offset_dict : %s" % offset_dict)

    kafka_streaming_rdd = KafkaUtils.createDirectStream(ssc, [source_topic],
                                                        kafkaParams=kafkaParams, fromOffsets=fromOffsets)
    lines_rdd = kafka_streaming_rdd.transform(lambda line: storeOffsetRanges(line)).checkpoint(300) \
        .map(lambda x: json.loads(x[1]))
    # the three configurable stages: schema (map) -> filter -> table (map)
    schema_rdd = lines_rdd.map(lambda line: run_cpp_func(source_topic, "cpp_schema", line))
    filter_rdd = schema_rdd.filter(lambda line: run_cpp_func(source_topic, "cpp_filter", line))
    normal_rdd = filter_rdd.map(lambda line: run_cpp_func(source_topic, "cpp_table", line))
    normal_rdd.foreachRDD(lambda rdd: foreach_rdd_func(rdd))

    ssc.start()
    ssc.awaitTermination()
The main program is straightforward: it reads the data from Kafka and then, following the abstraction above, runs Spark's map, filter and map operators in turn. The UDFs for each stage are looked up from configuration; the entry point is run_cpp_func, and the concrete functions live in norm_udf.py. Note that offsets are committed to Redis only after a batch has been written to the target Kafka topic, which gives at-least-once delivery semantics.
# coding=utf8
import importlib
import json
import sys
import time
reload(sys)
sys.path.append('.')
sys.setdefaultencoding('utf-8')
def pcr_cpp_schema(line):
    return line["after"]

def pcr_cpp_filter(line):
    return "TEST_CODE" in line and line["TEST_CODE"] in ["NCOV", "NCOV2"]
def pcr_cpp_table(line):
    norm_dict = {"event": "pcr", "event_time": line["check_time"]}
    return norm_dict
def art_cpp_schema(line):
    return line["data"]

def art_cpp_filter(line):
    return "result" in line and line["result"] in ["Positive", "Negative"]

def art_cpp_table(line):
    norm_dict = {"event": "art", "event_time": line["check_time"]}
    return norm_dict

# topic -> stage -> handler function name; this mapping is the configuration behind run_cpp_func
handler_func_dict = {
    "dwd_pcr": {"cpp_schema": "pcr_cpp_schema",
                "cpp_filter": "pcr_cpp_filter",
                "cpp_table": "pcr_cpp_table"}
    , "dwd_art": {"cpp_schema": "art_cpp_schema",
                  "cpp_filter": "art_cpp_filter",
                  "cpp_table": "art_cpp_table"}}

module = "scripts.norm_udf"

def run_cpp_func(topic, op_type, line):
    # look up the handler name by topic and stage, then resolve it via importlib/getattr
    handler_func = handler_func_dict[topic][op_type]
    model_m = importlib.import_module(module)
    func_run = getattr(model_m, handler_func)
    return func_run(line)

def send_kafka(producer, topic, key, message_body):
    # wrap the normalized record in a message envelope and send it synchronously
    timestamp = int(time.time())
    message = {
        "id": "%s-%s" % (key, timestamp),
        "occur_time": timestamp,
        "priority": "5",
        "from": "data",
        "type": "object",
        "data": message_body
    }
    future = producer.send(topic=topic, key=key, value=json.dumps(message))
    future.get(timeout=10)

def mapping_unique_id(redis_conn, message):
    # look up the user's unique_id in Redis by the record's key; empty string when no mapping exists
    unique_key = message["key"]
    if unique_key:
        unique_id = redis_conn.hget("DATA:UNIQUE:KEY", unique_key)
        if unique_id:
            message["unique_id"] = unique_id
        else:
            message["unique_id"] = ""
    return message
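With this layout, adding a new upstream topic only means writing three handler functions and registering them in handler_func_dict; the topic name dwd_vaccine and its fields below are hypothetical, purely to illustrate the extension point:
# Hypothetical handlers for an assumed topic "dwd_vaccine"; field names are made up.
def vaccine_cpp_schema(line):
    return line["data"]

def vaccine_cpp_filter(line):
    return "dose" in line

def vaccine_cpp_table(line):
    return {"event": "vaccine", "event_time": line["inject_time"]}

# register the new topic; run_cpp_func resolves the handlers by topic name, no other code changes needed
handler_func_dict["dwd_vaccine"] = {"cpp_schema": "vaccine_cpp_schema",
                                    "cpp_filter": "vaccine_cpp_filter",
                                    "cpp_table": "vaccine_cpp_table"}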
- Utility classes. The first is OffsetManage, which manages the Kafka offsets by hand
import sys
sys.path.append(".")

# one Redis hash per (topic, group_id); field = partition, value = offset
profile_key = "data_kafka_offset_%s_%s"

class OffsetManage:
    def __init__(self, redis_client=None):
        self.redis_client = redis_client

    def get_last_offset(self, topic=None, group_id=None):
        redis_key = profile_key % (topic, group_id)
        return self.redis_client.hgetall(redis_key)

    def set_last_offset(self, topic=None, group_id=None, partition_dict=None):
        if partition_dict:
            redis_key = profile_key % (topic, group_id)
            self.redis_client.hmset(redis_key, partition_dict)
The offset data itself is stored in Redis; the client utility class is as follows
import sys
# import redis
from rediscluster import RedisCluster
sys.path.append(".")

cluster_nodes = [
    {'host': '127.0.0.1', 'port': '6080'},
    {'host': '127.0.0.1', 'port': '6081'},
    {'host': '127.0.0.1', 'port': '6082'}
]
cluster_password = "123456"
host = "localhost"

class RedisClient:
    def __init__(self):
        self.conn = RedisCluster(startup_nodes=cluster_nodes, password=cluster_password)
        # self.conn = redis.Redis(host=host)

    def hget(self, name, key):
        return self.conn.hget(name, key)

    def hgetall(self, name):
        return self.conn.hgetall(name)

    def hmset(self, name, mapping):
        self.conn.hmset(name, mapping)

    def close(self):
        self.conn.close()
The utility class can connect either to a standalone Redis instance or to a Redis cluster.
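If you want to switch between the two without commenting lines in and out, a small variant of the constructor driven by a flag works; use_cluster is a hypothetical parameter (not part of the original class), and the sketch reuses the cluster_nodes, cluster_password and host settings defined above:
import redis
from rediscluster import RedisCluster

class RedisClient:
    def __init__(self, use_cluster=True):
        # hypothetical switch: cluster mode for the real environment, standalone Redis for local tests
        if use_cluster:
            self.conn = RedisCluster(startup_nodes=cluster_nodes, password=cluster_password)
        else:
            self.conn = redis.Redis(host=host)
    # hget / hgetall / hmset / close stay the same as in the version above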
- Launch script
#!/usr/bin/env bash
export LANG="en_US.UTF-8"
export PYTHONIOENCODING=utf8
source_brokers="127.0.0.1:9092"
source_topic="dwd_pcr"
target_brokers="127.0.0.1:9092"
target_topic="dws_event"
checkpoint_dir="hdfs:///user/spark_streaming/checkpoint/pcr_report"
spark-submit \
--master yarn \
--queue t0 \
--num-executors 6 \
--driver-memory 2g \
--executor-memory 2g \
--executor-cores 2 \
--conf "spark.driver.extraClassPath=lz4-1.2.0.jar" \
--conf "spark.executor.extraClassPath=lz4-1.2.0.jar" \
--jars jars/lz4-1.2.0.jar,jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar \
--py-files ./spark-streaming-to-kafka.zip \
scripts/main.py $source_brokers $source_topic $target_brokers $target_topic $checkpoint_dir > /dev/null 2>&1 &
- Tuning
a) Queue congestion: a long Scheduling Delay caused message processing to lag behind
Investigation turned up two causes: the resource allocation was too large, and the job was submitted to YARN's t0 queue, which has Preemption: enabled, so its resources can be taken away. Since Hive queries routinely preempt resources there, we reduced the resource allocation and moved the job to the non-preemptible t1 queue, which resolved the problem.
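For reference, the same adjustment expressed as Spark configuration; the numbers below are illustrative, and in our setup the queue and resources are actually passed on the spark-submit command line as in the launch script above:
from pyspark import SparkConf

# Illustrative settings: smaller executors than the original 6 x 2g, plus the non-preemptible t1 queue.
conf = (SparkConf()
        .set("spark.yarn.queue", "t1")          # queue whose resources cannot be preempted
        .set("spark.executor.instances", "3")   # fewer executors
        .set("spark.executor.memory", "1g")
        .set("spark.executor.cores", "1")
        .set("spark.io.compression.codec", "snappy"))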
b) Python dependency version conflicts
Currently we download the required packages from https://pypi.org/ and bundle them directly into the project; when the job runs on YARN, the packages shipped with the project take precedence over whatever is installed on the cluster.
Conclusion
Pay particular attention to version compatibility during development: a Python version that is too new can break the pyspark installation. In a follow-up post I will cover automatic Kafka offset management and delay monitoring.