1. Processing flow:
Flink reads the data from Kafka and then sinks it into HBase.
Data structure:
{"address":"深圳","age":20,"createTime":"2021-12-08 22:30","id":1,"name":"hdfs"}
2. Create the HBase table
hbase(main):002:0> create 'wudluser','cf', { NUMREGIONS => 15, SPLITALGO =>'HexStringSplit'}
Created table wudluser
Took 3.1878 seconds
=> Hbase::Table - wudluser
hbase(main):003:0>
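The same pre-split table can also be created from Java through the HBase Admin API. The following is only a sketch for reference (the class name is made up, and it reuses the ZooKeeper address configured later in HbaseUtils); the shell command above is all the rest of this post needs.
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.RegionSplitter;

public class CreateWudluserTable {
    public static void main(String[] args) throws Exception {
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.1.161");
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName table = TableName.valueOf("wudluser");
            if (!admin.tableExists(table)) {
                TableDescriptor desc = TableDescriptorBuilder.newBuilder(table)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
                        .build();
                // 15 regions with HexStringSplit, matching NUMREGIONS / SPLITALGO in the shell command
                byte[][] splits = new RegionSplitter.HexStringSplit().split(15);
                admin.createTable(desc, splits);
            }
        }
    }
}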
3. Kafka commands
./kafka-topics.sh --zookeeper 192.168.1.161:2181 --create --topic wudltopic --replication-factor 1 --partitions 1
./kafka-console-producer.sh --broker-list 192.168.1.161:6667 --topic wudltopic
./kafka-console-consumer.sh --bootstrap-server 192.168.1.161:6667 --topic wudltopic
4. Project structure:
Main class: HbaseSink
4.1 pom file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- <parent>-->
<!-- <artifactId>Flink-learning</artifactId>-->
<!-- <groupId>com.wudl.flink</groupId>-->
<!-- <version>1.0-SNAPSHOT</version>-->
<!-- </parent>-->
<groupId>org.wudlflink13</groupId>
<version>1.0-SNAPSHOT</version>
<modelVersion>4.0.0</modelVersion>
<artifactId>wudl-flink-13</artifactId>
<!-- Repository locations: aliyun, apache, cloudera and spring-plugin -->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>apache</id>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>spring-plugin</id>
<url>https://repo.spring.io/plugins-release/</url>
</repository>
</repositories>
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.11</scala.version>
<flink.version>1.13.3</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>1.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-2.2_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.10.3</version>
</dependency>
<!-- Flink core / Scala dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Blink planner, the default since Flink 1.11 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink connectors -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
<exclusions>
<exclusion>
<artifactId>flink-streaming-java_2.11</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-runtime_2.11</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<!-- <exclusion>-->
<!-- <artifactId>flink-core</artifactId>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- </exclusion>-->
<exclusion>
<artifactId>flink-java</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>2.1.0</version>
<exclusions>
<exclusion>
<artifactId>hadoop-hdfs</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.1.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-10.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
<!--<version>8.0.20</version>-->
</dependency>
<!-- High-performance async toolkit: Vert.x -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-jdbc-client</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-redis-client</artifactId>
<version>3.9.0</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.3</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hbase_2.11</artifactId>
<version>1.10.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
4.2 User class
package com.wudl.flink.hbase.model;
import com.alibaba.fastjson.JSON;
import lombok.Data;
/**
* @author :wudl
* @date :Created in 2021-12-08 22:30
* @description: {"address":"深圳","age":20,"createTime":"2021-12-08 22:30","id":1,"name":"hdfs"}
* @modified By:
* @version: 1.0
*/
@Data
public class User {
private Long id;
private String name;
private int age;
private String address;
private String createTime;
public static void main(String[] args) {
User u = new User();
u.setId(1L);
u.setName("hdfs");
u.setAge(20);
u.setAddress("深圳");
u.setCreateTime("2021-12-08 22:30");
String s = JSON.toJSONString(u);
System.out.println(s);
}
}
4.3 Custom Flink HBase sink: HbaseSink
package com.wudl.flink.hbase.slink;
import com.wudl.flink.hbase.model.User;
import com.wudl.flink.hbase.utils.DateUtils;
import com.wudl.flink.hbase.utils.HbaseUtils;
import com.wudl.flink.hbase.utils.RowKeyUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.ParseException;
/**
* @author :wudl
* @date :Created in 2021-12-08 21:43
* @description:
* @modified By:
* @version: 1.0
*/
public class HbaseSink extends RichSinkFunction<User> {
private final static Logger logger = LoggerFactory.getLogger(HbaseSink.class);
Connection conn = null;
private String tableName = "wudluser";
private BufferedMutator mutator = null;
@Override
public void open(Configuration parameters) throws Exception {
//Called once per sink subtask: set up the HBase connection and the write buffer
super.open(parameters);
org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
//Configure how large the write buffer may grow and how often it is flushed to HBase;
//buffered writes to HBase work much like producer-side batching in Kafka
BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName));
//Flush once the buffer reaches 10 MB
params.writeBufferSize(10 * 1024 * 1024L);
//Flush periodically every 5 s
params.setWriteBufferPeriodicFlushTimeoutMs(5 * 1000L);
conn = HbaseUtils.getConnect();
try {
mutator = conn.getBufferedMutator(params);
} catch (IOException e) {
logger.error("Failed to obtain the BufferedMutator: " + e.getMessage());
}
}
//5. Override invoke: write each incoming record to HBase
@Override
public void invoke(User value, Context context) throws Exception {
//5.1 setDataSourcePut turns the incoming value into a Put
try {
Put put = setDataSourcePut(value);
// Puts could also be collected and written in batches:
// List<Put> listPut = new ArrayList<>();
mutator.mutate(put);
//5.2 Force-flush the buffered data to HBase
mutator.flush();
} catch (Exception ex) {
ex.printStackTrace();
logger.error("写入到hbase失败:" + ex.getMessage());
}
}
//4. Override close
@Override
public void close() throws Exception {
//4.1 Release the HBase mutator and connection
HbaseUtils.close(mutator, conn);
}
//6. Implement setDataSourcePut
/**
* Build one Put per record:
* 1. table name 2. rowkey 3. column family 4. column names and values
*
* @param user
* @return
*/
private Put setDataSourcePut(User user) throws ParseException {
byte[] rowKey = RowKeyUtils.getRowkey(user);
String cf = "cf";
Put put = new Put(rowKey);
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("id"), Bytes.toBytes(String.valueOf(user.getId())));
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("name"), Bytes.toBytes(user.getName()));
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("age"), Bytes.toBytes(String.valueOf(user.getAge())));
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("address"), Bytes.toBytes(user.getAddress()));
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("createTime"), Bytes.toBytes(user.getCreateTime()));
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("updateDate"), Bytes.toBytes(DateUtils.getCurrentTime()));
return put;
}
}
4.4 Date utility class: DateUtils
package com.wudl.flink.hbase.utils;
import org.apache.commons.lang3.StringUtils;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
/**
* @author :wudl
* @date :Created in 2021-12-08 23:16
* @description:
* @modified By:
* @version: 1.0
*/
public class DateUtils {
/**
* Current date-time, formatted as yyyy-MM-dd HH:mm:ss
*
* @return
*/
public static String getCurrentTime() {
// lowercase hh is the 12-hour clock, uppercase HH the 24-hour clock
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = new Date();
return df.format(date);
}
/**
* Current system timestamp in milliseconds
*
* @return 1566889186583
*/
public static String getSystemTime() {
String current = String.valueOf(System.currentTimeMillis());
return current;
}
/**
* Current date, formatted as yyyy-MM-dd
*
* @return 2021-08-27
*/
public static String getDateByString() {
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
return sdf.format(date);
}
/**
* Difference in milliseconds between two yyyy-MM-dd HH:mm:ss timestamps
*
* @param start 2021-06-27 14:12:40
* @param end 2021-08-27 14:12:40
* @return 5270400000
*/
public static long dateSubtraction(String start, String end) {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
Date date1 = df.parse(start);
Date date2 = df.parse(end);
return date2.getTime() - date1.getTime();
} catch (ParseException e) {
e.printStackTrace();
return 0;
}
}
/**
* Difference in milliseconds between two Dates
*
* @param start start time
* @param end   end time
* @return
*/
public static long dateTogether(Date start, Date end) {
return end.getTime() - start.getTime();
}
/**
* Convert an epoch-millisecond value into a yyyy-MM-dd HH:mm:ss.SSS date string
*
* @param millSec epoch milliseconds as a string, e.g. 5270400000
* @return the date formatted as yyyy-MM-dd HH:mm:ss.SSS, e.g. 1970-03-03 08:00:00.000
*/
public static String transferLongToDate(String millSec) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
Date date = new Date(Long.parseLong(millSec));
return sdf.format(date);
}
/**
* Reformat an English-locale date string (EEE MMM dd HH:mm:ss Z yyyy) as yyyy-MM-dd HH:mm:ss
*
* @return
*/
public static String getOkDate(String date) {
try {
if (StringUtils.isEmpty(date)) {
return null;
}
Date date1 = new SimpleDateFormat("EEE MMM dd HH:mm:ss Z yyyy", Locale.ENGLISH).parse(date);
//reformat
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.format(date1);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* Day of the week for the current date (Sunday = 0, Monday = 1, ...)
*
* @return 2
*/
public static int getDayOfWeek() {
Calendar cal = Calendar.getInstance();
cal.setTime(new Date());
return cal.get(Calendar.DAY_OF_WEEK) - 1;
}
/**
* Check whether nowTime falls inside the [startTime, endTime] interval; the date formats must match
*
* @param nowTime     the time to check
* @param dateSection the interval as "start,end", e.g. 2021-01-08,2021-09-09
* @return
* @author jqlin
*/
public static boolean isEffectiveDate(Date nowTime, String dateSection) {
try {
String[] times = dateSection.split(",");
String format = "yyyy-MM-dd";
Date startTime = new SimpleDateFormat(format).parse(times[0]);
Date endTime = new SimpleDateFormat(format).parse(times[1]);
if (nowTime.getTime() == startTime.getTime()
|| nowTime.getTime() == endTime.getTime()) {
return true;
}
Calendar date = Calendar.getInstance();
date.setTime(nowTime);
Calendar begin = Calendar.getInstance();
begin.setTime(startTime);
Calendar end = Calendar.getInstance();
end.setTime(endTime);
if (isSameDay(date, begin) || isSameDay(date, end)) {
return true;
}
if (date.after(begin) && date.before(end)) {
return true;
} else {
return false;
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public static boolean isSameDay(Calendar cal1, Calendar cal2) {
if (cal1 != null && cal2 != null) {
return cal1.get(Calendar.ERA) == cal2.get(Calendar.ERA) && cal1.get(Calendar.YEAR) == cal2.get(Calendar.YEAR) && cal1.get(Calendar.DAY_OF_YEAR) == cal2.get(Calendar.DAY_OF_YEAR);
} else {
throw new IllegalArgumentException("The date must not be null");
}
}
public static long getTimeByDate(String time) {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
Date date = format.parse(time);
//date string to epoch milliseconds
return date.getTime();
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* Current hour, e.g. 2021-08-27 17
*
* @return 2021-08-27 17
*/
public static String getCurrentHour() {
GregorianCalendar calendar = new GregorianCalendar();
int hour = calendar.get(Calendar.HOUR_OF_DAY);
if (hour < 10) {
return DateUtils.getDateByString() + " 0" + hour;
}
return DateUtils.getDateByString() + " " + hour;
}
/**
* Hour string for one hour ago
* @return 2019-08-27 16
*/
public static String getCurrentHourBefore() {
GregorianCalendar calendar = new GregorianCalendar();
int hour = calendar.get(Calendar.HOUR_OF_DAY);
if (hour > 0) {
hour = calendar.get(Calendar.HOUR_OF_DAY) - 1;
if (hour < 10) {
return DateUtils.getDateByString() + " 0" + hour;
}
return DateUtils.getDateByString() + " " + hour;
}
//hour 0: fall back to 23:00 of the previous day
return DateUtils.getBeforeDay() + " " + 23;
}
/**
* Date of the previous day
*
* @return 2021-08-26
*/
public static String getBeforeDay() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Date date = new Date();
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
calendar.add(Calendar.DAY_OF_MONTH, -1);
date = calendar.getTime();
return sdf.format(date);
}
/**
* Date seven days ago
*
* @return 2021-08-20
*/
public static String getServen() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Calendar c = Calendar.getInstance();
c.add(Calendar.DATE, -7);
Date monday = c.getTime();
String preMonday = sdf.format(monday);
return preMonday;
}
/**
* Date one month ago
*
* @return 2021-07-27
*/
public static String getOneMonth() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Calendar c = Calendar.getInstance();
c.add(Calendar.MONTH, -1);
Date monday = c.getTime();
String preMonday = sdf.format(monday);
return preMonday;
}
/**
* Date three months ago
*
* @return 2021-05-27
*/
public static String getThreeMonth() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Calendar c = Calendar.getInstance();
c.add(Calendar.MONTH, -3);
Date monday = c.getTime();
String preMonday = sdf.format(monday);
return preMonday;
}
/**
* Date one year ago
*
* @return 2021-08-27
*/
public static String getOneYear() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Calendar c = Calendar.getInstance();
c.add(Calendar.YEAR, -1);
Date start = c.getTime();
String startDay = sdf.format(start);
return startDay;
}
private static int month = Calendar.getInstance().get(Calendar.MONTH) + 1;
/**
* Months of the current year so far
* Useful when a front end queries data month by month and the back end needs to return which months exist this year
*
* @return [1, 2, 3, 4, 5, 6, 7, 8]
*/
public static List<Integer> getMonthList() {
List<Integer> list = new ArrayList<>();
for (int i = 1; i <= month; i++) {
list.add(i);
}
return list;
}
/**
* Quarters of the current year so far
* e.g. three quarters so far this year; 1, 2 and 3 can then be mapped to their start/end dates
* @return [1, 2, 3]
*/
public static List<Integer> getQuartList() {
int quart = (month + 2) / 3;
List<Integer> list = new ArrayList<>();
for (int i = 1; i <= quart; i++) {
list.add(i);
}
return list;
}
public static void main(String[] args) {
System.out.println(DateUtils.getCurrentTime());
System.out.println("--"+System.currentTimeMillis());
}
}
4.5 HBase utility class: HbaseUtils
package com.wudl.flink.hbase.utils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import java.io.IOException;
/**
* @author :wudl
* @date :Created in 2021-12-08 23:57
* @description: HBase utility class
* @modified By:
* @version: 1.0
*/
public class HbaseUtils {
private static String tableName = "wudluser";
public static Connection getConnect() throws IOException {
Connection conn = null;
org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
conf.set(HConstants.ZOOKEEPER_QUORUM,"192.168.1.161");
conf.set(HConstants.ZOOKEEPER_CLIENT_PORT,"2181");
conf.set(TableInputFormat.INPUT_TABLE,tableName);
org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create(conf);
//create the connection through the connection factory
conn = ConnectionFactory.createConnection(hbaseConf);
return conn;
}
public static void close(BufferedMutator mutator,Connection conn) throws IOException {
if (mutator != null) {
mutator.close();
}
if (!conn.isClosed()) {
conn.close();
}
}
}
4.6 KafkaUtils utility class
package com.wudl.flink.hbase.utils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
/**
* @author :wudl
* @date :Created in 2021-12-08 22:20
* @description:
* @modified By:
* @version: 1.0
*/
public class KafkaUtils {
private static String brokers = "192.168.1.161:6667,192.168.1.162:6667,192.168.1.163:6667";
private static String default_topic = "wudltopic";
public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
return new FlinkKafkaProducer<String>(brokers,
topic,
new SimpleStringSchema());
}
public static <T> FlinkKafkaProducer<T> getKafkaProducer(KafkaSerializationSchema<T> kafkaSerializationSchema) {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
return new FlinkKafkaProducer<T>(default_topic,
kafkaSerializationSchema,
properties,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
Properties properties = new Properties();
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
return new FlinkKafkaConsumer<String>(topic,
new SimpleStringSchema(),
properties);
}
//Build the Kafka connector options for a Flink SQL WITH clause
public static String getKafkaDDL(String topic, String groupId) {
return " 'connector' = 'kafka', " +
" 'topic' = '" + topic + "'," +
" 'properties.bootstrap.servers' = '" + brokers + "', " +
" 'properties.group.id' = '" + groupId + "', " +
" 'format' = 'json', " +
" 'scan.startup.mode' = 'latest-offset' ";
}
}
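getKafkaDDL is only used on the Flink SQL side; the sketch below shows how it could be spliced into a CREATE TABLE statement. The table name and schema are made up for illustration, and the class is assumed to sit in the same package as KafkaUtils.
package com.wudl.flink.hbase.utils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaDdlExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Splice the shared Kafka options into a Flink SQL source table definition
        tableEnv.executeSql("CREATE TABLE kafka_user ( " +
                " id BIGINT, name STRING, age INT, address STRING, createTime STRING " +
                ") WITH ( " + KafkaUtils.getKafkaDDL("wudltopic", "2019") + " )");
        // Quick smoke test: print whatever arrives on the topic
        tableEnv.executeSql("SELECT * FROM kafka_user").print();
    }
}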
4.7 RowKeyUtils utility class
package com.wudl.flink.hbase.utils;
import com.alibaba.fastjson.JSON;
import com.wudl.flink.hbase.model.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;
import java.io.UnsupportedEncodingException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
/**
* @author :wudl
* @date :Created in 2021-12-08 23:05
* @description:
* @modified By:
* @version: 1.0
*/
public class RowKeyUtils {
public static byte[] getRowkey(User user) throws ParseException {
// Build the rowkey: md5-prefix + "_" + createTime as epoch milliseconds
StringBuilder stringBuilder = new StringBuilder(10);
// Convert createTime to an epoch-millisecond timestamp
// (HH/mm, not hh/MM: 24-hour clock and minutes)
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
stringBuilder.append(sdf.parse(user.getCreateTime()).getTime());
byte[] originKey = Bytes.toBytes(stringBuilder.toString());
// Prefix with the first 8 hex chars of the MD5 hash to spread rows across regions and keep the key short
String md5AsHex = MD5Hash.getMD5AsHex(originKey).substring(0, 8);
return Bytes.toBytes(md5AsHex + "_" + stringBuilder.toString());
}
}
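For a quick local check of the key format, a main method like the one below could be added to RowKeyUtils (a sketch only; the exact MD5 prefix depends on the parsed timestamp). The rowkeys in the scan output of section 6 follow the same <md5-prefix>_<epoch-millis> shape.
public static void main(String[] args) throws ParseException {
    User user = new User();
    user.setId(1L);
    user.setName("hdfs");
    user.setAge(20);
    user.setAddress("深圳");
    user.setCreateTime("2021-12-08 22:30");
    // Prints the generated key, e.g. "<8-char-md5>_<createTime as epoch millis>"
    System.out.println(Bytes.toString(getRowkey(user)));
}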
4.8 Main class: Application
package com.wudl.flink.hbase;
import com.alibaba.fastjson.JSON;
import com.wudl.flink.hbase.model.User;
import com.wudl.flink.hbase.slink.HbaseSink;
import com.wudl.flink.hbase.utils.KafkaUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.concurrent.TimeUnit;
/**
* @author :wudl
* @date :Created in 2021-12-08 22:18
* @description:
* @modified By:
* @version: 1.0
*/
public class Application {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
String sourceTopic = "wudltopic";
String groupId = "2019";
DataStreamSource<String> kafkaDs = env.addSource(KafkaUtils.getKafkaConsumer(sourceTopic, groupId));
DataStream<User> userStream = kafkaDs.map(new MapFunction<String, User>() {
@Override
public User map(String s) throws Exception {
User user = JSON.parseObject(s, User.class);
return user;
}
});
userStream.addSink(new HbaseSink());
env.setRestartStrategy(RestartStrategies.failureRateRestart(
// max number of failures within the measuring interval
3,
// length of the measuring interval
Time.of(2, TimeUnit.SECONDS),
// delay between restart attempts
Time.of(5, TimeUnit.SECONDS)
));
env.execute();
}
}
5. Insert test data into Kafka
{"address":"深圳","age":20,"createTime":"1638977618520","id":1,"name":"hdfs"}
{"address":"深圳2","age":21,"createTime":"2021-12-09 00:00:00","id":2,"name":"flink"}
{"address":"深圳5","age":24,"createTime":"2021-12-11 01:00:00","id":4,"name":"flink-sql03"}
6. Inspect the data in HBase
scan 'wudluser'
hbase(main):109:0> scan 'wudluser'
ROW COLUMN+CELL
201095ee_1607446800000 column=cf:address, timestamp=1638981031968, value=\xE6\xB7\xB1\xE5\x9C\xB35
201095ee_1607446800000 column=cf:age, timestamp=1638981031968, value=24
201095ee_1607446800000 column=cf:createTime, timestamp=1638981031968, value=2021-13-09 01:00:00
201095ee_1607446800000 column=cf:id, timestamp=1638981031968, value=4
201095ee_1607446800000 column=cf:name, timestamp=1638981031968, value=flink-sql03
201095ee_1607446800000 column=cf:updateDate, timestamp=1638981031968, value=2021-12-09 00:30:30
2a8bd0b1_1607533200000 column=cf:address, timestamp=1638981085750, value=\xE6\xB7\xB1\xE5\x9C\xB35
2a8bd0b1_1607533200000 column=cf:age, timestamp=1638981085750, value=24
2a8bd0b1_1607533200000 column=cf:createTime, timestamp=1638981085750, value=2021-12-10 01:00:00
2a8bd0b1_1607533200000 column=cf:id, timestamp=1638981085750, value=4
2a8bd0b1_1607533200000 column=cf:name, timestamp=1638981085750, value=flink-sql03
2a8bd0b1_1607533200000 column=cf:updateDate, timestamp=1638981085750, value=2021-12-09 00:31:24
7fe8fbc2_1607443200000 column=cf:address, timestamp=1638978975402, value=\xE6\xB7\xB1\xE5\x9C\xB32
7fe8fbc2_1607443200000 column=cf:age, timestamp=1638978975402, value=21
7fe8fbc2_1607443200000 column=cf:createTime, timestamp=1638978975402, value=2021-12-09 00:00:00
7fe8fbc2_1607443200000 column=cf:id, timestamp=1638978975402, value=2
7fe8fbc2_1607443200000 column=cf:name, timestamp=1638978975402, value=flink
7fe8fbc2_1607443200000 column=cf:updateDate, timestamp=1638978975402, value=2021-12-08 23:56:14
86532b5a_1607356800000 column=cf:address, timestamp=1638978130285, value=\xE6\xB7\xB1\xE5\x9C\xB3
86532b5a_1607356800000 column=cf:age, timestamp=1638978130285, value=\x00\x00\x00\x14
86532b5a_1607356800000 column=cf:createTime, timestamp=1638978130285, value=2021-12-08 00:00:00
86532b5a_1607356800000 column=cf:id, timestamp=1638978130285, value=\x00\x00\x00\x00\x00\x00\x00\x01
86532b5a_1607356800000 column=cf:name, timestamp=1638978130285, value=hdfs
86532b5a_1607356800000 column=cf:updateDate, timestamp=1638978130285, value=2021-12-08 23:42:08
a7fd7ded_1607619600000 column=cf:address, timestamp=1638981129207, value=\xE6\xB7\xB1\xE5\x9C\xB35
a7fd7ded_1607619600000 column=cf:age, timestamp=1638981129207, value=24
a7fd7ded_1607619600000 column=cf:createTime, timestamp=1638981129207, value=2021-12-11 01:00:00
a7fd7ded_1607619600000 column=cf:id, timestamp=1638981129207, value=4
a7fd7ded_1607619600000 column=cf:name, timestamp=1638981129207, value=flink-sql03
a7fd7ded_1607619600000 column=cf:updateDate, timestamp=1638981129207, value=2021-12-09 00:32:07
5 row(s)
Took 0.0306 seconds
hbase(main):110:0>
7. Reading HBase from code
A Java program for reading HBase: https://blog.csdn.net/wudonglianga/article/details/121722396
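For a quick check without a separate project, here is a minimal read-back sketch using the HBase Get API. It assumes the class sits next to HbaseUtils in com.wudl.flink.hbase.utils and uses a rowkey copied from the scan output in section 6.
package com.wudl.flink.hbase.utils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseReadExample {
    public static void main(String[] args) throws Exception {
        try (Connection conn = HbaseUtils.getConnect();
             Table table = conn.getTable(TableName.valueOf("wudluser"))) {
            // Rowkey copied from the scan output above
            Get get = new Get(Bytes.toBytes("7fe8fbc2_1607443200000"));
            Result result = table.get(get);
            byte[] cf = Bytes.toBytes("cf");
            String name = Bytes.toString(result.getValue(cf, Bytes.toBytes("name")));
            String age = Bytes.toString(result.getValue(cf, Bytes.toBytes("age")));
            System.out.println(name + ", " + age);
        }
    }
}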
8. Note that numeric values (such as int or long) need to be converted to String before being written to HBase, as shown below;
otherwise they are stored as raw bytes and come back garbled when read:
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("id"), Bytes.toBytes(String.valueOf(user.getId())));