Spark菜鸟学习营Day5 分布式程序开发

Spark菜鸟学习营Day5

分布式程序开发

这一章会和我们前面进行的需求分析进行呼应,完成程序的开发。

开发步骤

分布式系统开发是一个复杂的过程,对于复杂过程,我们需要分解为简单步骤的组合。

  • 针对每个简单步骤,难度会降低,学习成本降低

  • 每个步骤都可以作为里程碑,可以反馈进展,同时,有助树立目标感。

  • Step1:需求分析

    • 1.1 拆分程序,形成步骤
      • 以语句为单位拆分,一般一个语句就是一个步骤
    • 1.2 对步骤进行分类
      • 根据需求分析指南,分入A1、A2、B1等规则小类
    • 1.3 分析每个步骤的输入输出
  • Step2:建表

    • 使用PojoMaker工具
  • Step3:新建测试程序

    • 具体可以看测试案例编写的指南
  • Step4:测试数据准备

    • 4.1 数据文件建立
      • 使用DataPrepareUtil工具
    • 4.2 将数据传入程序
      • 使用mock方法

开发准备

  • Step1:首先在测试类中,新建对处理类的调用。
    public void runTest(ComputeBatchNode cbn, JavaSparkContext sc, Hashtable params, ComputeResult result) {
        prepareData(cbn, sc, params, result);
        new QtslProcessor(cbn, sc, params, result).process();
    }
  • Step2:新建处理类

