Spark菜鸟学习营Day2 分布式系统需求分析

Spark菜鸟学习营Day2

分布式系统需求分析

本分析主要针对从原有代码向Spark的迁移。要注意的是Spark和传统开发有着截然不同的思考思路,所以我们需要首先对原有代码进行需求分析,形成改造思路后,再着手开发。
对于输入和输出,请注意,指的是以程序为边界的输入和输出情况。

主要迁移点:

A:批量数据清理

  • 重点:分析要清理的表在哪里

    • A1.参数表:存放Oracle、Redis。清理Oracle就可以,Redis会同步清理
      • 表一般是以par_开头
    • A2.输入数据表(由数据接收或者其他渠道导入):存放Oracle、HBase,两边都要清理。
      • 表一般是以_temp结尾
    • A3.中间数据表(仅拆分内部使用):存放RDD,需要清理RDD。
      • 表仅被名称中含有split字样的程序调用
    • A4.输出数据表(非拆分模块使用):存放Oracle,可能存放HBase,两边都要清理。
      • 表被其他程序调用
  • 实际情况下表的用途会有组合的情况

    • A3+A4.中间数据表 and 输出数据表
  • 输入:可能有,需分析,比如删除条件有表关联情况

  • 输出:无

B:批量数据转换

  • 特征:insert ... select 语句
    这是最接近标准化的分布式处理,可以使用Dataframe或RDD编程来开发。
  • 开发方法:
    • B1.Dataframe
      • 所有输入表都是参数表、输入数据表、中间表,并且SQL语句支持(不包含not exists等特殊语法),使用Dataframe编程
    • B2. RDD
      • 不满足B1条件
  • 输入:肯定有
  • 输出:肯定有

C:单行循环转换

  • 特征:pl/sql的游标操作,包括for语法的游标操作。

C1. 游标转RDD

将游标逻辑转为RDD的操作。

  • 输入:肯定有
  • 输出:无

C2. 单行数据过滤

一般语句中有continue或者goto语句,指对单行数据进行判断,满足条件就处理,否则不处理
这里可能会出现对Oracle数据的判断,需提前把Oracle数据预先缓存出来,逻辑中访问缓存下来的数据,避免对于数据库的大量连接。

  • 输入:无
  • 输出:无

C3. 重复数据过滤

相比单行过滤更加复杂,如果与已处理数据不重复才会处理

  • 输入:无
  • 输出:无

C4.单行数据清理

会根据单行数据的条件执行数据清理操作

  • 输入:无
  • 输出:无

C5.单行数据输出

一般使用map或mapPartitions算子。
这部分设计比较难,有几个设计点:

  • 如果有多个输出,需要进行多次对的map。
  • 如果多次输出有公共数据,需要额外增加一次map来处理公共数据。
  • 输入:无
  • 输出:肯定有

D.优化处理

不是直接从原有代码转化,主要从性能角度出发来添加,包括:

  • D1.缓存Oracle数据
    • 输入:肯定有
    • 输出:无
  • D2.缓存Redis数据
    • 输入:肯定有
    • 输出:无

