SparkSql操作Hive数据入Mysql中的注意事项和问题

Version

 Spark:1.6.2
 Hive  : 1.1.0

先看下代码:::::::::

    object SparkSql_Hive_Mysql {
        def main(args: Array[String]): Unit = {
            val url = "jdbc:mysql://0.0.0.0:3306/data?characterEncoding=UTF-8&serverTimezone=CST"
            val tableName = "test"

            val prop = new Properties()
                  prop.setProperty("user", "username")
                  prop.setProperty("password", "pwd")
                  prop.setProperty("useSSL", "false");

            val conf = new SparkConf()
                    .setAppName("test")
                    .setMaster("local[5]")
            val sc = new SparkContext(conf)
            val sqlContext = new HiveContext(sc)

            val sql = "SELECT * FROM test"
            val retDF = sqlContext.sql(sql)

            /**
              * 将结果集已追加的方式写入 mysql 数据库
              * Overwrite 覆盖
              * Append    追加
              * Ignore    忽略
              */
            retDF.write.mode(SaveMode.Overwrite) jdbc(url, tableName, prop)
            sc.stop()
      }
    }

使用封装好的写入方式:::::

            retDF.write.mode(SaveMode.Overwrite) jdbc(url, tableName, prop)

现在说下我遇到的问题:::::
在任务执行过程中出现了超过Mysql 数据库链接的最大数

下面看下这个方法的源码:::::

       def saveTable(
             df: DataFrame,
             url: String,
             table: String,
             properties: Properties) {
           val dialect = JdbcDialects.get(url)
           val nullTypes: Array[Int] = df.schema.fields.map { field =>
                   getJdbcType(field.dataType, dialect).jdbcNullType
             }
           val rddSchema = df.schema
           val getConnection: () => Connection = createConnectionFactory(url, properties)
           val batchSize = properties.getProperty("batchsize", "1000").toInt
                 df.foreachPartition { iterator =>
           savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
       }
     }
#并且链接也都在使用后进行了关闭回收::::
       } finally {
             if (!committed) {
               // The stage must fail.  We got here through an exception path, so
               // let the exception through unless rollback() or close() want to
               // tell the user about another problem.
               if (supportsTransactions) {
                     conn.rollback()
                   }
               conn.close()
           } else {
             // The stage must succeed.  We cannot propagate any exception close() might throw.
             try {
                 conn.close()
           } catch {
               case e: Exception => logWarning("Transaction succeeded, but closing failed", e)
         }
       }
     }

也就数说再不能增加Mysql最大链接数配置的情况下,使用

Spark 中自带的 coalesce(“numPartition”)方法尽量减少分区数

以此来减少创建的Mysql 连接数,来减少因为超过最大连接数而导致的问题。

至于为什么用coalesce方法而不用repartition方法我在另一篇文章中有讲到。
感兴趣可以点击查看!传送门在此

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容