Flink 使用之Kafka exactly-once场景

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

前言

近期一个需求是写一套demo用来证明Flink能够精准一次投送。笔者设计了如下场景:Flink从Kafka消费数据,然后原封不动再发送回Kafka。中间模拟Flink作业失败恢复的场景。Flink作业恢复之后,仍可以继续发送数据到Kafka。输出的数据和消费的数据相比,不丢失也不重复。

环境信息

  • Flink 1.13.2
  • Kafka 1.1.1
  • Hadoop 3.1.1

要点分析

需要格外注意的有如下内容:

  • Flink一定要启用checkpoint。
  • Flink CheckpointMode一定要配置为EXACTLY_ONCE
  • Flink Kafka数据源配置要禁用自动commit。
  • Flink Kafka数据源配置要配置隔离级别为read_committed
  • Flink Kafka Producer要配置EXACTLY_ONCE(内部会启用事务和幂等性)。

主要注意的是,如果我们使用kafka-console-consumer等外部系统读取Flink写入到Kafka的数据来验证数据是否重复或丢失的话,必须保证这个外部系统也是配置了EXACTLY_ONCE相关支持的。例如kafka-console-consumer需要配置隔离级别为read_committed。否则即便是Flink处理数据的时候的确实现了exactly_once,由于kafka-console-consumer读到了未提交的数据,读到的数据会“重复”。干扰结果的验证。

实现代码

Flink主程序代码如下所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 非必须
env.setParallelism(1);
ParameterTool parameterTool = ParameterTool.fromArgs(args);

String sourceBroker = parameterTool.get("source-broker");
String sinkBroker = parameterTool.get("sink-broker");
String sourceTopic = parameterTool.get("source-topic");
String sinkTopic = parameterTool.get("sink-topic");
String checkpointPath = parameterTool.get("ckp-path");

CheckpointingMode checkpointingMode = CheckpointingMode.valueOf(mode);

// checkpoint时间间隔,必须到启动checkpoint
env.enableCheckpointing(666, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(checkpointPath);

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(sourceBroker)
        .setTopics(sourceTopic)
        .setGroupId("source")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        // 必须项,否则数据会有重复
        // 禁用kafka source自动提交offset
        .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
        // 必须项,否则数据会有重复
        // 配置kafka source隔离级别为读提交
        .setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
        .build();

DataStreamSource<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka_source");

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", sinkBroker);
// exactly once模式必须要配置
properties.setProperty("transaction.timeout.ms", String.valueOf(5 * 60 * 1000));
// 启用幂等性,下面设置FlinkKafkaProducer的时候指定了EXACTLY_ONCE会自动启用事务,可以不配置此项
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
// 配置transactional id,下面设置FlinkKafkaProducer的时候指定了EXACTLY_ONCE会自动启用事务,可以不配置此项
properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer");

KafkaSerializationSchema<String> serializationSchema = new KafkaSerializationSchema<String>() {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        return new ProducerRecord<>(
                sinkTopic,
                element.getBytes(StandardCharsets.UTF_8));
    }
};

FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
        sinkTopic,
        serializationSchema,
        properties,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

stream.addSink(kafkaSink).name("kafka_sink");

// execute program
env.execute("Exactly once demo");

演示方式

这里Kafka数据源topic为source,输出数据topic为sink。

启动任意kafka数据源(也可以使用console producer),向source topic写入数据。

启动kafka console consumer,监视Flink输出的数据。注意务必要添加隔离级别参数,设置为read_committed。命令示例如下:

kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --isolation-level read_committed --topic sink

将上面代码编译输出为Flink作业jar,使用flink命令提交。

flink run -m yarn-cluster  -c xxx.xxx.xxx xxx.jar --source-broker master:9092,node1:9092,node2:9092 --source-topic source --sink-broker master:9092,node1:9092,node2:9092 --sink-topic sink --ckp-path hdfs://xxx:9000/path/to/checkpoint/

任务运行之后一段时间。通过Flink管理页面找到TaskManager container所在的节点。使用kill pid命令终止进程。等待Flink自动恢复。成功恢复后再观察kafka console consumer,输出数据应不重复不遗漏。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,542评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,596评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,021评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,682评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,792评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,985评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,107评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,845评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,299评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,612评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,747评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,441评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,072评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,828评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,069评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,545评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,658评论 2 350

推荐阅读更多精彩内容