分析样例1

  • 步骤1:清理中间表+结果数据表(A3+A4)
  • 输入:无
  • 输出:无
    DELETE out_trd_qtsl t WHERE t.rq = v_last_date;
  • 步骤2:输出数据表,清理Oracle(A4)
  • 输入:无
  • 输出:无
  DELETE FROM out_trd_qtsl_sub t WHERE t.rq = p_i_date;
  • 步骤3:输出数据表,清理Oracle(A4)
  • 输入:无
  • 输出:无
  DELETE FROM out_trd_qtsl_his t WHERE t.rq = p_i_date;
  • 步骤4:使用Dataframe的select语句来进行处理(B1)
  • 输入:qtsl_temp:Dataframe ; par_fund_partner:DataFrame
  • 输出:无
    INSERT INTO out_trd_qtsl
      (scdm,
       hydm,
       ...
            SELECT scdm,
             hydm,
             ...
         FROM qtsl_temp a
       WHERE a.rq = v_last_date
         AND a.zqzh IN (SELECT partner_code
                          FROM par_fund_partner
                         WHERE market_code = v_scdm -- 上海市场
                           AND sub_partner_code = '000000' --不含子股东代码
                           AND v_last_date BETWEEN inure_begin_date AND
                               inure_end_date);
  • 步骤5:游标转RDD(C1)
  • 输入:qtsl_temp:RDD ; par_sys_fill_partner:RDD
  • 输出:无
 SELECT nvl(a.scdm, '') scdm, --市场代码
           nvl(a.hydm, '') hydm, --结算参与人的清算编号
           nvl(a.sjlx, '') sjlx, --数据类型
           ...
      FROM qtsl_temp a
     WHERE a.rq = v_last_date
       AND (a.zqzh IN (SELECT t.partner_code FROM par_sys_fill_partner t) OR
           a.zqzh IS NULL OR a.zqzh = '0')
  • 步骤6:缓存Oracle数据(D1)
  • 输入:out_trd_qtsl_his:Oracle
  • 输出:无
      str := 'select count(1) from out_trd_qtsl_his t where ';
      IF r_qtsl_sub.scdm IS NOT NULL THEN
        str := str || 't.SCDM = ''' || r_qtsl_sub.scdm || ''' and '; --市场代码
      END IF;
      ...
      EXECUTE IMMEDIATE str
        INTO v_count;
  • 步骤7:第一次map操作,根据Oracle数据进行过滤,并生成补录编号(C2+C5)
  • 输入:无
  • 输出:无
 OPEN c_qtsl_sub;
  LOOP
    <<error_row>>
    FETCH c_qtsl_sub
      INTO r_qtsl_sub;
    EXIT WHEN c_qtsl_sub%NOTFOUND;
    ...
     --进行数据过滤
    IF v_count = 0 THEN
        --生成补录编号
        SELECT lpad(to_char(seq_filldata_no.NEXTVAL), 15, '0')
          INTO v_seq
          FROM dual;
    ...
    END IF;
      END LOOP;
  CLOSE c_qtsl_sub;
  • 步骤8:第二次map操作,输出数据(C5)
  • 输入:无
  • 输出:out_trd_qtsl_his:RDD
          INSERT INTO out_trd_qtsl_his
            (scdm, --市场代码
             hydm, --结算参与人的清算编号
            ...
             seq_no, --补录编号
             sub_no --内部顺序号
             )
          VALUES
            (nvl(r_qtsl_sub.scdm, ''), --市场代码
             nvl(r_qtsl_sub.hydm, ''), --结算参与人的清算编号
            ...
             v_seq,
             '0');
  • 步骤9:第三次map操作,输出数据(C5)
  • 输入:无
  • 输出:out_trd_qtsl_sub:RDD
INSERT INTO out_trd_qtsl_sub
            (scdm, --市场代码
             hydm, --结算参与人的清算编号
            ...
             seq_no, --补录编号
             sub_no, --内部顺序号
             sub_no_pre --父序号
             )
          VALUES
            (nvl(r_qtsl_sub.scdm, ''), --市场代码
             nvl(r_qtsl_sub.hydm, ''), --结算参与人的清算编号
             ...
             v_seq,
             '1',
             '0');

分析样例2

  • 步骤1:清理中间+输出表(A3+A4)
  • 输入:无
  • 输出:无
      DELETE out_trd_bloomberg t0
       WHERE t0.data_date BETWEEN v_last_date AND p_i_date;
  • 步骤2: 数据转换(B1)
  • 输入:bloomberg_temp:Dataframe
  • 输出:无
    BEGIN
      SELECT COUNT(1)
        INTO v_count2
        FROM bloomberg_temp t
       WHERE t.data_date BETWEEN v_last_date AND p_i_date;
    EXCEPTION
      WHEN OTHERS THEN
        v_count2 := 0;
    END;
  • 步骤3:游标转RDD(C1)
  • 输入:bloomberg_temp:RDD ; par_sys_stock_bmtx:RDD ; par_sys_coin:RDD ;par_exchange_coin_trans:RDD
  • 输出:无
SELECT t2.security_id security_id,
               t1.price_date price_date,
              ...
               decode(v1.to_coin, null, t1.coin, v1.to_coin) coin, --t1.coin,
               round(decode(v1.rate, null, t1.zspj, t1.zspj * v1.rate), 6) zspj, --t1.zspj,
               round(decode(v1.rate, null, t1.jkpj, t1.jkpj * v1.rate), 6) jkpj, --t1.jkpj,
              ...
          FROM bloomberg_temp t1,
               par_sys_stock_bmtx t2,
               (select t4.coin_name from_coin,
                       t5.coin_name to_coin,
                       t3.rate,
                       t3.inure_begin_date,
                       t4.inure_end_date
                  from par_exchange_coin_trans t3,
                       par_sys_coin            t4,
                       par_sys_coin            t5
                 where t3.from_coin = t4.coin_code
                   and t3.to_coin = t5.coin_code
                   and p_i_date BETWEEN t3.inure_begin_date AND
                       t3.inure_end_date
                   and p_i_date BETWEEN t4.inure_begin_date AND
                       t4.inure_end_date
                   and p_i_date BETWEEN t5.inure_begin_date AND
                       t5.inure_end_date) v1
         WHERE t1.data_date BETWEEN v_last_date AND p_i_date
           AND t1.stock_code = t2.bm_code
           AND t2.bm_type IN ('1','2','10','11') --ISIN code,RIC,CUSIP
           AND p_i_date BETWEEN t2.inure_begin_date AND t2.inure_end_date
           AND t1.coin = v1.from_coin(+)
           AND substr(t2.security_id, 3, 3) <> '056'
  • 步骤4:单行数据过滤(C2)
  • 输入:无
  • 输出:无
 IF rec.stock_kind = '01' THEN
          v_count := 0;
          SELECT COUNT(1)
            INTO v_count
            FROM par_sys_stock t6,
                 par_sys_coin t7
           WHERE rec.security_id = t6.security_id
             AND p_i_date BETWEEN t6.inure_begin_date AND t6.inure_end_date
             AND rec.coin = t7.coin_name
             AND t7.coin_code = t6.coin_code;
           IF v_count = 0 THEN
             continue;
           END IF;
        ...
        ELSE
          NULL;
        END IF;
  • 步骤5:重复数据过滤(C3)
  • 输入:无
  • 输出:无
v_count := 0;
        SELECT COUNT(1)
          INTO v_count
          FROM out_trd_bloomberg t
          WHERE t.coin = rec.coin
            AND t.security_id = rec.security_id
            AND t.price_date = rec.price_date
            AND t.data_date = rec.data_date
            AND t.country = rec.country
            AND t.market = rec.market;
        IF v_count > 0 THEN
               ...
              continue;
        END IF;
  • 步骤6:单行数据输出(C5)
  • 输入:无
  • 输出:out_trd_bloomberg:RDD
INSERT INTO out_trd_bloomberg
          ( SECURITY_ID,
            PRICE_DATE,
            COUNTRY,
            ...
          ) VALUES
          (rec.security_id,
          rec.price_date,
          rec.country,
          ...
          );
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,335评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,895评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,766评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,918评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,042评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,169评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,219评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,976评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,393评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,711评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,876评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,562评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,193评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,903评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,699评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,764评论 2 351

推荐阅读更多精彩内容