Flink SQL Client综合实战

欢迎访问我的GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;

《Flink SQL Client初探》一文中,我们体验了Flink SQL Client的基本功能,今天来通过实战更深入学习和体验Flink SQL;

实战内容

本次实战主要是通过Flink SQL Client消费kafka的实时消息,再用各种SQL操作对数据进行查询统计,内容汇总如下:

  1. DDL创建Kafka表
  2. 窗口统计;
  3. 数据写入ElasticSearch
  4. 联表操作

版本信息

  1. Flink:1.10.0
  2. Flink所在操作系统:CentOS Linux release 7.7.1908
  3. JDK:1.8.0_211
  4. Kafka:2.4.0(scala:2.12)
  5. Mysql:5.7.29

数据源准备

  1. 本次实战用的数据,来源是阿里云天池公开数据集的一份淘宝用户行为数据集,获取方式请参考《准备数据集用于flink学习》
  2. 获取到数据集文件后转成kafka消息发出,这样我们使用Flink SQL时就按照实时消费kafka消息的方式来操作,具体的操作方式请参考《将CSV的数据发送到kafka》
  3. 上述操作完成后,一百零四万条淘宝用户行为数据就会通过kafka消息顺序发出,咱们的实战就有不间断实时数据可用 了,消息内容如下:
{"user_id":1004080,"item_id":2258662,"category_id":79451,"behavior":"pv","ts":"2017-11-24T23:47:47Z"}
{"user_id":100814,"item_id":5071478,"category_id":1107469,"behavior":"pv","ts":"2017-11-24T23:47:47Z"}
{"user_id":114321,"item_id":4306269,"category_id":4756105,"behavior":"pv","ts":"2017-11-24T23:47:48Z"}
  1. 上述消息中每个字段的含义如下表:
列名称 说明
用户ID 整数类型,序列化后的用户ID
商品ID 整数类型,序列化后的商品ID
商品类目ID 整数类型,序列化后的商品所属类目ID
行为类型 字符串,枚举类型,包括('pv', 'buy', 'cart', 'fav')
时间戳 行为发生的时间戳
时间字符串 根据时间戳字段生成的时间字符串

jar准备

实战过程中要用到下面这五个jar文件:

  1. flink-jdbc_2.11-1.10.0.jar
  2. flink-json-1.10.0.jar
  3. flink-sql-connector-elasticsearch6_2.11-1.10.0.jar
  4. flink-sql-connector-kafka_2.11-1.10.0.jar
  5. mysql-connector-java-5.1.48.jar

我已将这些文件打包上传到GitHub,下载地址:https://raw.githubusercontent.com/zq2599/blog_demos/master/files/sql_lib.zip

请在flink安装目录下新建文件夹<font color="blue">sql_lib</font>,然后将这五个jar文件放进去;

Elasticsearch准备

如果您装了docker和docker-compose,那么下面的命令可以快速部署elasticsearch和head工具:

wget https://raw.githubusercontent.com/zq2599/blog_demos/master/elasticsearch_docker_compose/docker-compose.yml && \
docker-compose up -d

准备完毕,开始操作吧;

DDL创建Kafka表

  1. 进入flink目录,启动flink:<font color="blue">bin/start-cluster.sh</font>
  2. 启动Flink SQL Client:<font color="blue">bin/sql-client.sh embedded -l sql_lib</font>
  3. 启动成功显示如下:
在这里插入图片描述
  1. 执行以下命令即可创建kafka表,请按照自己的信息调整参数:
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME(),   -- 处理时间列
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 在ts上定义watermark,ts成为事件时间列
) WITH (
    'connector.type' = 'kafka',  -- kafka connector
    'connector.version' = 'universal',  -- universal 支持 0.11 以上的版本
    'connector.topic' = 'user_behavior',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
    'connector.properties.zookeeper.connect' = '192.168.50.43:2181',  -- zk 地址
    'connector.properties.bootstrap.servers' = '192.168.50.43:9092',  -- broker 地址
    'format.type' = 'json'  -- 数据源格式为 json
);
  1. 执行<font color="blue">SELECT * FROM user_behavior;</font>看看原始数据,如果消息正常应该和下图类似:


    6.

窗口统计

  1. 下面的SQL是以每十分钟为窗口,统计每个窗口内的总浏览数,TUMBLE_START返回的数据格式是timestamp,这里再调用DATE_FORMAT函数将其格式化成了字符串:
SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL '10' MINUTE), 'yyyy-MM-dd hh:mm:ss'), 
DATE_FORMAT(TUMBLE_END(ts, INTERVAL '10' MINUTE), 'yyyy-MM-dd hh:mm:ss'), 
COUNT(*)
FROM user_behavior
WHERE behavior = 'pv'
GROUP BY TUMBLE(ts, INTERVAL '10' MINUTE);
  1. 得到数据如下所示:
