3-flink api概述

1、抽象分层

3.1-api抽象分层.png
  1. ProcessFunction:提供对时间、事件、状态的细粒度控制,用于处理一些复杂事件的逻辑上,易用性较低
  2. DataStreamApi&DataSet:核心api,提供对流/批数据的操作处理,基于函数式的,简单易用
  3. SQL&TableApi:flink sql的集成基于apache calcite,使用比其他api更灵活方便

2、datastream api

datastream api主要包含以下3块内容

1、datasource

数据的输入来源,来源方式主要有以下几种

  1. 来自文件:读取文本文件,将符合TextInputFormat规范的文件,将字符串返回

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> text = env.readTextFile("file:///filePath");
    
  2. 来自集合:fromCollection(Collection),fromElements(T ...)等

  3. 来自socket

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
    
  4. 自定义输入

    自定义输入源有两种方式:

    • 实现SourceFunction接口来自定义无并行度的数据源

      demo:每一秒产生一条数据的source

      package streaming.source;
      
      import org.apache.flink.streaming.api.functions.source.SourceFunction;
      
      
      /**
       * @author xiaolong
       */
      public class InputSource implements SourceFunction<Long> {
      
          private boolean isRunning = true;
      
          private Long counter = 1L;
      
      
          @Override
          public void cancel() {
              isRunning = false;
          }
      
      
          @Override
          public void run(SourceContext<Long> context) throws Exception {
              while (isRunning) {
                  context.collect(counter);
                  counter++;
                  Thread.sleep(1000);
              }
      
          }
      }
      
  • 实现ParallelSourceFunction接口或者继承RichParallelSourceFunction来自定义具有并行度的数据源

2、transform

flink提供了很多算子,经常使用的有以下这些:

  • Map:输入一个元素,可以进行逻辑运算,输出一个元素

  • FlatMap:输入一个元素,输出多个或零个元素

  • Filter:元素过滤,符合条件的会保留

  • Union:合并多个流,必须保证合并的流必须是格式一致的

    修改InputSource的类型为String,再新增一个InputStringSource

    package streaming.source;
    
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.Random;
    
    
    /**
     * @author xiaolong
     */
    public class InputStringSource implements SourceFunction<String> {
    
        private boolean isRunning = true;
    
        private List<String> alphabet = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
    
    
        @Override
        public void cancel() {
            isRunning = false;
        }
    
    
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (isRunning) {
                Random random = new Random();
                ctx.collect(alphabet.get(random.nextInt(alphabet.size())));
                Thread.sleep(1000);
            }
        }
    }
    

    测试代码:

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import streaming.source.InputSource;
    import streaming.source.InputStringSource;
    
    
    /**
     * @author xiaolong
     */
    public class TestSource {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> source = env.addSource(new InputSource());
            DataStreamSource<String> source2 = env.addSource(new InputStringSource());
            source.union(source2).print();
            env.execute("testInputSource");
        }
    }
    

    输出结果如下:

3.2-合并流输出结果.png
  • Connect:只能合并两个流,可以不必保证流的格式一致性

  • coMap/coFlatMap:在ConnectedStream中使用这种函数,类似于Map和FlatMap

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
    import org.apache.flink.util.Collector;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import streaming.source.InputSource;
    import streaming.source.InputStringSource;
    
    /**
     * @author xiaolong
     */
    public class TestSource {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<Long> intSource = env.addSource(new InputSource());
            DataStreamSource<String> strSource = env.addSource(new InputStringSource());
            DataStream<List<String>> result = intSource.connect(strSource).flatMap(new CoFlatMapFunction<Long, String, List<String>>() {
                List<String> list = new ArrayList<>();
    
                @Override
                public void flatMap1(Long aLong, Collector<List<String>> collector) throws Exception {
                    list.add(aLong.toString());
                    collector.collect(list);
                }
    
                @Override
                public void flatMap2(String s, Collector<List<String>> collector) throws Exception {
                    list.add(s);
                    collector.collect(list);
                }
            });
            result.print();
            env.execute("testInputSource");
        }
    }
    
    

测试结果:


