1. Abstraction Layers
- ProcessFunction: provides fine-grained control over time, events, and state; it is used for complex event-processing logic, but is the hardest of the APIs to use (see the sketch after this list).
- DataStream API & DataSet API: the core APIs, providing functional-style operations on streaming/batch data; simple and easy to use.
- SQL & Table API: Flink SQL is integrated on top of Apache Calcite and is more flexible and convenient to use than the other APIs.
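To make the "fine-grained control" concrete, here is a minimal ProcessFunction sketch (a hypothetical example, not from the original notes): it stamps every element with the current processing time, read from the low-level timer service that the higher-level operators do not expose directly.

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical example: emit each element together with the current processing time.
public class TimestampingFunction extends ProcessFunction<String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        out.collect(value + " @ " + ctx.timerService().currentProcessingTime());
    }
}
```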
2. DataStream API
The DataStream API mainly consists of the following three parts:
1. DataSource
The input sources of the data. The main kinds of sources are:
- From files: reads text files that conform to the TextInputFormat specification and returns their contents as strings.

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///filePath");
```
- From collections: fromCollection(Collection), fromElements(T...), etc. (see the sketch below).
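A minimal sketch of the collection-based sources (variable names are illustrative):

```java
import java.util.Arrays;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// build a bounded stream from an existing Java collection
DataStream<Integer> fromList = env.fromCollection(Arrays.asList(1, 2, 3));
// build a bounded stream directly from individual elements
DataStream<String> fromElems = env.fromElements("a", "b", "c");
```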
- From a socket

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
```
- Custom sources
There are two ways to define a custom source:
- Implement the SourceFunction interface to define a source without parallelism.
Demo: a source that produces one record per second:

```java
package streaming.source;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * @author xiaolong
 */
public class InputSource implements SourceFunction<Long> {

    // volatile, because cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;
    private Long counter = 1L;

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void run(SourceContext<Long> context) throws Exception {
        while (isRunning) {
            // emit the counter once per second
            context.collect(counter);
            counter++;
            Thread.sleep(1000);
        }
    }
}
```
- Implement the ParallelSourceFunction interface, or extend RichParallelSourceFunction, to define a source that runs with parallelism (see the sketch below).
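A minimal sketch of a parallel source (a hypothetical class mirroring InputSource above); every parallel subtask runs its own copy of run():

```java
package streaming.source;

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

public class ParallelInputSource implements ParallelSourceFunction<Long> {

    private volatile boolean isRunning = true;
    private long counter = 0L;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            // each subtask emits its own counter once per second
            ctx.collect(counter++);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```

Unlike a plain SourceFunction, this source accepts env.addSource(new ParallelInputSource()).setParallelism(2).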
2. Transform
Flink provides many operators; the most frequently used are the following:
- Map: takes one element, applies arbitrary logic to it, and outputs one element.
- FlatMap: takes one element and outputs zero or more elements.
- Filter: keeps only the elements that satisfy a condition (a combined sketch of these three follows).
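A minimal sketch chaining the three operators (input values are illustrative):

```java
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> result = env.fromElements("1", "2,3", "4")
        // FlatMap: split each string on "," and emit every part separately
        .flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> out) {
                for (String part : s.split(",")) {
                    out.collect(part);
                }
            }
        })
        // Map: parse each part into an integer
        .map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String s) {
                return Integer.parseInt(s);
            }
        })
        // Filter: keep only the even numbers
        .filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer n) {
                return n % 2 == 0;
            }
        });
result.print();
```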
- Union: merges multiple streams; all of the merged streams must have the same element type.
Change the element type of InputSource to String, and add a new InputStringSource:

```java
package streaming.source;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * @author xiaolong
 */
public class InputStringSource implements SourceFunction<String> {

    // volatile, because cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;
    private List<String> alphabet = Arrays.asList("a", "b", "c", "d", "e", "f", "g");

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        Random random = new Random();
        while (isRunning) {
            // emit one random letter per second
            ctx.collect(alphabet.get(random.nextInt(alphabet.size())));
            Thread.sleep(1000);
        }
    }
}
```
Test code:
```java
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import streaming.source.InputSource;
import streaming.source.InputStringSource;

/**
 * @author xiaolong
 */
public class TestSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.addSource(new InputSource());
        DataStreamSource<String> source2 = env.addSource(new InputStringSource());
        source.union(source2).print();
        env.execute("testInputSource");
    }
}
```
Output (screenshot omitted).
- Connect: merges exactly two streams, which do not have to share the same element type.
- CoMap/CoFlatMap: map/flatMap-style functions applied to a ConnectedStreams, analogous to Map and FlatMap (here InputSource is back to its original Long-emitting form):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

import streaming.source.InputSource;
import streaming.source.InputStringSource;

/**
 * @author xiaolong
 */
public class TestSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> intSource = env.addSource(new InputSource());
        DataStreamSource<String> strSource = env.addSource(new InputStringSource());
        // connect the two differently typed streams and collect both into one list
        DataStream<List<String>> result = intSource.connect(strSource)
                .flatMap(new CoFlatMapFunction<Long, String, List<String>>() {

                    List<String> list = new ArrayList<>();

                    @Override
                    public void flatMap1(Long aLong, Collector<List<String>> collector) throws Exception {
                        // called for each element of the first (Long) stream
                        list.add(aLong.toString());
                        collector.collect(list);
                    }

                    @Override
                    public void flatMap2(String s, Collector<List<String>> collector) throws Exception {
                        // called for each element of the second (String) stream
                        list.add(s);
                        collector.collect(list);
                    }
                });
        result.print();
        env.execute("testInputSource");
    }
}
```
Test output (screenshot omitted).
- Split: splits one stream into multiple streams according to a rule.
- Select: selects one of the split streams; used together with Split (see the sketch below).
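A minimal Split/Select sketch, assuming the classic API in which split/select are still available (they were deprecated in later Flink versions in favor of side outputs):

```java
import java.util.Collections;

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6);
// tag every element as "even" or "odd"
SplitStream<Integer> split = source.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
    }
});
// pick out only the even-number stream
split.select("even").print();
env.execute("testSplitSelect");
```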
- KeyBy: groups the stream by the given key; records with the same key go to the same partition.
- Aggregation: aggregation operators such as sum, max, etc. (see the sketch below).
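A minimal sketch of KeyBy followed by a rolling aggregation (the data is illustrative):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<Integer, Integer>> source = env.fromElements(
        Tuple2.of(1, 10), Tuple2.of(2, 20), Tuple2.of(1, 11));
// group by the first tuple field and keep a running sum of the second
source.keyBy(0).sum(1).print();
env.execute("testKeyBySum");
```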
- Reduce: combines the previous (aggregated) record with the current record and returns a new record:

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author xiaolong
 */
public class TestSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Tuple2<Integer, Integer>> source = env.fromElements(
                Tuple2.of(1, 10), Tuple2.of(2, 20), Tuple2.of(2, 21),
                Tuple2.of(1, 11), Tuple2.of(2, 22));
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> reduce = source.keyBy(0)
                .reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> t2,
                                                           Tuple2<Integer, Integer> t1) throws Exception {
                        // sum the second field of records sharing the same key
                        return new Tuple2<>(t1.f0, t2.f1 + t1.f1);
                    }
                });
        reduce.print();
        env.execute("testInputSource");
    }
}
```
Test output (screenshot omitted).
- Partitioning:
Random partitioning: dataStream.shuffle()
Rebalancing: dataStream.rebalance(); repartitions the data round-robin so that it is evenly balanced, eliminating data skew.
Rescaling: dataStream.rescale(); the difference between rebalance and rescale is that rebalance performs a full repartition across all downstream subtasks, while rescale redistributes only locally: if the upstream operator has 4 parallel subtasks and the downstream has 2, then after rescaling every 2 upstream subtasks feed 1 downstream subtask, and vice versa (see the sketch below).
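A minimal sketch contrasting the two (parallelism values are illustrative):

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// upstream stage: 4 parallel map subtasks
DataStream<Long> upstream = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L)
        .map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) {
                return value;
            }
        }).setParallelism(4);
// rescale: each of the 2 sink subtasks receives data from 2 of the 4 map subtasks
upstream.rescale().print().setParallelism(2);
// rebalance would instead round-robin every map subtask across both sink subtasks:
// upstream.rebalance().print().setParallelism(2);
env.execute("testRescale");
```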
Custom partitioning: implement the Partitioner interface and pass it to partitionCustom:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import streaming.source.InputStringSource;

/**
 * @author xiaolong
 */
public class TestSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> strSource = env.addSource(new InputStringSource());
        List<String> list = Arrays.asList("a", "b", "c", "d");
        strSource.map(new MapFunction<String, Tuple1<String>>() {
            @Override
            public Tuple1<String> map(String s) throws Exception {
                return new Tuple1<>(s);
            }
        }).partitionCustom(new Partitioner<String>() {
            @Override
            public int partition(String s, int numPartitions) {
                System.out.println("Number of partitions: " + numPartitions);
                // letters in the list go to partition 0, everything else to partition 1
                if (list.contains(s)) {
                    return 0;
                } else {
                    return 1;
                }
            }
        }, 0).print();
        env.execute("testFlinkJob");
    }
}
```
Test output (screenshot omitted).
3. Sink
Flink offers the following kinds of sink operations:
- Standard output: print() / printToErr()
- Write to files or a socket: writeAsCsv, writeAsText, writeToSocket (see the sketch below)
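A minimal sketch of the file and socket sinks (the path and port are illustrative):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> data = env.fromElements("a", "b", "c");
// write each element as a line of text
data.writeAsText("file:///tmp/flink-out");
// write each element to a socket, serialized as a plain string
data.writeToSocket("localhost", 9999, new SimpleStringSchema());
env.execute("testFileSink");
```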
- Write to third-party storage through Flink connectors: Elasticsearch, Redis, Kafka producer, etc.
Test: read data from a socket and write it to Kafka:

```java
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * @author xiaolong
 */
public class TestSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> strSource = env.socketTextStream("localhost", 9000, "\n");

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "xxxxxx"); // broker addresses
        // FlinkKafkaProducer011's transaction timeout defaults to 1 h, but the Kafka broker's
        // default transaction timeout is 15 min; without this setting the job fails.
        properties.put("transaction.timeout.ms", 15 * 60 * 1000);

        FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(
                "kafkaDruid",                                                    // Kafka topic
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), // serialization
                properties,                                                      // producer properties
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);                    // delivery semantics

        // Kafka 0.10+ allows attaching the record's event timestamp when writing to Kafka;
        // this method does not work with earlier Kafka versions.
        myProducer.setWriteTimestampToKafka(true);

        strSource.addSink(myProducer);
        strSource.print();
        env.execute("testFlinkJob");
    }
}
```
Socket input (screenshot omitted).
Test result: the latest messages can be seen on the Kafka console (screenshot omitted).
- Custom sinks: implement the SinkFunction interface or extend RichSinkFunction (see the sketch below).
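A minimal sketch of a custom sink (a hypothetical class, mirroring the custom-source examples above), which simply logs every record; a real sink would write to external storage in invoke():

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class TestCustomSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c").addSink(new PrintlnSink());
        env.execute("testCustomSink");
    }

    // RichSinkFunction additionally provides open()/close() hooks,
    // useful for managing connections to the external system.
    public static class PrintlnSink extends RichSinkFunction<String> {

        @Override
        public void open(Configuration parameters) throws Exception {
            // open connections to the external system here
        }

        @Override
        public void invoke(String value) throws Exception {
            // called once per record
            System.out.println("sink received: " + value);
        }

        @Override
        public void close() throws Exception {
            // release resources here
        }
    }
}
```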