Java Code for Reading MySQL Data with Spark


import java.io.InputStream;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

public class MysqlSparkConnector {

    private static Logger logger = Logger.getLogger(MysqlSparkConnector.class);

    private static String confilePrefix = "druid.properties";

    private static String driver = null;
    private static String dbHost = null;
    private static String dbPort = null;
    private static String username = null;
    private static String password = null;

    private static String[] environmentArray = { "development", "query", "master", "sandbox" };

    /**
     * Query data from MySQL with Spark.
     *      Supports joining multiple tables within the same MySQL database.
     * 
     * @param spark   SparkSession to use
     * @param dbEnv   database environment (one of development/query/master/sandbox)
     * @param dbName  database name
     * @param tables  tables to register as temporary views (the table name is also the view name)
     * @param sql     SQL to run against the registered views
     * @return        query result, or null if the configuration could not be read
     */
    public static Dataset<Row> queryBySparkFromMysql(SparkSession spark, String dbEnv, String dbName, String[] tables,
            String sql) {

        if (!readProperties(dbEnv, dbName)) {
            logger.error("read properties error, please check configuration file: " + dbName + "." + dbEnv + "." + confilePrefix);
            return null;
        }

        String url = "jdbc:mysql://" + dbHost + ":" + dbPort + "/" + dbName;

        Properties connectionProperties = new Properties();
        connectionProperties.put("driver", driver);
        connectionProperties.put("user", username);
        connectionProperties.put("password", password);
        
        for (int i = 0; i < tables.length; i++) {
            spark.read().jdbc(url, tables[i], connectionProperties).createOrReplaceTempView(tables[i]);
        }
        
        return spark.sql(sql);
    }
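
    /*
     * Hypothetical usage sketch (not part of the original code): register two tables from one
     * database and join them with Spark SQL. The environment, database, table, and column names
     * below are illustrative placeholders only.
     */
    private static void exampleSingleDbJoin(SparkSession spark) {
        String[] tables = { "orders", "order_items" };
        String sql = "SELECT o.id, i.sku FROM orders o JOIN order_items i ON o.id = i.order_id";
        Dataset<Row> result = queryBySparkFromMysql(spark, "query", "shop_db", tables, sql);
        if (result != null) {
            result.show();
        }
    }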
    
    /**
     * Query data from MySQL with Spark.
     *      Reads large tables with partitioned (sliced) queries.
     * 
     * @param spark             SparkSession to use
     * @param dbEnv             database environment
     * @param dbName            database name
     * @param tables            tables to register as temporary views
     * @param sql               SQL to run against the registered views
     * @param partitionColumn   column used to split the read; must be a numeric (integer) column
     * @param lowerBound        lower bound of the partition stride
     * @param upperBound        upper bound of the partition stride
     * @param numPartitions     number of partitions to create
     * @return                  query result, or null if the configuration could not be read
     */
    public static Dataset<Row> queryBySparkFromMysql(SparkSession spark, String dbEnv, String dbName, String[] tables,
            String sql, String partitionColumn, long lowerBound, long upperBound, int numPartitions) {

        if (!readProperties(dbEnv, dbName)) {
            logger.error("read properties error, please check configuration file: " + dbName + "." + dbEnv + "." + confilePrefix);
            return null;
        }

        String url = "jdbc:mysql://" + dbHost + ":" + dbPort + "/" + dbName;

        Properties connectionProperties = new Properties();
        connectionProperties.put("driver", driver);
        connectionProperties.put("user", username);
        connectionProperties.put("password", password);
        
        /**
         * Spark's default JDBC parallelism is 1: all rows land in a single partition and only one
         * task performs the read, no matter how many resources the job has, so reading a large
         * table this way easily causes an out-of-memory error.
         * Once a table reaches millions (or tens of millions) of rows, the read should therefore be
         * partitioned, i.e. several tasks read slices of the table in parallel instead of pulling
         * everything back in one query.
         *      For example:
         *          partitionColumn : id
         *          lowerBound : 0
         *          upperBound : 10000
         *          numPartitions : 10
         *      splits the read into numPartitions=10 queries on the id column, each covering a
         *      stride of (upperBound-lowerBound)/numPartitions=1000 ids, roughly:
         *          select * from table where id < 1000
         *          select * from table where id >= 1000 and id < 2000
         *              ......
         *          select * from table where id >= 9000
         *      Note that lowerBound and upperBound only define the partition stride; they do not
         *      filter rows, so all rows of the table are still returned (the first and last
         *      partitions are unbounded).
         */
        
        for (int i = 0; i < tables.length; i++) {           
            spark.read().jdbc(url, tables[i], partitionColumn, lowerBound, upperBound, numPartitions, connectionProperties).createOrReplaceTempView(tables[i]);
        }
        
        return spark.sql(sql);
    }
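
    /*
     * Hypothetical usage sketch (not part of the original code): read one large table in 10
     * parallel partitions split on its numeric id column, then query the temp view. All names and
     * bounds below are illustrative placeholders only.
     */
    private static void examplePartitionedQuery(SparkSession spark) {
        String[] tables = { "orders" };
        String sql = "SELECT * FROM orders WHERE status = 1";
        Dataset<Row> result = queryBySparkFromMysql(spark, "query", "shop_db", tables, sql,
                "id", 0L, 10000L, 10);
        if (result != null) {
            result.show();
        }
    }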
    
    /**
     * Query data from MySQL with Spark.
     *      Supports joining tables across multiple MySQL databases; the databases and tables to
     *      register are described by a JSON object such as:
     * 
     *          {
     *              "cets_swapdata": [
     *                  {
     *                      "isPartition": true,
     *                      "partitionInfo": {
     *                          "upperBound": 500000000,
     *                          "numPartitions": 100,
     *                          "partitionColumn": "id",
     *                          "lowerBound": 0
     *                      },
     *                      "table": "fba_inventory"
     *                  }
     *              ],
     *              "cets": [
     *                  {
     *                      "isPartition": false,
     *                      "table": "stock_location"
     *                  }
     *              ]
     *          }
     * 
     * @param spark        SparkSession to use
     * @param dbEnv        database environment
     * @param dbTableJson  JSON object describing the databases and tables to register (see above)
     * @param sql          SQL to run against the registered views
     * @return             query result, or null if a configuration could not be read
     */
    public static Dataset<Row> jointQueryBySparkFromMysql(SparkSession spark, String dbEnv, JSONObject dbTableJson,
            String sql) {
        
        String url = null;
        Properties connectionProperties = null;
        String dbName = null;
        
        JSONArray tablesJSONArray = null;
        JSONObject tableInfo = null;
        
        String tableName = null;
        boolean isPartition = false;
        JSONObject partitionInfo = null;
        
        String partitionColumn = null;
        long lowerBound = 0;
        long upperBound = 100;
        int numPartitions = 1;
        
        try {
            @SuppressWarnings("unchecked")
            Iterator<String> iterator = dbTableJson.keys();
            while (iterator.hasNext()) {
                dbName = iterator.next();
                tablesJSONArray = dbTableJson.getJSONArray(dbName);
                
                if (!readProperties(dbEnv, dbName)) {
                    logger.error("read properties error, please check configuration file: " + dbName + "." + dbEnv + "." + confilePrefix);
                    return null;
                }
                
                url = "jdbc:mysql://" + dbHost + ":" + dbPort + "/" + dbName;
                
                connectionProperties = new Properties();
                connectionProperties.put("driver", driver);
                connectionProperties.put("user", username);
                connectionProperties.put("password", password);
                
                for (int i = 0; i < tablesJSONArray.length(); i++) {
                    tableInfo = tablesJSONArray.getJSONObject(i);
                    
                    tableName = tableInfo.getString("table");
                    isPartition = tableInfo.getBoolean("isPartition");
                    if (isPartition) {
                        partitionInfo = tableInfo.getJSONObject("partitionInfo");
                        
                        // Large table: read it with partitioned queries
                        partitionColumn = partitionInfo.getString("partitionColumn");
                        lowerBound = partitionInfo.getLong("lowerBound");
                        upperBound = partitionInfo.getLong("upperBound");
                        numPartitions = partitionInfo.getInt("numPartitions");
                        
                        spark.read().jdbc(url, tableName, partitionColumn, lowerBound, upperBound, numPartitions, connectionProperties).createOrReplaceTempView(tableName);
                        
                    } else {
                        // Small table: read it in a single pass
                        spark.read().jdbc(url, tableName, connectionProperties).createOrReplaceTempView(tableName);
                    }
                }
            }
        } catch (Exception e) {
            logger.error("failed to register MySQL tables as temporary views", e);
        }
        
        return spark.sql(sql);
    }
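
    /*
     * Hypothetical usage sketch (not part of the original code): build the JSON descriptor
     * documented above and join a partitioned table from one database with a small table from
     * another. Database and table names are taken from the Javadoc example; the join columns and
     * the rest of the SQL are illustrative placeholders only.
     */
    private static void exampleCrossDbJoin(SparkSession spark) throws JSONException {
        JSONObject partitionInfo = new JSONObject();
        partitionInfo.put("partitionColumn", "id");
        partitionInfo.put("lowerBound", 0L);
        partitionInfo.put("upperBound", 500000000L);
        partitionInfo.put("numPartitions", 100);

        JSONObject bigTable = new JSONObject();
        bigTable.put("table", "fba_inventory");
        bigTable.put("isPartition", true);
        bigTable.put("partitionInfo", partitionInfo);

        JSONObject smallTable = new JSONObject();
        smallTable.put("table", "stock_location");
        smallTable.put("isPartition", false);

        JSONObject dbTableJson = new JSONObject();
        dbTableJson.put("cets_swapdata", new JSONArray().put(bigTable));
        dbTableJson.put("cets", new JSONArray().put(smallTable));

        String sql = "SELECT f.sku, s.name FROM fba_inventory f JOIN stock_location s ON f.location_id = s.id";
        Dataset<Row> result = jointQueryBySparkFromMysql(spark, "query", dbTableJson, sql);
        if (result != null) {
            result.show();
        }
    }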
    
    /**
     * Query data from MySQL with Spark.
     *      Allows partitioning the read on arbitrary columns via predicate conditions.
     * 
     * @param spark       SparkSession to use
     * @param dbEnv       database environment
     * @param dbName      database name
     * @param tableName   table to read
     * @param predicates  WHERE-style conditions; each condition becomes one task, so the number of
     *                    partitions equals the length of the array
     * @return            query result, or null if the configuration could not be read
     */
    public static Dataset<Row> queryBySparkFromMysqlByConditions(SparkSession spark, String dbEnv, String dbName,
            String tableName, String[] predicates) {
        
        if (!readProperties(dbEnv, dbName)) {
            logger.error("read properties error, please check configuration file: " + dbName + "." + dbEnv + "." + confilePrefix);
            return null;
        }

        String url = "jdbc:mysql://" + dbHost + ":" + dbPort + "/" + dbName;

        Properties connectionProperties = new Properties();
        connectionProperties.put("driver", driver);
        connectionProperties.put("user", username);
        connectionProperties.put("password", password);
        
        /**
         * The read is split by the predicates: each element becomes one task. For example, to have
         * each task read one day of data based on a datetime column modifiedOn:
         *      String[] predicates = {
         *          "modifiedOn >= '2017-09-02' and modifiedOn < '2017-09-03'",
         *          "modifiedOn >= '2017-09-03' and modifiedOn < '2017-09-04'",
         *          "modifiedOn >= '2017-09-04' and modifiedOn < '2017-09-05'",
         *              ......
         *          "modifiedOn >= '2017-09-17' and modifiedOn < '2017-09-18'",
         *          "modifiedOn >= '2017-09-18' and modifiedOn < '2017-09-19'"
         *      }
         * 
         *  Predicates can also combine several columns, just like the conditions in a SQL WHERE clause:
         *      String[] predicates = {
         *          "modifiedOn >= '2017-09-02' and modifiedOn < '2017-09-23'",
         *          "modifiedOn >= '2018' and modifiedOn < '2019'",
         *          "id < 998",
         *          "skuId = 'a17052200ux1459'"
         *      }
         * 
         *  Note: overlapping or duplicated predicates return duplicated rows. For example:
         *      String[] predicates = {"id=1", "id=2", "id=2"}
         *      returns three rows, two of which are identical copies of the id=2 record.
         */
        
        return spark.read().jdbc(url, tableName, predicates, connectionProperties);
    }
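
    /*
     * Hypothetical usage sketch (not part of the original code): split the read into one task per
     * day using date-range predicates. Environment, database, table, and column names are
     * illustrative placeholders only.
     */
    private static void examplePredicateQuery(SparkSession spark) {
        String[] predicates = {
                "modifiedOn >= '2017-09-02' and modifiedOn < '2017-09-03'",
                "modifiedOn >= '2017-09-03' and modifiedOn < '2017-09-04'",
                "modifiedOn >= '2017-09-04' and modifiedOn < '2017-09-05'"
        };
        Dataset<Row> result = queryBySparkFromMysqlByConditions(spark, "query", "shop_db", "orders", predicates);
        if (result != null) {
            result.show();
        }
    }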
    
    /**
     * 
     * Read the MySQL connection settings for the given environment and database.
     * 
     * @param environment  database environment (one of development/query/master/sandbox)
     * @param dbName       database name
     * @return             true if the configuration was read successfully, false otherwise
     */
    private static boolean readProperties(String environment, String dbName) {
        String errorMsg = "";
        boolean isContinue = true;

        String confile = null;
        InputStream is = null;

        try {
            // environment
            List<String> environmentList = Arrays.asList(environmentArray);
            if (!environmentList.contains(environment)) {
                errorMsg = "environment must be one of: " + environmentList.toString();
                isContinue = false;
            }

            // dbName
            if (CommonFunction.isEmptyOrNull(dbName)) {
                errorMsg = "dbName can not be null";
                isContinue = false;
            }

            // Load the configuration file from the classpath
            if (isContinue) {
                confile = dbName + "." + environment + "." + confilePrefix;
                is = MysqlSparkConnector.class.getClassLoader().getResourceAsStream(confile);

                if (is == null) {
                    errorMsg = "resource file: [" + confile + "] not find.";
                    isContinue = false;
                }

                if (isContinue) {
                    Properties properties = new Properties();
                    properties.load(is);

                    driver = properties.getProperty("driverClassName");
                    dbHost = properties.getProperty("dbHost");
                    dbPort = properties.getProperty("dbPort");
                    username = properties.getProperty("username");
                    password = properties.getProperty("password");

                    if (isContinue) {
                        if (CommonFunction.isEmptyOrNull(driver)) {
                            errorMsg = "the driver can not be null, please set in confile: " + confile;
                            isContinue = false;
                        }
                    }

                    if (isContinue) {
                        if (CommonFunction.isEmptyOrNull(dbHost)) {
                            errorMsg = "the dbHost can not be null, please set in confile: " + confile;
                            isContinue = false;
                        }
                    }

                    if (isContinue) {
                        if (CommonFunction.isEmptyOrNull(dbPort)) {
                            errorMsg = "the dbPort can not be null, please set in confile: " + confile;
                            isContinue = false;
                        }
                    }

                    if (isContinue) {
                        if (CommonFunction.isEmptyOrNull(username)) {
                            errorMsg = "the username can not be null, please set in confile: " + confile;
                            isContinue = false;
                        }
                    }

                    if (isContinue) {
                        if (CommonFunction.isEmptyOrNull(password)) {
                            errorMsg = "the password can not be null, please set in confile: " + confile;
                            isContinue = false;
                        }
                    }

                }
            }

            if (!isContinue) {
                logger.error(errorMsg);
            }

        } catch (Exception e) {
            logger.error("failed to read configuration file: " + confile, e);
        } finally {
            // Release the configuration stream if it was opened.
            try { if (is != null) is.close(); } catch (Exception ignore) { }
        }

        return isContinue;
    }

}
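
For reference, readProperties() looks up a classpath resource named <dbName>.<dbEnv>.druid.properties (for example, shop_db.query.druid.properties) and reads the keys used above. A minimal sketch of such a file, with placeholder host, port, and credential values rather than real ones, might look like:

# placeholder values; adjust to your environment and JDBC driver
driverClassName=com.mysql.jdbc.Driver
dbHost=127.0.0.1
dbPort=3306
username=your_user
password=your_password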
