Flink ClickHouse sink demo
/**
 * Flink sink that writes {@code clickDemo} records into the ClickHouse table
 * {@code jdbc_example} through JDBC.
 *
 * <p>The connection and the prepared insert statement are created once in
 * {@link #open(Configuration)} and released in {@link #close()}, so they are not
 * re-created for every record.
 */
public class ClickHourseSink extends RichSinkFunction<clickDemo> {
    private static final long serialVersionUID = -2007127739007206915L;

    /** JDBC driver class loaded before a connection is requested. */
    private static final String DRIVER_CLASS = "com.github.housepower.jdbc.ClickHouseDriver";

    /**
     * URL used when the sink is built without one (no-arg constructor, or a null/empty url).
     * NOTE(review): the housepower driver speaks ClickHouse's native TCP protocol, whose
     * default port is 9000, while 8123 is the default HTTP port - confirm this address
     * against the target server.
     */
    private static final String DEFAULT_URL = "jdbc:clickhouse://192.168.192.145:8123/default";

    /** Insert statement executed once per record. */
    private static final String INSERT_SQL = "insert into jdbc_example(day,name,age) values(?, ?, ?)";

    // JDBC handles are created in open(); marked transient so they are never part of the
    // serialized form of this function.
    private transient PreparedStatement ps;
    private transient Connection connection;

    private String user;
    private String passwd;
    private String url;

    /** Creates a sink that connects to the built-in default URL without credentials. */
    public ClickHourseSink() {
    }

    /**
     * Creates a sink for a specific ClickHouse endpoint.
     *
     * @param user   user name; when null or empty the connection is opened without credentials
     * @param passwd password sent together with {@code user}
     * @param url    JDBC URL; when null or empty the built-in default URL is used
     */
    public ClickHourseSink(String user, String passwd, String url) {
        this.user = user;
        this.passwd = passwd;
        this.url = url;
    }

    /**
     * Opens the connection and prepares the insert statement once, so that
     * {@code invoke} does not have to connect and disconnect for every record.
     *
     * @param parameters configuration forwarded to the superclass
     * @throws Exception if the driver cannot be loaded, the connection cannot be
     *                   established, or the statement cannot be prepared
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection(url, user, passwd);
        ps = connection.prepareStatement(INSERT_SQL);
    }

    /**
     * Releases the statement first and the connection second. Each step runs even if
     * the previous one failed, and either handle may be null when open() did not complete.
     *
     * @throws Exception if closing a JDBC resource or the superclass fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            ps = null;
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                connection = null;
                super.close();
            }
        }
    }

    /**
     * Inserts one record; called once for every incoming element.
     *
     * @param value   record whose name and age are written
     * @param context sink context (unused)
     * @throws Exception if binding the parameters or executing the insert fails
     */
    @Override
    public void invoke(clickDemo value, Context context) throws Exception {
        // The "day" column receives the current time, not a value taken from the record.
        ps.setDate(1, new Date(System.currentTimeMillis()));
        ps.setString(2, value.getName());
        ps.setInt(3, value.getAge());
        ps.executeUpdate();
    }

    /**
     * Loads the ClickHouse driver and opens a connection.
     *
     * @param url  JDBC URL; null or empty falls back to {@link #DEFAULT_URL}
     * @param user user name; null or empty opens the connection without credentials
     * @param pass password used together with {@code user}
     * @return an open connection, never null
     * @throws SQLException if the driver class is missing or the connection attempt fails
     */
    private static Connection getConnection(String url, String user, String pass) throws SQLException {
        try {
            Class.forName(DRIVER_CLASS);
        } catch (ClassNotFoundException e) {
            // Fail here with the cause attached instead of returning null and
            // letting the caller hit a NullPointerException later.
            throw new SQLException("ClickHouse JDBC driver not found on classpath: " + DRIVER_CLASS, e);
        }
        String target = (url == null || url.isEmpty()) ? DEFAULT_URL : url;
        if (user == null || user.isEmpty()) {
            return DriverManager.getConnection(target);
        }
        return DriverManager.getConnection(target, user, pass);
    }

    /**
     * Manual ClickHouse smoke test: lists the databases, then prints every row of
     * {@code jdbc_example}.
     *
     * <p>The table is expected to exist already, e.g.
     * {@code create table jdbc_example(day Date, name String, age UInt8) Engine=Log}.
     * Rows can be added with a prepared
     * {@code insert into jdbc_example values(?, ?, ?)} using addBatch()/executeBatch().
     *
     * @param args unused
     * @throws SQLException if connecting or querying fails
     */
    public static void main(String[] args) throws SQLException {
        try (Connection conn = getConnection("jdbc:clickhouse://127.0.0.1:9000", "", "");
             Statement statement = conn.createStatement()) {
            statement.execute("show databases");
            String sql = "select * from jdbc_example";
            try (ResultSet rs = statement.executeQuery(sql)) {
                while (rs.next()) {
                    System.out.println(rs.getDate(1) + ", " + rs.getString(2) + ", " + rs.getInt(3));
                }
            }
        }
    }
}
/**
 * Record type consumed by {@code ClickHourseSink}.
 *
 * <p>No accessors or constructors are declared here; the {@code getName()} and
 * {@code getAge()} calls made by the sink are expected to be generated by the
 * annotations below (presumably Lombok - confirm against the project's imports).
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class clickDemo {
// Bound by the sink as the third insert parameter via setInt(3, getAge()).
// Boxed type: presumably a null here fails when unboxed - verify callers always set it.
private Integer age;
// Bound by the sink as the second insert parameter via setString(2, getName()).
private String name;
// Not read by the sink's invoke(), which writes the current time into the "day" column.
// NOTE(review): whether this is java.sql.Date or java.util.Date depends on imports not shown here.
private Date day;
}
Maven 依赖(以下两个驱动任选其一):
<!-- 官方驱动(基于 HTTP 协议),默认端口 8123 -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2</version>
</dependency>
<!-- 第三方驱动(基于原生 TCP 协议),默认端口 9000 -->
<dependency>
<groupId>com.github.housepower</groupId>
<artifactId>clickhouse-native-jdbc</artifactId>
<version>2.0-stable</version>
</dependency>