输出MySQL的表
需要向MySQL的stats_visitor_basic表和stats_visitor_browser表中插入数据,插入数据的sql语句为:
query-mapping.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>new_visitor_basic</name>
<value>
INSERT INTO `stats_visitor_basic`(
`platform_dimension_id`,
`date_dimension_id`,
`new_install_users`,
`created`)
VALUES(?, ?, ?, ?)
ON DUPLICATE KEY UPDATE `new_install_users` = ?
</value>
</property>
<property>
<name>new_visitor_browser</name>
<value>
INSERT INTO `stats_visitor_browser`(
`platform_dimension_id`,
`date_dimension_id`,
`browser_dimension_id`,
`new_install_users`,
`created`)
VALUES(?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE
`new_install_users` = ?
</value>
</property>
</configuration>
sql语句参数设置
不同sql语句需要设置的参数个数不同。因此针对不同的表,需要使用不同的参数设置方法。
参数设置接口
/**
 * Defines the output operation used by a specific MapReduce job: an
 * implementation knows how to bind one reducer output record to the
 * parameters of the SQL statement for its target table.
 *
 * @author liangxw
 */
public interface IOutputCollector {
/**
 * Binds the values of one output record to the given prepared statement.
 * The implementations shown alongside this interface also call
 * {@code pstmt.addBatch()} once the parameters are set.
 *
 * @param conf  job configuration
 * @param key   dimension key of the output record
 * @param value output value of the record
 * @param pstmt prepared statement whose {@code ?} placeholders are filled in
 * @param idh   handler used to resolve a dimension object to its id
 * @throws IOException if the parameters cannot be set
 */
void setArgs(Configuration conf, Dimension key, OutputValue value,
PreparedStatement pstmt, IDimensionHandler idh
) throws IOException;
}
具体的参数设置方式
对于stats_visitor_basic
表的参数设置:
/**
 * {@link IOutputCollector} intended for the {@code stats_visitor_basic} table.
 *
 * <p>Fills five placeholders in this order: platform dimension id, date
 * dimension id, new-install-user count, creation date, and the
 * new-install-user count again for the {@code ON DUPLICATE KEY UPDATE} clause.
 */
public class NewVisitorBasicCollector implements IOutputCollector {

    /**
     * Binds one record to {@code pstmt} and queues it with {@code addBatch()}.
     *
     * @param conf    job configuration; supplies the running date
     * @param key     expected to be a {@code UserStatsDimension}
     * @param value   output value whose number map holds the count under key {@code -1}
     * @param pstmt   statement to populate
     * @param rpcConn handler that resolves dimension objects to ids
     * @throws IOException if the JDBC driver rejects a parameter
     */
    @Override
    public void setArgs(Configuration conf, Dimension key, OutputValue value,
                        PreparedStatement pstmt, IDimensionHandler rpcConn)
            throws IOException {
        final UserStatsDimension stats = (UserStatsDimension) key;
        final IntWritable counter =
                (IntWritable) value.getNumberMap().get(new IntWritable(-1));

        try {
            // Dimension ids are resolved one at a time, right before they are bound.
            pstmt.setInt(1, rpcConn.getDimensionId(stats.getCommonDimension().getPlatformDimension()));
            pstmt.setInt(2, rpcConn.getDimensionId(stats.getCommonDimension().getDateDimension()));

            // The same count is used for the INSERT value and the UPDATE value.
            final int newUsers = counter.get();
            pstmt.setInt(3, newUsers);
            pstmt.setString(4, conf.get(GlobalConstant.RUNNING_DATE));
            pstmt.setInt(5, newUsers);

            // Queue this parameter set in the statement's batch.
            pstmt.addBatch();
        } catch (SQLException e) {
            throw new IOException("sql异常", e);
        }
    }
}
对于stats_visitor_browser
表的参数设置:
/**
 * {@link IOutputCollector} intended for the {@code stats_visitor_browser} table.
 *
 * <p>Fills six placeholders in this order: platform dimension id, date
 * dimension id, browser dimension id, new-install-user count, creation date,
 * and the new-install-user count again for the
 * {@code ON DUPLICATE KEY UPDATE} clause.
 */
public class NewVisitorBrowserCollector implements IOutputCollector {

    /**
     * Binds one record to {@code pstmt} and queues it with {@code addBatch()}.
     *
     * @param conf       job configuration; supplies the running date
     * @param key        expected to be a {@code UserStatsDimension}
     * @param value      output value whose number map holds the count under key {@code -1}
     * @param pstmt      statement to populate
     * @param convertere handler that resolves dimension objects to ids
     * @throws IOException if the JDBC driver rejects a parameter
     */
    @Override
    public void setArgs(Configuration conf, Dimension key, OutputValue value, PreparedStatement pstmt,
                        IDimensionHandler convertere) throws IOException {
        final UserStatsDimension stats = (UserStatsDimension) key;
        final IntWritable counter =
                (IntWritable) value.getNumberMap().get(new IntWritable(-1));

        try {
            // Dimension ids are resolved one at a time, right before they are bound.
            pstmt.setInt(1, convertere.getDimensionId(stats.getCommonDimension().getPlatformDimension()));
            pstmt.setInt(2, convertere.getDimensionId(stats.getCommonDimension().getDateDimension()));
            pstmt.setInt(3, convertere.getDimensionId(stats.getBrowserD()));

            // The same count is used for the INSERT value and the UPDATE value.
            final int newUsers = counter.get();
            pstmt.setInt(4, newUsers);
            pstmt.setString(5, conf.get(GlobalConstant.RUNNING_DATE));
            pstmt.setInt(6, newUsers);

            // Queue this parameter set in the statement's batch.
            pstmt.addBatch();
        } catch (SQLException e) {
            throw new IOException("sql异常", e);
        }
    }
}
自定义输出到MySQL的OutputFormat类
/**
 * Custom {@link OutputFormat} that writes reducer output to MySQL.
 *
 * <p>Each record carries a {@code KpiType}; the SQL text and the
 * {@link IOutputCollector} class for that KPI are looked up in the job
 * configuration. Records are accumulated with JDBC batching and committed
 * every {@code GlobalConstant.JDBC_BATCH_NUMBER} records per KPI, with a final
 * flush when the writer is closed.
 *
 * @author liangxw
 */
public class MysqlOutputFormat extends OutputFormat<Dimension, OutputValue> {

    /**
     * Returns the object that actually outputs the data (the record writer).
     *
     * <p>If the database connection cannot be obtained or configured, the
     * resources acquired so far (JDBC connection and RPC proxy) are released
     * before the failure is reported, so a failed attempt does not leak them.
     *
     * @param context task attempt context supplying the job configuration
     * @return a writer bound to a fresh, non-auto-commit MySQL connection
     * @throws RuntimeException if the database connection cannot be set up
     */
    @Override
    public RecordWriter<Dimension, OutputValue> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        // Client for the remote dimension RPC service.
        IDimensionHandler rpcConn = DimensionHandlerClient.createDimensionConnector(conf);
        Connection conn = null;
        try {
            conn = JdbcManager.getConnection(GlobalConstant.MYSQL_DATABASE);
            conn.setAutoCommit(false); // commits are issued explicitly per batch
        } catch (Exception e) {
            RuntimeException failure = new RuntimeException("获取数据库连接失败", e);
            closeQuietly(conn);
            try {
                DimensionHandlerClient.stopDimensionHandlerProxy(rpcConn);
            } catch (Exception stopError) {
                failure.addSuppressed(stopError);
            }
            throw failure;
        }
        return new MySQLRecordWriter(conn, conf, rpcConn);
    }

    /**
     * Intentionally performs no validation. At most, an implementation could
     * check that the target tables exist.
     */
    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        // nothing to check
    }

    /**
     * Returns a {@link FileOutputCommitter} built from the job's file output path.
     * NOTE(review): presumably only a placeholder committer, since the data
     * itself is written to MySQL by the record writer; confirm.
     */
    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
    }

    /** Closes a JDBC resource, ignoring {@code null} and any failure raised by {@code close()}. */
    private static void closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception ignored) {
            // Best-effort cleanup: a failure to close must not mask the primary outcome.
        }
    }

    /**
     * Record writer that batches rows per KPI and writes them to MySQL.
     *
     * <p>Declared {@code static} because it uses no state of the enclosing
     * output format.
     */
    private static class MySQLRecordWriter extends RecordWriter<Dimension, OutputValue> {
        private final Connection conn;
        private final Configuration conf;
        private final IDimensionHandler rpcConn;
        // KPI -> prepared statement for that KPI's SQL.
        private final Map<KpiType, PreparedStatement> kpiSqlMap = new HashMap<>();
        // KPI -> number of records added to the statement's batch since the last flush.
        private final Map<KpiType, Integer> kpiNumberMap = new HashMap<>();
        // KPI -> collector instance. Created once per KPI instead of reflectively for
        // every record; this assumes collectors keep no per-record state.
        private final Map<KpiType, IOutputCollector> kpiCollectorMap = new HashMap<>();

        MySQLRecordWriter(Connection conn, Configuration conf, IDimensionHandler rpcConn) {
            super();
            this.conn = conn;
            this.conf = conf;
            this.rpcConn = rpcConn;
        }

        /**
         * Adds one record to the batch of its KPI and flushes the batch when it
         * reaches the configured size. Invoked when the reducer calls
         * {@code context.write()}.
         *
         * @throws IOException if statement preparation, parameter binding,
         *                     batch execution or commit fails
         */
        @Override
        public void write(Dimension key, OutputValue value) throws IOException, InterruptedException {
            KpiType kpiType = value.getKpiType();
            try {
                PreparedStatement pstmt = kpiSqlMap.get(kpiType);
                if (pstmt == null) {
                    // First record of this KPI: fetch its SQL (from query-mapping.xml) and prepare it.
                    String sql = conf.get(kpiType.name);
                    pstmt = conn.prepareStatement(sql);
                    kpiSqlMap.put(kpiType, pstmt);
                }

                // Different KPIs (different tables) bind their parameters differently.
                collectorFor(kpiType).setArgs(conf, key, value, pstmt, rpcConn);

                // Count the record just added. The counter restarts from zero after every
                // flush, so each batch holds exactly the configured number of records.
                Integer alreadyPending = kpiNumberMap.get(kpiType);
                int pending = (alreadyPending == null ? 0 : alreadyPending) + 1;

                int batchSize = conf.getInt(GlobalConstant.JDBC_BATCH_NUMBER,
                        GlobalConstant.DEFAULT_JDBC_BATCH_NUMBER);
                if (pending % batchSize == 0) {
                    pstmt.executeBatch();
                    conn.commit();
                    kpiNumberMap.remove(kpiType);
                } else {
                    kpiNumberMap.put(kpiType, pending);
                }
            } catch (Exception e) {
                throw new IOException("数据输出产生异常", e);
            }
        }

        /**
         * Returns the cached collector for a KPI, instantiating it on first use.
         * The implementing class must provide a no-argument constructor.
         */
        private IOutputCollector collectorFor(KpiType kpiType) throws Exception {
            IOutputCollector collector = kpiCollectorMap.get(kpiType);
            if (collector == null) {
                String collectorClassName = conf.get(GlobalConstant.OUTPUT_COLLECTOR_PREFIX + kpiType.name);
                Class<?> clazz = Class.forName(collectorClassName);
                collector = (IOutputCollector) clazz.getDeclaredConstructor().newInstance();
                kpiCollectorMap.put(kpiType, collector);
            }
            return collector;
        }

        /**
         * Flushes all pending batches, commits, and releases every resource.
         *
         * <p>A commit is attempted even when flushing fails. A failure of either
         * step is reported after cleanup; a failed commit is no longer silently
         * ignored, because it means the final rows were not persisted.
         *
         * @throws IOException if flushing the batches or committing fails
         */
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            IOException failure = null;
            try {
                // 1. Execute whatever is still queued in each statement's batch.
                try {
                    for (PreparedStatement pstmt : kpiSqlMap.values()) {
                        pstmt.executeBatch();
                    }
                } catch (Exception e) {
                    failure = new IOException("输出数据出现异常", e);
                }

                // 2. Commit the open transaction.
                try {
                    if (conn != null) {
                        conn.commit();
                    }
                } catch (Exception e) {
                    IOException commitFailure = new IOException("提交数据库事务失败", e);
                    if (failure == null) {
                        failure = commitFailure;
                    } else {
                        failure.addSuppressed(commitFailure);
                    }
                }

                // 3. Release JDBC resources; failures here are deliberately ignored.
                if (conn != null) {
                    for (PreparedStatement pstmt : kpiSqlMap.values()) {
                        closeQuietly(pstmt);
                    }
                    closeQuietly(conn);
                }
            } finally {
                // Always shut down the remote dimension connection.
                DimensionHandlerClient.stopDimensionHandlerProxy(rpcConn);
            }
            if (failure != null) {
                throw failure;
            }
        }
    }
}