自定义输出多个MySQL表的OutputFormat

输出MySQL的表

需要向MySQLstats_visitor_basic表和stats_visitor_browser表中插入数据,插入数据的sql语句为:

query-mapping.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>new_visitor_basic</name>
        <value>
            INSERT INTO `stats_visitor_basic`(
            `platform_dimension_id`,
            `date_dimension_id`,
            `new_install_users`,
            `created`)
            VALUES(?, ?, ?, ?)
            ON DUPLICATE KEY UPDATE `new_install_users` = ?
        </value>
    </property>

    <property>
        <name>new_visitor_browser</name>
        <value>
            INSERT INTO `stats_visitor_browser`(
            `platform_dimension_id`,
            `date_dimension_id`,
            `browser_dimension_id`,
            `new_install_users`,
            `created`)
            VALUES(?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE
            `new_install_users` = ?
        </value>
    </property>
<configuration>

sql语句参数设置

不同sql语句需要设置的参数个数不同。因此针对不同的表,需要使用不同的参数设置方法。

参数设置接口

/**
 * 定义具体mapreduce对应的输出操作代码
 *
 * @author liangxw
 */

public interface IOutputCollector {

    /**
     * 定义具体执行sql数据插入的方法
     */
    void setArgs(Configuration conf, Dimension key, OutputValue value,
                 PreparedStatement pstmt, IDimensionHandler idh
    ) throws IOException;
}

具体的参数设置方式

对于stats_visitor_basic表的参数设置:

public class NewVisitorBasicCollector implements IOutputCollector {

    @Override
    public void setArgs(Configuration conf, Dimension key, OutputValue value,
                        PreparedStatement pstmt, IDimensionHandler rpcConn)
            throws IOException {

        UserStatsDimension userBehavior = (UserStatsDimension) key;
        IntWritable newVisitorsBasic = (IntWritable) value.getNumberMap().get(new IntWritable(-1));

        int i = 0;
        try {
            pstmt.setInt(++i, rpcConn.getDimensionId(userBehavior.getCommonDimension().getPlatformDimension()));
            pstmt.setInt(++i, rpcConn.getDimensionId(userBehavior.getCommonDimension().getDateDimension()));
            pstmt.setInt(++i, newVisitorsBasic.get());
            pstmt.setString(++i, conf.get(GlobalConstant.RUNNING_DATE));
            pstmt.setInt(++i, newVisitorsBasic.get());

            // 添加一次预处理参数
            pstmt.addBatch();
        } catch (SQLException e) {
            throw new IOException("sql异常", e);
        }
    }

}

对于stats_visitor_browser表的参数设置:

public class NewVisitorBrowserCollector implements IOutputCollector {

    @Override
    public void setArgs(Configuration conf, Dimension key, OutputValue value, PreparedStatement pstmt,
                        IDimensionHandler convertere) throws IOException {

        UserStatsDimension userBehavior = (UserStatsDimension) key;
        IntWritable newVisitorsBrowser = (IntWritable) value.getNumberMap().get(new IntWritable(-1));

        int i = 0;
        try {
            pstmt.setInt(++i, convertere.getDimensionId(userBehavior.getCommonDimension().getPlatformDimension()));
            pstmt.setInt(++i, convertere.getDimensionId(userBehavior.getCommonDimension().getDateDimension()));
            pstmt.setInt(++i, convertere.getDimensionId(userBehavior.getBrowserD()));
            pstmt.setInt(++i, newVisitorsBrowser.get());
            pstmt.setString(++i, conf.get(GlobalConstant.RUNNING_DATE));
            pstmt.setInt(++i, newVisitorsBrowser.get());

            // 批量执行
            pstmt.addBatch();
        } catch (SQLException e) {
            throw new IOException("sql异常", e);
        }
    }

}

自定义输出到MySQL的OutputFormat类

/**
 * 自定义输出到mysql的OutputFormat类
 *
 * @author liangxw
 */
public class MysqlOutputFormat extends OutputFormat<Dimension, OutputValue> {

    /*
      返回一个具体定义如何输出数据的对象, RecordWriter被称为数据的输出器
    */
    @Override
    public RecordWriter<Dimension, OutputValue> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {

        Connection conn;
        Configuration conf = context.getConfiguration();
        // 远程访问RPC服务
        IDimensionHandler rpcConn = DimensionHandlerClient.createDimensionConnector(conf);

        try {
            conn = JdbcManager.getConnection(GlobalConstant.MYSQL_DATABASE);
            conn.setAutoCommit(false); // 关闭自动提交机制
        } catch (Exception e) {
            throw new RuntimeException("获取数据库连接失败", e);
        }
        return new MySQLRecordWriter(conn, conf, rpcConn);
    }

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        // 这个方法在自己实现的时候不需要关注,如果你非要关注,最多检查一下表数据存在
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
    }

    /**
     * 自定义的数据输出器
     */
    private class MySQLRecordWriter extends RecordWriter<Dimension, OutputValue> {

        private Connection conn = null;
        private Configuration conf = null;
        private IDimensionHandler rpcConn = null;

        // kpiSqlMap中,存放Kpi以及对应的sql语句
        private Map<KpiType, PreparedStatement> kpiSqlMap = new HashMap<>();
        // kpiNumberMap中,存放kpi以及对应出现的次数
        private Map<KpiType, Integer> kpiNumberMap = new HashMap<>();

        MySQLRecordWriter(Connection conn, Configuration conf, IDimensionHandler rpcConn) {
            super();
            this.conn = conn;
            this.conf = conf;
            this.rpcConn = rpcConn;
        }

        // 当Reduce调用context.write()时,底层调用的是该方法
        @Override
        public void write(Dimension key, OutputValue value) throws IOException, InterruptedException {

            KpiType kpiType = value.getKpiType();

            //从query-mappiing.xml中拿出sql字符串
            String sql = this.conf.get(kpiType.name);

            PreparedStatement pstmt;

            int count = 1;
            try {
                pstmt = kpiSqlMap.get(kpiType);

                if (pstmt == null) {// 第一次创建
                    pstmt = this.conn.prepareStatement(sql);
                    kpiSqlMap.put(kpiType, pstmt);
                } else {// 表示已经存在
                    if (!kpiNumberMap.containsKey(kpiType)) {
                        kpiNumberMap.put(kpiType, count);
                    }
                    count = kpiNumberMap.get(kpiType);
                    count++;
                }
                kpiNumberMap.put(kpiType, count);

                /*针对不同的Kpi(不同的表)有不同的参数设置方法*/
                String collectorClassName = conf.get(GlobalConstant.OUTPUT_COLLECTOR_PREFIX + kpiType.name);
                Class<?> clazz = Class.forName(collectorClassName);
                // 创建对象, 要求实现子类一定要有一个无参数的构造方法
                IOutputCollector collector = (IOutputCollector) clazz.newInstance();
                collector.setArgs(conf, key, value, pstmt, rpcConn);

                // 批量执行
                if (count % conf.getInt(GlobalConstant.JDBC_BATCH_NUMBER, GlobalConstant.DEFAULT_JDBC_BATCH_NUMBER) == 0) {
                    pstmt.executeBatch();
                    conn.commit();
                    // 移除已经执行过的Kpi
                    kpiNumberMap.remove(kpiType);
                }
            } catch (Exception e) {
                throw new IOException("数据输出产生异常", e);
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // 关闭资源使用,最终一定会调用
            try {

                try {
                    for (Map.Entry<KpiType, PreparedStatement> entry : this.kpiSqlMap.entrySet()) {
                        entry.getValue().executeBatch();
                    }
                } catch (Exception e) {
                    throw new IOException("输出数据出现异常", e);
                } finally {
                    try {
                        if (conn != null) {
                            conn.commit();
                        }
                    } catch (Exception e) {
                        // nothings
                    } finally {
                        if (conn != null) {
                            for (Map.Entry<KpiType, PreparedStatement> entry : this.kpiSqlMap.entrySet()) {
                                try {
                                    entry.getValue().close();
                                } catch (SQLException e) {
                                    // nothings
                                }
                            }
                            try {
                                conn.close();
                            } catch (SQLException e) {
                                // nothings
                            }
                        }
                    }
                }
            } finally {
                // 关闭远程连接
                DimensionHandlerClient.stopDimensionHandlerProxy(rpcConn);
            }
        }

    }

}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容