3.4-合并不同流输出结果.png
  • Split:根据规则把一个流切分为多个流

  • Select:选择切分后的流,与Split配合使用

  • KeyBy:根据指定的Key进行分组,Key相同的数据会进入到同一个分区

  • Aggregation:聚合算子,例如sum,max等

  • Reduce:将上一条数据与当前数据进行聚合操作,返回一条新数据

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    
    /**
     * @author xiaolong
     */
    public class TestSource {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<Tuple2<Integer, Integer>> source = env.fromElements(Tuple2.of(1, 10), Tuple2.of(2, 20), Tuple2.of(2, 21), Tuple2.of(1, 11), Tuple2.of(2, 22));
            SingleOutputStreamOperator<Tuple2<Integer, Integer>> reduce = source.keyBy(0).reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
                @Override
                public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> t2, Tuple2<Integer, Integer> t1) throws Exception {
                    return new Tuple2<>(t1.f0, t2.f1 + t1.f1);
                }
            });
            reduce.print();
            env.execute("testInputSource");
        }
    }
    

    测试结果:

3.3-reduce输出.png
  • 分区:

    1. 随机分区:dataStream.shuffle();

    2. 重新平衡:dataStream.rebalance(),对数据进行再平衡、重分区和消除数据倾斜

    3. 重新调节:dataStream.rescale

      2和3的区别是rebalance会产生全量重分区,rescale重新调节的过程是,如果上游有4个并发操作,下游有2个并发,重新调节后上游的2个并发会分配给下游的1个并发操作,反之亦然。

    4. 自定义分区:自定义分区需要实现partitionCustom方法

      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.common.functions.Partitioner;
      import org.apache.flink.api.java.tuple.Tuple1;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      
      import java.util.Arrays;
      import java.util.List;
      
      import streaming.source.InputStringSource;
      
      
      /**
       * @author xiaolong
       */
      public class TestSource {
      
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              DataStreamSource<String> strSource = env.addSource(new InputStringSource());
              List<String> list = Arrays.asList("a", "b", "c", "d");
              strSource.map(new MapFunction<String, Tuple1<String>>() {
                  @Override
                  public Tuple1<String> map(String s) throws Exception {
                      return new Tuple1<>(s);
                  }
              }).partitionCustom(new Partitioner<String>() {
                  @Override
                  public int partition(String s, int i) {
                      System.out.println("分区个数:" + i);
                      if (list.contains(s)) {
                          return 0;
                      }else {
                          return 1;
                      }
                  }
              }, 0).print();
              env.execute("testFlinkJob");
          }
      }
      

      测试结果:


      3.5-自定义分区.png

3、sink

flink有如下几种sink操作:

  1. 标准输出:print()/printToErr()

  2. 输出到文档或socket:writeAsCsv,writeAsText,writeToSocket

  3. 写入到flink第三方存储:ElasticSearch,Redis,kafkaProducer等

    测试从socket读取数据,写入到kafka

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
    import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    
    import java.util.Properties;
    
    
    /**
     * @author xiaolong
     */
    public class TestSource {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> strSource = env.socketTextStream("localhost", 9000, "\n");
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "xxxxxx"); // brokers地址
            properties.put("transaction.timeout.ms", 15 * 60 * 1000); // 设置FlinkKafkaProducer011的超时时间,默认是1h, kafka服务默认事务超时时间是15min,如果不设置会报错
            FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
                    "kafkaDruid",  // kafka topic
                    new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),  // 序列化
                    properties,      // properties
                    FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);  // kafka语义
    
            // 0.10+ 版本的 Kafka 允许在将记录写入 Kafka 时附加记录的事件时间戳;
            // 此方法不适用于早期版本的 Kafka
            myProducer.setWriteTimestampToKafka(true);
            strSource.addSink(myProducer);
            strSource.print();
            env.execute("testFlinkJob");
    
        }
    }
    

    socket输入:

3.7-socket输入.png

测试结果,到kafka平台上可查看到最新的消息:

3.6-数据写入到kafka.png
  1. 自定义输出,实现SinkFunction或RichSInkFunction接口
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350