Flink SQL Custom Sink

1. Background

Our team is building an internal Flink SQL platform. This article uses a custom Redis sink as an example to show how to implement a custom sink for Flink SQL and how to use it once it is written.
Based on Flink 1.11.

2. Steps

  1. Implement DynamicTableSinkFactory
  2. Implement DynamicTableSink
  3. Create the Redis sink function
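
One step the list above does not show: Flink 1.11 locates DynamicTableSinkFactory implementations via Java SPI, so the factory must also be registered in a service file on the classpath, otherwise the connector cannot be found at runtime. The package name below is hypothetical; use the factory's real package:

src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
    com.example.flink.connector.redis.RedisTableSinkFactory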

3. Custom sink code

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import redis.clients.jedis.JedisCluster;

import java.util.*;

import static org.apache.flink.configuration.ConfigOptions.key;


/**
 * @author shengjk1
 * @date 2020/10/16
 */
public class RedisTableSinkFactory implements DynamicTableSinkFactory {
    
    public static final String IDENTIFIER = "redis";
    
    public static final ConfigOption<String> HOST_PORT = key("hostPort")
            .stringType()
            .noDefaultValue()
            .withDescription("redis host and port,");
    
    public static final ConfigOption<String> PASSWORD = key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("redis password");
    
    public static final ConfigOption<Integer> EXPIRE_TIME = key("expireTime")
            .intType()
            .noDefaultValue()
            .withDescription("redis key expire time");
    
    public static final ConfigOption<String> KEY_TYPE = key("keyType")
            .stringType()
            .noDefaultValue()
            .withDescription("redis key type,such as hash,string and so on ");
    
    public static final ConfigOption<String> KEY_TEMPLATE = key("keyTemplate")
            .stringType()
            .noDefaultValue()
            .withDescription("redis key template ");
    
    public static final ConfigOption<String> FIELD_TEMPLATE = key("fieldTemplate")
            .stringType()
            .noDefaultValue()
            .withDescription("redis field template ");
    
    
    public static final ConfigOption<String> VALUE_NAMES = key("valueNames")
            .stringType()
            .noDefaultValue()
            .withDescription("redis value name ");
    
    @Override
    // the factory is found only when the 'connector' option in the DDL matches IDENTIFIER
    public String factoryIdentifier() {
        return IDENTIFIER;
    }
    
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return new HashSet<>();
    }
    
    @Override
    // all options we define ourselves (everything after WITH) are declared here
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOST_PORT);
        options.add(PASSWORD);
        options.add(EXPIRE_TIME);
        options.add(KEY_TYPE);
        options.add(KEY_TEMPLATE);
        options.add(FIELD_TEMPLATE);
        options.add(VALUE_NAMES);
        return options;
    }
    
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ReadableConfig options = helper.getOptions();
        return new RedisSink(
                context.getCatalogTable().getSchema().toPhysicalRowDataType(),
                options);
    }
    
    
    private static class RedisSink implements DynamicTableSink {
        
        private final DataType type;
        private final ReadableConfig options;
        
        private RedisSink(DataType type, ReadableConfig options) {
            this.type = type;
            this.options = options;
        }
        
        @Override
        // pass the planner's requested changelog mode straight through; a stricter
        // sink would declare only the RowKinds it actually handles
        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return requestedMode;
        }
        
        @Override
        // where the runtime implementation is provided: this is the point where the
        // user-defined streaming sink is wired in, bridging SQL and streaming
        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
            DataStructureConverter converter = context.createDataStructureConverter(type);
            return SinkFunctionProvider.of(new RowDataPrintFunction(converter, options, type));
        }
        
        @Override
        // a sink barely needs this; copy() mainly matters for sources,
        // e.g. for predicate push-down
        public DynamicTableSink copy() {
            return new RedisSink(type, options);
        }
        
        @Override
        public String asSummaryString() {
            return "redis";
        }
    }
    
    /**
     * Same as a custom sink in the Flink streaming API, except that here we
     * process RowData; no further explanation needed.
     */
    private static class RowDataPrintFunction extends RichSinkFunction<RowData> {
        
        private static final long serialVersionUID = 1L;
        
        private final DataStructureConverter converter;
        private final ReadableConfig options;
        private final DataType type;
        private RowType logicalType;
        private HashMap<String, Integer> fields;
        private JedisCluster jedisCluster;
        
        private RowDataPrintFunction(
                DataStructureConverter converter, ReadableConfig options, DataType type) {
            this.converter = converter;
            this.options = options;
            this.type = type;
        }
        
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            logicalType = (RowType) type.getLogicalType();
            fields = new HashMap<>();
            List<RowType.RowField> rowFields = logicalType.getFields();
            int size = rowFields.size();
            for (int i = 0; i < size; i++) {
                fields.put(rowFields.get(i).getName(), i);
            }
            
            jedisCluster = RedisUtil.getJedisCluster(options.get(HOST_PORT));
        }

        @Override
        public void close() throws Exception {
            RedisUtil.closeConn(jedisCluster);
        }

        @Override
        /*
        2> +I(1,30017323,1101)
        2> -U(1,30017323,1101)
        2> +U(2,30017323,1101)
        2> -U(2,30017323,1101)
        2> +U(3,30017323,1101)
        2> -U(3,30017323,1101)
        2> +U(4,30017323,1101)
        3> -U(3,980897,3208)
        3> +U(4,980897,3208)
         */
        public void invoke(RowData rowData, Context context) {
            RowKind rowKind = rowData.getRowKind();
            Row data = (Row) converter.toExternal(rowData);
            if (rowKind.equals(RowKind.UPDATE_AFTER) || rowKind.equals(RowKind.INSERT)) {
                
                String keyTemplate = options.get(KEY_TEMPLATE);
                if (Objects.isNull(keyTemplate) || keyTemplate.trim().length() == 0) {
                    throw new IllegalArgumentException("keyTemplate is null or empty");
                }
                
                // resolve placeholders such as test2_${city_id} against the current row
                if (keyTemplate.contains("${")) {
                    String[] split = keyTemplate.split("\\$\\{");
                    keyTemplate = "";
                    for (String s : split) {
                        if (s.contains("}")) {
                            String fieldName = s.substring(0, s.indexOf("}"));
                            int index = fields.get(fieldName);
                            keyTemplate = keyTemplate + data.getField(index).toString() + s.substring(s.indexOf("}") + 1);
                        } else {
                            keyTemplate = keyTemplate + s;
                        }
                    }
                }
                
                String keyType = options.get(KEY_TYPE);
                String valueNames = options.get(VALUE_NAMES);
                // keyType=hash requires fieldTemplate
                if ("hash".equalsIgnoreCase(keyType)) {
                    String fieldTemplate = options.get(FIELD_TEMPLATE);
                    if (fieldTemplate.contains("${")) {
                        String[] split = fieldTemplate.split("\\$\\{");
                        fieldTemplate = "";
                        for (String s : split) {
                            if (s.contains("}")) {
                                String fieldName = s.substring(0, s.indexOf("}"));
                                int index = fields.get(fieldName);
                                fieldTemplate = fieldTemplate + data.getField(index).toString() + s.substring(s.indexOf("}") + 1);
                            } else {
                                fieldTemplate = fieldTemplate + s;
                            }
                        }
                    }
                    
                    // hash field name = fieldTemplate + "_" + valueName
                    if (valueNames.contains(",")) {
                        HashMap<String, String> map = new HashMap<>();
                        String[] fieldNames = valueNames.split(",");
                        for (String fieldName : fieldNames) {
                            String value = data.getField(fields.get(fieldName)).toString();
                            map.put(fieldTemplate + "_" + fieldName, value);
                        }
                        jedisCluster.hset(keyTemplate, map);
                    } else {
                        jedisCluster.hset(keyTemplate, fieldTemplate + "_" + valueNames, data.getField(fields.get(valueNames)).toString());
                    }
                    
                } else if ("set".equalsIgnoreCase(keyType)) {
                    jedisCluster.set(keyTemplate, data.getField(fields.get(valueNames)).toString());
                    
                } else if ("sadd".equalsIgnoreCase(keyType)) {
                    jedisCluster.sadd(keyTemplate, data.getField(fields.get(valueNames)).toString());
                } else if ("zadd".equalsIgnoreCase(keyType)) {
                    jedisCluster.sadd(keyTemplate, data.getField(fields.get(valueNames)).toString());
                } else {
                    throw new IllegalArgumentException(" not find this keyType:" + keyType);
                }
                
                if (Objects.nonNull(options.get(EXPIRE_TIME))) {
                    jedisCluster.expire(keyTemplate, options.get(EXPIRE_TIME));
                }
            }
        }
    }
}
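
The RedisUtil helper referenced in open() and close() is not shown in the original post. Below is a minimal sketch, assuming Jedis 3.x and a hostPort string of the form "host1:port1,host2:port2"; it is illustrative, not the author's actual implementation:

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.util.HashSet;
import java.util.Set;

public class RedisUtil {
    
    // parse "host1:port1,host2:port2" into cluster nodes and connect
    public static JedisCluster getJedisCluster(String hostPort) {
        Set<HostAndPort> nodes = new HashSet<>();
        for (String hp : hostPort.split(",")) {
            String[] parts = hp.trim().split(":");
            nodes.add(new HostAndPort(parts[0], Integer.parseInt(parts[1])));
        }
        return new JedisCluster(nodes);
    }
    
    // close quietly; safe to call with null
    public static void closeConn(JedisCluster jedisCluster) {
        if (jedisCluster != null) {
            jedisCluster.close();
        }
    }
}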

4. Using the Redis Sink

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

/**
 * @author shengjk1
 * @date 2020/9/25
 */
public class SqlKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);
        // enable checkpointing
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        configuration.set(
                ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        configuration.set(
                ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));
        
        String sql = "CREATE TABLE sourcedata (`id` bigint,`status` int,`city_id` bigint,`courier_id` bigint,info_index int,order_id bigint,tableName String" +
                ") WITH (" +
                "'connector' = 'kafka','topic' = 'xxx'," +
                "'properties.bootstrap.servers' = 'xxx','properties.group.id' = 'testGroup'," +
                "'format' = 'json','scan.startup.mode' = 'earliest-offset')";
        tableEnv.executeSql(sql);
        
        //15017284 distinct
        Table bigtable = tableEnv.sqlQuery("select distinct a.id,a.courier_id,a.status,a.city_id,b.info_index from (select id,status,city_id,courier_id from sourcedata where tableName = 'orders' and status=60)a join (select " +
                " order_id,max(info_index)info_index from sourcedata  where tableName = 'infos'  group by order_id )b on a.id=b.order_id");

        sql = "CREATE TABLE redis (info_index BIGINT,courier_id BIGINT,city_id BIGINT" +
                ") WITH (" +
                "'connector' = 'redis'," +
                "'hostPort'='xxx'," +
                "'keyType'='hash'," +
                "'keyTemplate'='test2_${city_id}'," +
                "'fieldTemplate'='test2_${courier_id}'," +
                "'valueNames'='info_index,city_id'," +
                "'expireTime'='1000')";
            
        tableEnv.executeSql(sql);
        
        // concatenating a Table into the SQL string works because Table#toString
        // registers the table in the catalog under a unique generated name
        Table resultTable = tableEnv.sqlQuery("select sum(info_index)info_index,courier_id,city_id from " + bigtable + " group by city_id,courier_id");
        TupleTypeInfo<Tuple3<Long, Long, Long>> tupleType = new TupleTypeInfo<>(
                Types.LONG,
                Types.LONG,
                Types.LONG);
        tableEnv.toRetractStream(resultTable, tupleType).print("===== ");
        // in Flink 1.11 executeSql submits the INSERT job immediately;
        // env.execute only runs the DataStream part (the retract-stream print)
        tableEnv.executeSql("INSERT INTO redis SELECT info_index,courier_id,city_id FROM " + resultTable);
        env.execute("");
    }
}

5. Detailed explanation

create table test (
  `id` bigint,
  `url` string,
  `day` string,
  `pv` bigint,
  `uv` bigint
) with (
  'connector'='redis',
  'hostPort'='xxx',
  'password'='',
  'expireTime'='100',
  'keyType'='hash',
  'keyTemplate'='test_${id}',
  'fieldTemplate'='${day}',
  'valueNames'='pv,uv'
)

Redis result (assuming id=1, day=20201016, pv=20, uv=20):
    type: hash
    key test_1, with field 20201016_pv = 20 and field 20201016_uv = 20
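
A quick way to check the written hash from Java, reusing the hypothetical RedisUtil sketch from section 3:

import redis.clients.jedis.JedisCluster;

import java.util.Map;

public class VerifyRedisResult {
    public static void main(String[] args) {
        // use the same hostPort value as in the DDL
        JedisCluster jedis = RedisUtil.getJedisCluster("xxx");
        Map<String, String> entries = jedis.hgetAll("test_1");
        System.out.println(entries); // expected: {20201016_pv=20, 20201016_uv=20}
        jedis.close();
    }
}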

Parameter reference:
connector      fixed value, 'redis'
hostPort       the Redis address
password       the Redis password
expireTime     TTL of the Redis key, in seconds
keyType        the Redis key type; currently hash, set, sadd, zadd
keyTemplate    template for the Redis key, e.g. test_${id}; note that id must be a column of the table
fieldTemplate  required when keyType = hash; same template rules as keyTemplate
valueNames     the value column(s); several names may be given, comma separated

6. How it works

  1. The overall flow: CatalogTable ---> DynamicTableSource and DynamicTableSink. In this step, DynamicTableSourceFactory and DynamicTableSinkFactory act as the bridge between the two.

  2. The (Source/Sink)Factory is located via connector='xxx' and in principle performs three operations (a sketch of format discovery follows this list):
    1. validate options
    2. configure encoding/decoding formats (if required)
    3. create a parameterized instance of the table connector
    Formats themselves are located via format='xxx'.

  3. DynamicTableSource and DynamicTableSink: the documentation says they can be regarded as stateful, but whether they actually hold state depends on the concrete source or sink implementation.

  4. The runtime logic is produced next. It is implemented through Flink's core connector interfaces, such as InputFormat or SourceFunction (for Kafka it is FlinkKafkaConsumer). These implementations are in turn wrapped as *Provider instances, and the *Provider is what gets executed.

  5. *Provider is the code-level bridge between SQL and the streaming API.
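
As a sketch of operation 2 above: a connector that needs a (de)serialization format would discover it through the same FactoryUtil.TableFactoryHelper used in the Redis factory. The Redis sink itself needs no format, so the snippet below is purely illustrative of the Flink 1.11 API:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.FactoryUtil;

// inside createDynamicTableSource(Context context):
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// looks up the format factory registered for the value of the 'format' option
DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
        helper.discoverDecodingFormat(DeserializationFormatFactory.class, FactoryUtil.FORMAT);
helper.validate();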

7. References

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
