flink实现exactly-once语义分为source,process和sink三个阶段,本片文章主要实现的是Process和sink阶段。在实现exactly-once中,process阶段可以通过开启checkpoint对中间状态进行存储,sink阶段需要实现事务或者两阶段提交,但是一部分数据库不能实现事务,所以两阶段提交用的更多一些。步骤如下:
1,项目开启checkpoint:
状态后端的文件地址我设置在了本地,为了方便查看,有条件的最好放在hdfs上。
2,加载kafka中的数据源,每一秒发送一次数据(为了自己观察)
3,实现两阶段提交的sink方法
public class MysqlTwoPhaseCommitSinkextends TwoPhaseCommitSinkFunction {
private static final long serialVersionUID =5234069130115628661L;
ObjectMappermapper =new ObjectMapper();
public MysqlTwoPhaseCommitSink() {
super(new KryoSerializer<>(Connection.class,new ExecutionConfig()), VoidSerializer.INSTANCE);
}
/** 执行入库操作 */
@Override
protected void invoke(Connection transaction, VehicleInfoToMysql value, Context context)throws Exception {
String sql ="insert into t_bosch_test_records(vin,samplingTime,samplingTimeSec,engineStatus,chargingStatus,runningModel)values(?,?,?,?,?,?)";
PreparedStatement ps = transaction.prepareStatement(sql);
String s =mapper.writeValueAsString(value);
System.out.println(s+"数据过来了。。。。。");
ps.setString(1,value.getVin());
ps.setLong(2,value.getSamplingTime());
ps.setLong(3,value.getSamplingTimeSec());
ps.setInt(4,value.getEngineStatus());
ps.setInt(5,value.getChargingStatus());
ps.setInt(6,value.getRunningModel());
//执行insert语句
int i = ps.executeUpdate();
System.out.println(i+"执行结果。。。。");
}
@Override
protected ConnectionbeginTransaction()throws Exception {
System.out.println("开始建立连接。。。。。。。");
String url ="jdbc:mysql://10.17.19.18:3306/integration_db?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false";
String driver ="com.mysql.jdbc.Driver";
Connection connection = DBConnectUtil.getConnection(driver,url, "root", "123");
return connection;
}
@Override
protected void preCommit(Connection transaction)throws Exception {
System.out.println("预提交开始。。。。。");
}
@Override
protected void commit(Connection transaction) {
System.out.println("手动提交开始。。。。");
DBConnectUtil.commit(transaction);
}
/** 回滚操作,如果invoke异常则回滚事务,下一次checkpoint操作也不会执行 */
@Override
protected void abort(Connection transaction) {
System.out.println("执行异常,事务回滚。。。。。。。。");
DBConnectUtil.rollback(transaction);
}
}
注意:执行sql语句时,如果是写入或者更新操作,应该用executeUpdate()方法,否则数据无法正常写入,如果是查询可以用execute()方法,但是在写入mysql时,数据虽然可以正常写入,但是代码会报错,我们用的是InnoDB,支持事务,可能两阶段提交有冲突,在之后写的clickhouse两阶段提交的代码时就能正常运行了。
附上代码中用到的连接工具代码:
package com.hh.bigdata.flink.utils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/**
* @author PuWang
* @version 1.0
* @date 2021/8/13 13:06
*/
public class DBConnectUtil {
/**
* 获取连接
*
* @param url
* @param user
* @param password
* @return
* @throws SQLException
*/
public static ConnectiongetConnection(String driver,String url, String user, String password)throws SQLException {
Connection conn =null;
try {
Class.forName(driver);
}catch (ClassNotFoundException e) {
e.printStackTrace();
}
conn = DriverManager.getConnection(url, user, password);
//设置手动提交
conn.setAutoCommit(false);
return conn;
}
/**
* 提交事物
*/
public static void commit(Connection conn) {
if (conn !=null) {
try {
conn.commit();
}catch (SQLException e) {
e.printStackTrace();
}
finally {
close(conn);
}
}
}
/**
* 事物回滚
*
* @param conn
*/
public static void rollback(Connection conn) {
if (conn !=null) {
try {
conn.rollback();
}catch (SQLException e) {
e.printStackTrace();
}
finally {
close(conn);
}
}
}
/**
* 关闭连接
*
* @param conn
*/
public static void close(Connection conn) {
if (conn !=null) {
try {
conn.close();
}catch (SQLException e) {
e.printStackTrace();
System.out.println("连接关闭失败");
}
}
}
}
ClickhouseTwoPhaseCommitSink:
public class ClickhouseTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction {
private static final long serialVersionUID =5654334499924165551L;
ObjectMappermapper =new ObjectMapper();
public ClickhouseTwoPhaseCommitSink() {
super(new KryoSerializer<>(Connection.class,new ExecutionConfig()), VoidSerializer.INSTANCE);
}
/** 执行入库操作 */
@Override
protected void invoke(Connection transaction, VehicleInfoToMysql value, Context context)throws Exception {
String sql ="insert into t_test_records(vin,samplingTime,samplingTimeSec,engineStatus,chargingStatus,runningModel)values(?,?,?,?,?,?)";
PreparedStatement ps = transaction.prepareStatement(sql);
String s =mapper.writeValueAsString(value);
System.out.println(s+"数据过来了。。。。。");
ps.setString(1,value.getVin());
ps.setLong(2,value.getSamplingTime());
ps.setLong(3,value.getSamplingTimeSec());
ps.setInt(4,value.getEngineStatus());
ps.setInt(5,value.getChargingStatus());
ps.setInt(6,value.getRunningModel());
//执行insert语句
final int i = ps.executeUpdate();
System.out.println(i+"执行结果。。。。");
}
@Override
protected ConnectionbeginTransaction()throws Exception {
System.out.println("开始建立连接。。。。。。。");
String url ="jdbc:clickhouse://10.17.39.30:9000/test";
String driver="com.github.housepower.jdbc.ClickHouseDriver";
Connection connection = DBConnectUtil.getConnection(driver,url, "develop", "123");
return connection;
}
@Override
protected void preCommit(Connection transaction)throws Exception {
// System.out.println("预提交开始。。。。。");
}
@Override
protected void commit(Connection transaction) {
// System.out.println("手动提交开始。。。。");
DBConnectUtil.commit(transaction);
}
/** 回滚操作,如果invoke异常则回滚事务,下一次checkpoint操作也不会执行 */
@Override
protected void abort(Connection transaction) {
// System.out.println("执行异常,事务回滚。。。。。。。。");
DBConnectUtil.rollback(transaction);
}
}