背景
用Spark作数据计算框架,将计算结果写入传统关系数据库,例如MySQL,供业务查询,这是工作中经常使用的模式。
在写入MySQL时,经常要加个自增的ID字段。
第一种方案,可以手动创建数据表,定义自增ID字段,Spark写入时用追加模式,ID设为空即可。
第二种方案,Spark写之前就生成好自增ID,直接覆盖写入MySQL。
实际中,我们使用更多的是覆盖写入(自动创建表),所以本文介绍一下方案二的实现。
实现
- Schema添加一列:ID
DataFrame df = ...
StructType schema = df.schema().add(DataTypes.createStructField("id", DataTypes.LongType, false));
- 使用RDD的zipWithIndex得到索引,作为ID值:
JavaRDD<Row> rdd = df
.javaRDD() // 转为JavaRDD
.zipWithIndex() // 添加索引,结果为JavaPairRDD<Row, Long>,即行数据和对应的索引
.map(new Function<Tuple2<Row, Long>, Row>() {
@Override
public Row call(Tuple2<Row, Long> v1) throws Exception {
Object[] objects = new Object[v1._1.size() + 1];
for (int i = 0; i < v1._1.size(); i++) {
objects[i] = v1._1.get(i);
}
objects[objects.length - 1] = v1._2;
return RowFactory.create(objects);
}
}); // 把索引值作为ID字段值,构造新的行数据
- 将RDD再转回DataFrame
df = sqlContext.createDataFrame(rdd, schema);
- 使用Overwrite模式写入MySQL
Properties props = new Properties();
props.setProperty("user", "user");
props.setProperty("password", "password");
props.setProperty("driver", "com.mysql.jdbc.Driver"));
df
.write()
.mode(SaveMode.Overwrite) // 覆盖模式,自动创建表
.jdbc("jdbcUrl", "tableName", props);