在标红处Alt+Enter,新建处理类
输入参数:

  • cbn 运行环境,会存公共对象,比如数据库连接、redis连接等
  • sc Spark实例,调用各种Spark命令
  • params 参数,可以保存内部参数,也可以保存外部参数
  • result 输出,程序的对外输出
    ![2016-07-27 13-43-31](http://o6jujlzry.bkt.clouddn.com/2016-09-03-2016-07-27 13-43-31.jpg)

建立如下的初始代码:

public class QtslProcessor extends SplitProcessor {
    public QtslProcessor(ComputeBatchNode cbn, JavaSparkContext sc, Hashtable params, ComputeResult result) {
        super(cbn, sc, params, result);
    }
    @Override
    public String process() {
        return null;
    }
}

开发开发开发

A.数据清理

  • 因为不同的数据库的删除逻辑不一致,所以采用统一api的方式调用,可以将不同数据库的写法统一
    • 语句入口方法是delete()
    • 语句需要通过appendDeleteCondition来进行输出

B.批量数据转换

B1.Dataframe方式

分为几个步骤:

  • Step1 对SQL语句进行规范化
  • Step1.1 列名对准
        insert into tab1(a1,a2,a3)
            select b1,b2,b3 from tab2

替换为

        insert into tab1(a1,a2,a3)
            select b1 a1,b2 a2,b3 a3 from tab2
  • Step1.2 替换逻辑
  • IN语句
           Select a from tab1 t 
                where t.a in (select a from tab1)         

替换为

           Select a from tab1 t , (select distinct a from tab1) t2
                where t.a = t1.a            
  • Step1.3 替换函数

    • 待补充
  • Step1.4 替换变量

    • 在变量外面增加#{}
    select a from tab1 t where a = v_a

替换为

   select a from tab1 t where a = #{v_a}    
  • Step2 初始化变量

    • 调用putSqlParam
  • Step3 执行Sql

    • 调用runSparkSql方法
  • Step4 输出结果

    • 调用appendResultDataframe方法

C.逐笔循环数据转换

  • C1.Cursor转RDD
    这是最为复杂的一个步骤,需要掌握RDD的开发基础

  • C2.单行数据过滤
    采用fiter方法,内部采用where方法来定义条件

            JavaRDD<QtslTempPojo> perparedQtslRDD = filtedQtslRDD.filter(
                v1 -> where(() -> v1.getZQZH().equals("0"))
                        .or(() -> v1.getZQZH().equals(""))
                        .get());

对于in,exists,not in , not exists 这样的单行过滤条件,我们需要采用anyMatch方法来进行判断
可以使用comparePojo方法对两个pojo进行比较
如下是一个not exists逻辑

                perparedQtslRDD
                        .filter(v1 -> where().and_not(
                                () -> oracleData.stream().anyMatch(
                                        record -> comparePojo(record, v1)
                                ))
                                .get()
                        )
  • C3.过滤重复数据
    采用groupBy方法,对每个分组只返回一条记录
    .groupBy(
                        v1 -> new Tuple2<>(v1.getFundCode(), v1.getSecurityId()))
    .map(
                        v1 -> toList(v1._2).get(0));    
  • C4.单行数据删除
    待补充

  • C5.单行数据输出
    实际就是从一个pojo转换成另外一个pojo,考虑到可能出现数据异常的情况,推荐采用flatMap方法实现
    可以使用clonePojo方法,将两个pojo中相同的字段自动转换,差异字段需要额外赋值

        .flatMap(new FlatMapFunction<QtslTempPojoExtend, OutTrdQtslSubPojo>() {
            @Override
            public Iterable<OutTrdQtslSubPojo> call(QtslTempPojoExtend v1) {
                OutTrdQtslSubPojo outTrdQtslSubPojo = clonePojo(v1, OutTrdQtslSubPojo.class).orElseGet(null);
                if (outTrdQtslSubPojo != null) {
                    outTrdQtslSubPojo.setDEAL_FLAG("0");
                    outTrdQtslSubPojo.setSEQ_NO(Long.toString(v1.getSeqNo()));
                    outTrdQtslSubPojo.setSUB_NO("1");
                    outTrdQtslSubPojo.setSUB_NO_PRE("0");
                }
                return result(outTrdQtslSubPojo);
            }

D. 优化处理

  • D1.从Oracle取数
    第一步,需要在sqlmap文件中配置sql语句
    <select id="selectFundCjqsTmp" resultType="java.util.Map" parameterType="HashMap">
        <![CDATA[
            SELECT t.*
                          FROM dat_fund_cjqs_tmp t
                         WHERE t.bcrq = #{businessDate}
                           AND t.comfirm_status = '1')
        ]]>
    </select>

第二步,通过getPojoListFromMybatis方法获取数据

        List<OutTrdQtslHisPojo> oracleData = this
                .getPojoListFromMybatis("splitSqlMapper.getQtslHis", OutTrdQtslHisPojo.class);

样例代码1

  • 步骤1:清理中间表+结果数据表(A3+A4) *
        appendDeleteCondition(
                delete("out_trd_qtsl").where(field("rq").eq(lastDate))
        );
  • 步骤2:输出数据表,清理Oracle(A4) *
appendDeleteCondition(
                delete("out_trd_qtsl_sub").where(field("rq").eq(splitDate)));
  • 步骤3:输出数据表,清理Oracle(A4) *
        appendDeleteCondition(
                delete("out_trd_qtsl_his").where(field("rq").eq(splitDate)));
  • 步骤4:使用Dataframe的select语句来进行处理(B1) *
        putSqlParam("v_scdm", "001");
        putSqlParam("v_last_date", lastDate);
        DataFrame df2 = runSparkSql("  SELECT scdm, hydm, sjlx, zqzh, xwh, zqdm, zqlb, ltlx, qylb, gpnf, sl1, sl2,\n" +
                "         bh1, bh2, fzdm, rq, bcsm, byn\n" +
                "           FROM qtsl_temp a, (SELECT distinct partner_code\n" +
                "            FROM par_fund_partner\n" +
                "           WHERE market_code =  #{v_scdm}\n" +
                "             AND sub_partner_code = '000000'\n" +
                "             AND #{v_last_date} BETWEEN inure_begin_date AND inure_end_date) b\n" +
                "           WHERE a.rq =  #{v_last_date}\n" +
                "               AND a.zqzh = b.partner_code\n" +
                "        ");
        appendResultDataframe(df2, OutTrdQtslPojo.class);

待续...

练习3

题目

进行RDD去重操作的训练

  1. 读取交易记录
  2. 按照fundCode+SecurityId进行分组
  3. 取出quantity最小的那条记录
  4. 输出结果
  • 步骤1:对RDD数据进行分组,groupBy方法传入的是分组条件,请注意这里是对两个字段分组,所以我们输出一个Tuple2。
return this.getInputRDD(PracticePojo.class)
                .groupBy(
                        v1 -> new Tuple2<>(v1.getFundCode(), v1.getSecurityId()))

这里需要注意的是,groupBy方法的返回是:
JavaPairRDD<Tuple2<String, String>, Iterable<PracticePojo>>
这是一个key-value结果,其中key是分组的键值,而value是一个数组。
比如下:输入[ (1 2 3); (1 2 4); (2 3 4) ]
如果按照字段1进行groupBy
结果为: (1, [(1 2 3 ) , (1,2,4)])
(2, [(2 3 4)])

  • 步骤2:对数据的迭代数据排序,并返回第一条记录。比较算法,采用if逻辑可以防止变量溢出。
 .map(
                        v1 -> toOrderedList(v1._2,
                                (t1, t2) -> {
                                    //t1比t2大,返回正数
                                    //t1比t2小,返回负数
                                    //t1和t2一样大,返回0
                                    if (t1.getQuantity() > t2.getQuantity()) return 1;
                                    if (t1.getQuantity() < t2.getQuantity()) return -1;
                                    return 0;
                                }
                        ).get(0));

练习4

题目:数据关联过滤练习

  1. 读取交易记录
  2. 按照PracticeSecurity对交易数据进行过滤
  3. 输出结果

思路1:采用join操作

  • 步骤1:获取输入RDD
        JavaRDD<PracticePojo> inputRDD = this.getInputRDD(PracticePojo.class);
        JavaRDD<PracticeSecurityPojo> securityRDD = this.getInputRDD(PracticeSecurityPojo.class);
  • 步骤2:将两个RDD转换为PairRDD,因为仅有PairRDD支持join操作
        JavaPairRDD<String, PracticePojo> pairInputRDD = inputRDD.mapToPair(
                practicePojo -> new Tuple2(practicePojo.getSecurityId(), practicePojo));

        JavaPairRDD<String, PracticeSecurityPojo> pairSecurityRDD = securityRDD.mapToPair(
                practiceSecurityPojo -> new Tuple2<>(practiceSecurityPojo.getSecurityId(), practiceSecurityPojo));
  • 步骤3:执行join操作,并返回结果
return pairInputRDD
                .join(pairSecurityRDD)
                .map(new Function<Tuple2<String, Tuple2<PracticePojo, PracticeSecurityPojo>>, PracticePojo>() {
                    @Override
                    public PracticePojo call(Tuple2<String, Tuple2<PracticePojo, PracticeSecurityPojo>> v1) throws Exception {
                        return v1._2._1;
                    }
                });

这里没有采用lambda表达式方式,因为类型提示比较有用。
我们可以看到join方法的返回是Tuple2<String, Tuple2<PracticePojo, PracticeSecurityPojo>>
举个例子说明:
数据集1:[(1,3,4) ; (1,5,6) ]
数据集2:[(1,2)]
如果按照第一个字段进行join
结果为:[ (1, ( (1,3,4) , (1,2) ) ,
(1, ( (1,5,6) , (1,2) ) ]
在练习中,需要返回的是[(1,3,4);(1,5,6)],所以需要获得 v1._2._1

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容