在这里插入图片描述

数据写入ElasticSearch

  1. 确保elasticsearch已部署好;
  2. 执行以下语句即可创建es表,请按照您自己的es信息调整下面的参数:
CREATE TABLE pv_per_minute ( 
    start_time STRING,
    end_time STRING,
    pv_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch', -- 类型
    'connector.version' = '6',  -- elasticsearch版本
    'connector.hosts' = 'http://192.168.133.173:9200',  -- elasticsearch地址
    'connector.index' = 'pv_per_minute',  -- 索引名,相当于数据库表名
    'connector.document-type' = 'user_behavior', -- type,相当于数据库库名
    'connector.bulk-flush.max-actions' = '1',  -- 每条数据都刷新
    'format.type' = 'json',  -- 输出数据格式json
    'update-mode' = 'append'
);
  1. 执行以下语句,就会将每分钟的pv总数写入es的pv_per_minute索引:
INSERT INTO pv_per_minute
SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL '1' MINUTE), 'yyyy-MM-dd hh:mm:ss') AS start_time, 
DATE_FORMAT(TUMBLE_END(ts, INTERVAL '1' MINUTE), 'yyyy-MM-dd hh:mm:ss') AS end_time, 
COUNT(*) AS pv_cnt
FROM user_behavior
WHERE behavior = 'pv'
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);
  1. 用es-head查看,发现数据已成功写入:


    在这里插入图片描述

联表操作

  1. 当前user_behavior表的category_id表示商品类目,例如<font color="blue">11120</font>表示计算机书籍,<font color="blue">61626</font>表示牛仔裤,本次实战的数据集中,这样的类目共有五千多种;
  2. 如果我们将这五千多种类目分成6个大类,例如<font color="blue">11120</font>属于教育类,<font color="blue">61626</font>属于服装类,那么应该有个大类和类目的关系表;
  3. 这个大类和类目的关系表在MySQL创建,表名叫<font color="blue">category_info</font>,建表语句如下:
CREATE TABLE `category_info`(
   `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
   `parent_id` bigint ,
   `category_id` bigint ,
   PRIMARY KEY ( `id` )
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
  1. 表<font color="blue">category_info</font>所有数据来自对原始数据中<font color="blue">category_id</font>字段的提取,并且随机将它们划分为6个大类,该表的数据请在我的GitHub下载:https://raw.githubusercontent.com/zq2599/blog_demos/master/files/category_info.sql
  2. 请在MySQL上建表<font color="blue">category_info</font>,并将上述数据全部写进去;
  3. 在Flink SQL Client执行以下语句创建这个维表,mysql信息请按您自己配置调整:
CREATE TABLE category_info (
    parent_id BIGINT, -- 商品大类
    category_id BIGINT  -- 商品详细类目
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://192.168.50.43:3306/flinkdemo',
    'connector.table' = 'category_info',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = '123456',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);
  1. 尝试联表查询:
SELECT U.user_id, U.item_id, U.behavior, C.parent_id, C.category_id
FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.category_id;
  1. 如下图,联表查询成功,每条记录都能对应大类:


    在这里插入图片描述
  2. 再试试联表统计,每个大类的总浏览量:
SELECT C.parent_id, COUNT(*) AS pv_count
FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.category_id
WHERE behavior = 'pv'
GROUP BY C.parent_id;
  1. 如下图,数据是动态更新的:


    在这里插入图片描述
  2. 执行以下语句,可以在统计时将大类ID转成中文名:
SELECT CASE C.parent_id
    WHEN 1 THEN '服饰鞋包'
    WHEN 2 THEN '家装家饰'
    WHEN 3 THEN '家电'
    WHEN 4 THEN '美妆'
    WHEN 5 THEN '母婴'
    WHEN 6 THEN '3C数码'
    ELSE '其他'
  END AS category_name,
COUNT(*) AS pv_count
FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.category_id
WHERE behavior = 'pv'
GROUP BY C.parent_id;
  1. 效果如下图:
在这里插入图片描述

至此,我们借助Flink SQL Client体验了Flink SQL丰富的功能,如果您也在学习Flink SQL,希望本文能给您一些参考;

欢迎关注公众号:程序员欣宸

微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...
https://github.com/zq2599/blog_demos

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,036评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,046评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,411评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,622评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,661评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,521评论 1 304
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,288评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,200评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,644评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,837评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,953评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,673评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,281评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,889评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,011评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,119评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,901评论 2 355

推荐阅读更多精彩内容