Flink + Kafka + Doris hands-on demo

Environment:

  • Flink 1.12
  • Doris 0.12
  • Kafka 1.0.1+kafka3.1.1

1: Compile Doris

Build with Docker as described in the official guide: https://github.com/apache/incubator-doris/wiki/Doris-Install

1.1 Things to note

The repository URLs in fe/pom.xml need to be updated:

  • cloudera-thirdparty (https://repository.cloudera.com/content/repositories/third-party/) changes to cloudera-public (https://repository.cloudera.com/artifactory/public/)
  • cloudera-plugins (https://repository.cloudera.com/content/groups/public/) changes to cloudera-public (https://repository.cloudera.com/artifactory/public/)
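For reference, a sketch of what each updated repository entry in fe/pom.xml might look like (the id and URL simply follow the replacement described above):

<repository>
    <id>cloudera-public</id>
    <url>https://repository.cloudera.com/artifactory/public/</url>
</repository>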

1.2 Build

With the repositories fixed, the build runs straight through.
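A minimal sketch of the build step, assuming the source tree is mounted into the official build container at /root/incubator-doris (the mount point is an assumption):

# inside the build container, from the Doris source root
cd /root/incubator-doris
sh build.sh
# build artifacts land under output/ (the fe and be packages)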

1.3 If you would rather skip the build, prebuilt packages (0.12 and 0.11) are available:

https://download.csdn.net/download/leng91060404/16661347
https://download.csdn.net/download/leng91060404/16655427

2: Deploy Doris (single-node test)

Reference: https://github.com/apache/incubator-doris/wiki/Doris-Install#3-%E9%83%A8%E7%BD%B2

2.1 Notes

FE:

Create a doris-meta directory under the fe directory (the default meta_dir).

Adjust priority_networks to a CIDR that matches the node's IP, if needed.

BE:

Set storage_root_path = /home/disk1/doris;/home/disk2/doris and create those directories.

Adjust priority_networks the same way as for the FE.
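A minimal sketch of the relevant config lines (the network range and disk paths below are placeholders; adjust them to your machine):

# fe/conf/fe.conf
priority_networks = 172.16.0.0/16

# be/conf/be.conf
priority_networks = 172.16.0.0/16
storage_root_path = /home/disk1/doris;/home/disk2/doris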

2.2 Startup

If anything goes wrong, check the logs first and work from there.
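The usual start commands, assuming the default layout of the deployment packages (logs end up under fe/log and be/log):

# start FE
cd fe && sh bin/start_fe.sh --daemon

# start BE
cd be && sh bin/start_be.sh --daemon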

2.3 Check cluster status

A BE node must first be registered with the FE before it can join the cluster.

Connect to the FE with mysql-client: ./mysql-client -h host -P port -uroot, where host is the FE node's IP and port is query_port from fe/conf/fe.conf. By default you log in as root with no password.
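Registering the BE is done from that mysql-client session; the host below is a placeholder, and the port must match heartbeat_service_port in be.conf (9050 by default):

ALTER SYSTEM ADD BACKEND "be_host:9050";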

FE status:
Log in to the master FE with a MySQL client and run:
SHOW PROC '/frontends';
to list the current FE nodes.
The web UI works too: http://fe_hostname:fe_http_port/frontend or http://fe_hostname:fe_http_port/system?path=//frontends.
mysql> SHOW PROC '/frontends';
+--------------------------------+-------------+-----------+-------------+----------+-----------+---------+----------+----------+-----------+------+-------+-------------------+---------------------+----------+--------+
| Name                           | IP          | HostName  | EditLogPort | HttpPort | QueryPort | RpcPort | Role     | IsMaster | ClusterId | Join | Alive | ReplayedJournalId | LastHeartbeat       | IsHelper | ErrMsg |
+--------------------------------+-------------+-----------+-------------+----------+-----------+---------+----------+----------+-----------+------+-------+-------------------+---------------------+----------+--------+
| 172.16.3.76_9011_1618481783037 | xxxxxxxxx | xxxxxxxx | 9011        | 8030     | 9030      | 9020    | FOLLOWER | true     | 502881269 | true | true  | 24505             | 2021-04-16 16:03:57 | true     |        |
+--------------------------------+-------------+-----------+-------------+----------+-----------+---------+----------+----------+-----------+------+-------+-------------------+---------------------+----------+--------+
1 row in set (0.02 sec)
BE status:
Log in to the leader FE with mysql-client and run:
SHOW PROC '/backends';
to list the current BE nodes.
The web UI works too: http://fe_hostname:fe_http_port/backend or http://fe_hostname:fe_http_port/system?path=//backends.
mysql> SHOW PROC '/backends';
+-----------+-----------------+-------------+-----------+---------------+--------+----------+----------+---------------------+---------------------+-------+----------------------+-----------------------+-----------+------------------+---------------+---------------+---------+----------------+--------+---------------------+
| BackendId | Cluster         | IP          | HostName  | HeartbeatPort | BePort | HttpPort | BrpcPort | LastStartTime       | LastHeartbeat       | Alive | SystemDecommissioned | ClusterDecommissioned | TabletNum | DataUsedCapacity | AvailCapacity | TotalCapacity | UsedPct | MaxDiskUsedPct | ErrMsg | Version             |
+-----------+-----------------+-------------+-----------+---------------+--------+----------+----------+---------------------+---------------------+-------+----------------------+-----------------------+-----------+------------------+---------------+---------------+---------+----------------+--------+---------------------+
| 10002     | default_cluster | xxxxxxxx | xxxxxx | 9050          | 9060   | 8040     | 8060     | 2021-04-15 18:17:38 | 2021-04-16 16:02:57 | true  | false                | false                 | 13        | 55.108 KB        | 64.740 GB     | 165.915 GB    | 60.98 % | 60.98 %        |        | 0.12.0-rc03-Unknown |
+-----------+-----------------+-------------+-----------+---------------+--------+----------+----------+---------------------+---------------------+-------+----------------------+-----------------------+-----------+------------------+---------------+---------------+---------+----------------+--------+---------------------+
1 row in set (0.00 sec)

3: Flink job (source: Kafka, transform: Flink, sink: Kafka)

3.1 Data entity class

import java.sql.Timestamp;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Item {

    private int id;
    private String name;
    private int counts;
    private Timestamp ts;

    // hand-rolled JSON so the producer can send the record as a plain string
    @Override
    public String toString() {
        return "{" +
                "\"id\":" + "\"" + id + "\"" +
                ",\"name\":" + "\"" + name + "\"" +
                ",\"counts\":" + "\"" + counts + "\"" +
                ",\"ts\":" + "\"" + ts + "\"" +
                "}";
    }
}

3.2 Producing mock data into the source Kafka topic

{
        // loop forever, producing one random Item per cycle
        while (true) {
            Item item = generateItem();
            long startTime = System.currentTimeMillis();

            if (isAsync) {
                // async send: delivery result is reported through the callback
                producer.send(new ProducerRecord<>(topic, item.toString()), new DemoCallBack(startTime, item.toString()));
                System.out.println("Sent message: (" + item.toString() + ")");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                try {
                    // sync send: block until acknowledged; send the JSON string, not the raw Item object
                    producer.send(new ProducerRecord<>(topic, item.toString())).get();
                    System.out.println("Sent message: (" + item.toString() + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }
    }
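generateItem() comes from the demo project; a minimal sketch of what it could look like (the name pool and value ranges are guesses inferred from the sample data below):

    // hypothetical generator; needs java.util.Random and java.sql.Timestamp
    private static final String[] NAMES = {"HAT", "TIE", "SHOE"};
    private static final Random RANDOM = new Random();

    private static Item generateItem() {
        int id = RANDOM.nextInt(100000);                                       // e.g. 41219
        String name = NAMES[RANDOM.nextInt(NAMES.length)] + RANDOM.nextInt(10); // e.g. "HAT9"
        int counts = RANDOM.nextInt(100000);
        return new Item(id, name, counts, new Timestamp(System.currentTimeMillis()));
    }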

Sample data:

{"id":"75392","name":"TIE4","counts":"77098","ts":"2021-04-16 11:40:48.423"}
{"id":"40904","name":"TIE3","counts":"26613","ts":"2021-04-16 11:40:52.436"}
{"id":"18363","name":"SHOE4","counts":"56329","ts":"2021-04-16 11:40:56.446"}
{"id":"17240","name":"TIE4","counts":"32312","ts":"2021-04-16 11:41:00.461"}

3.3 Running the Flink pipeline (source + transform + sink)

Source table DDL:

public static final String KAFKA_SQL_EVEN_CREATE = "CREATE TABLE even (\n" +
            "  `id` INTEGER,\n" +
            "  `name` STRING,\n" +
            "  `counts` INTEGER,\n" +
            "  `ts` TIMESTAMP(3)," +
            " WATERMARK FOR ts as ts - INTERVAL '5' SECOND \n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'flink_sql_table_test_datas',\n" +
            "  'properties.bootstrap.servers' = 'xxxxxxxxxxxxx:9092',\n" +
            "  'properties.group.id' = 'testGroup',\n" +
            "  'scan.startup.mode' = 'earliest-offset',\n" +
            "  'format' = 'json'\n" +
            ")";

Windowed aggregation query:

// TUMBLE_START is projected as ts here, because the join below selects $("ts")
public static final String QUERY_EVEN_AGG_SQL =
            "SELECT\n"
                    + "  id as even_id, name as even_name,\n"
                    + "  COUNT(counts) even_cnt,\n"
                    + "  TUMBLE_START(ts, INTERVAL '20' SECOND) as ts \n"
                    + "FROM even \n"
                    + "GROUP BY TUMBLE(ts, INTERVAL '20' SECOND), id, name";
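KAFKA_SQL_ODD_CREATE and QUERY_ODD_AGG_SQL are used below but never shown in the post; presumably they mirror the even-side definitions on a second topic. A hypothetical sketch of the odd-side query (it deliberately does not project ts, since the join below takes ts from the even side and a duplicate column name would make the join ambiguous):

public static final String QUERY_ODD_AGG_SQL =
            "SELECT\n"
                    + "  id as odd_id, name as odd_name,\n"
                    + "  COUNT(counts) odd_cnt \n"
                    + "FROM odd \n"
                    + "GROUP BY TUMBLE(ts, INTERVAL '20' SECOND), id, name";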

Job code:

{
        // set up execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        //create table op
        tableEnv.executeSql(KAFKA_SQL_EVEN_CREATE);
        tableEnv.executeSql(KAFKA_SQL_ODD_CREATE);

        // windows aggregate count op
        Table resultEven = tableEnv.sqlQuery(QUERY_EVEN_AGG_SQL);
        resultEven.printSchema();

        Table resultOdd = tableEnv.sqlQuery(QUERY_ODD_AGG_SQL);
        resultOdd.printSchema();

        //sql join op
        Table joinTable = resultEven.join(resultOdd)
                .where(
                        $("even_name").isEqual($("odd_name")))
                .select(
                        $("even_id").as("id"),
                        $("even_name").as("name"),
                        $("even_cnt").as("counts"),
                        $("ts"));

        joinTable.printSchema();

        // the windowed aggregate + inner join is append-only, so toAppendStream is safe here
        DataStream<ItemSink> sinkResultStream = tableEnv.toAppendStream(joinTable, ItemSink.class);

        // Kafka producer config; EXACTLY_ONCE uses Kafka transactions, so the producer's
        // transaction timeout must stay within the broker's transaction.max.timeout.ms
        // (the 15-minute value below is an assumption; align it with your broker config)
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put("client.id", KafkaProperties.CLIENT_ID);
        props.put("transaction.timeout.ms", "900000");

        FlinkKafkaProducer<ItemSink> myProducer = new FlinkKafkaProducer<>(
                KafkaProperties.TOPIC_SINK,
                new ProducerStringSerializationSchema(KafkaProperties.TOPIC_SINK),
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        sinkResultStream.addSink(myProducer);

        //run
        env.execute("Streaming Window SQL Job");

    }
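ProducerStringSerializationSchema is a helper class from the demo project; a plausible minimal implementation, assuming it just writes each element's toString() as the record value with no key:

import java.nio.charset.StandardCharsets;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// hypothetical implementation; the real class lives in the demo repo
public class ProducerStringSerializationSchema implements KafkaSerializationSchema<ItemSink> {

    private final String topic;

    public ProducerStringSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(ItemSink element, Long timestamp) {
        // key-less record; the payload string comes from ItemSink.toString()
        return new ProducerRecord<>(topic, element.toString().getBytes(StandardCharsets.UTF_8));
    }
}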

Data arriving in the sink Kafka topic:

41219,HAT9,1,2021-04-16 11:32:08.524
41219,HAT9,1,2021-04-16 11:33:30.846
41219,HAT9,1,2021-04-16 11:40:36.387
41219,HAT9,1,2021-04-14 19:21:46.738

4: Load from Kafka into Doris (the Flink job above already sinks to Kafka)

4.1 Create the database

CREATE DATABASE example_db;

4.2 Create the table

CREATE TABLE item
(
    id INTEGER,
    name VARCHAR(256) DEFAULT '',
    ts DATETIME,
    counts BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(id, name, ts)
DISTRIBUTED BY HASH(id) BUCKETS 3
PROPERTIES("replication_num" = "1");
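Since the table uses the AGGREGATE KEY model with counts declared as a SUM column, rows loaded with the same (id, name, ts) key are merged and their counts added up. A hypothetical illustration:

-- two loaded rows sharing the aggregate key (41219, 'HAT9', '2021-04-16 11:32:08') ...
--   counts = 1
--   counts = 2
-- ... are stored and returned as a single row with counts = 3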

4.3 Create the Kafka-to-Doris routine load job

CREATE ROUTINE LOAD example_db.task1 ON item
        COLUMNS TERMINATED BY ",",
        COLUMNS(id, name, counts, ts)
        PROPERTIES
        (
            "desired_concurrent_number"="1",
            "max_batch_interval" = "20",
            "max_batch_rows" = "300000",
            "max_batch_size" = "209715200",
            "strict_mode" = "false"
        )
        FROM KAFKA
        (
            "kafka_broker_list" = "xxxxxxx:9092",
            "kafka_topic" = "sinkTopic",
            "property.group.id" = "1234",
            "property.client.id" = "12345",
             "kafka_partitions" = "0,1,2",
            "kafka_offsets" = "0,0,0"
        );

4.4 Inspect the job

SHOW ROUTINE LOAD FOR example_db.task1;

(The session captured below came from a run where the database was named flink_kafka_db and the sink topic was flink_sql_table_test_sink; the commands are otherwise the same.)
mysql> SHOW ROUTINE LOAD FOR flink_kafka_db.task1;
+-------+-------+---------------------+-----------+---------+--------------------------------+-----------+---------+----------------+----------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------+-----------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------+----------------------+--------------+----------+
| Id    | Name  | CreateTime          | PauseTime | EndTime | DbName                         | TableName | State   | DataSourceType | CurrentTaskNum | JobProperties                                                                                                                                                                                                                         | DataSourceProperties                                                                                    | CustomProperties                        | Statistic                                                                                                                                                                                                     | Progress                           | ReasonOfStateChanged | ErrorLogUrls | OtherMsg |
+-------+-------+---------------------+-----------+---------+--------------------------------+-----------+---------+----------------+----------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------+-----------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------+----------------------+--------------+----------+
| 10037 | task1 | 2021-04-16 11:27:39 | N/A       | N/A     | default_cluster:flink_kafka_db | item      | RUNNING | KAFKA          | 1              | {"partitions":"*","columnToColumnExpr":"id,name,counts,ts","maxBatchIntervalS":"20","whereExpr":"*","maxBatchSizeBytes":"209715200","columnSeparator":"','","maxErrorNum":"0","currentTaskConcurrentNum":"1","maxBatchRows":"300000"} | {"topic":"flink_sql_table_test_sink","currentKafkaPartitions":"0,1,2","brokerList":"172.16.2.148:9092"} | {"group.id":"1234","client.id":"12345"} | {"receivedBytes":726452,"errorRows":0,"committedTaskNum":159,"loadedRows":20118,"loadRowsRate":0,"abortedTaskNum":629,"totalRows":20118,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":3247070} | {"0":"6059","1":"6915","2":"7141"} |                      |              |          |
+-------+-------+---------------------+-----------+---------+--------------------------------+-----------+---------+----------------+----------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------+-----------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------+----------------------+--------------+----------+
1 row in set (0.00 sec)
SHOW ROUTINE LOAD TASK WHERE JobName = "task1";
mysql> SHOW ROUTINE LOAD TASK WHERE JobName = "task1";
+-----------------------------------+-------+-----------+-------+---------------------+---------------------+---------+-------+------------------------------+
| TaskId                            | TxnId | TxnStatus | JobId | CreateTime          | ExecuteStartTime    | Timeout | BeId  | DataSourceProperties         |
+-----------------------------------+-------+-----------+-------+---------------------+---------------------+---------+-------+------------------------------+
| 56543bac022d4fdb-bb5944617d6480ac | 788   | UNKNOWN   | 10037 | 2021-04-16 15:56:50 | 2021-04-16 15:56:50 | 40      | 10002 | {"0":6060,"1":6916,"2":7142} |
+-----------------------------------+-------+-----------+-------+---------------------+---------------------+---------+-------+------------------------------+
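If the job needs maintenance, routine load jobs can be paused, resumed, or stopped with the standard Doris commands. Note that STOP is final; a stopped job cannot be resumed:

PAUSE ROUTINE LOAD FOR example_db.task1;
RESUME ROUTINE LOAD FOR example_db.task1;
STOP ROUTINE LOAD FOR example_db.task1;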

4.5 Query the data with a MySQL client

./mysql-client -h host -P port -uroot
mysql> select * from item order by id desc limit 10;
+-------+-------+---------------------+--------+
| id    | name  | ts                  | counts |
+-------+-------+---------------------+--------+
| 99995 | SHOE2 | 2021-04-16 11:34:45 |      3 |
| 99995 | SHOE2 | 2021-04-15 19:05:59 |      4 |
| 99995 | SHOE2 | 2021-04-16 11:35:59 |      3 |
| 99995 | SHOE2 | 2021-04-16 11:35:05 |      3 |
| 99995 | SHOE2 | 2021-04-14 19:22:06 |      4 |
| 99995 | SHOE2 | 2021-04-16 11:35:57 |      3 |
| 99995 | SHOE2 | 2021-04-16 11:38:45 |      3 |
| 99995 | SHOE2 | 2021-04-16 11:38:11 |      3 |
| 99995 | SHOE2 | 2021-04-14 19:22:24 |      4 |
| 99995 | SHOE2 | 2021-04-14 19:21:12 |      4 |
+-------+-------+---------------------+--------+
10 rows in set (0.01 sec)

5: Demo code reference

https://gitee.com/suntyu_admin/flink-kafka-doris
