原文链接:https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/run_example_quickstart.html#setting-up-a-maven-project
本指南从零开始,创建一个流分析的Flink项目,并在Flink集群中运行。
Wikipedia(维基百科)提供了一个IRC渠道,在这个渠道中所有编辑wiki的记录都会被记录下来。我们将通过Flink去读取这个渠道的数据,并通过一个指定的时间窗口去统计每个用户编辑的字节数。这个例子很简单,用Flink几分钟就可以实现了,但是会为你日后书写更加复杂的流式分析程序打上了坚实的基础。
创建一个Maven项目(Setting up a Maven Project)
我们将使用Flink的Maven原形来创建我们的项目结构,请参考Java API快速入门文档来获取跟多关于Flink Maven原形创建Flink项目的信息。(https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html)
对于我们这个例子来说,命令行如下:
$mvn archetype:generate\
-DarchetypeGroupId=org.apache.flink\
-DarchetypeArtifactId=flink-quickstart-java\
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/\
-DarchetypeVersion=1.4-SNAPSHOT\
-DgroupId=wiki-edits\
-DartifactId=wiki-edits\
-Dversion=0.1\
-Dpackage=wikiedits\
-DinteractiveMode=false
你可以通过自己的喜好来编辑groupId,artifactId和package。通过上述参数,我们将创建一个如下图所示的Maven工程:
这里有一个已经添加了Flink依赖的pom.xml文件在根目录中,还有几个Flink的示例程序在src/main/java目录下.因为我们要从头开始,所以需要删掉这些示例:
$rm wiki-edits/src/main/java/wikiedits/*.java
最后一步,我们需要将Flink Wikipedia的connector(连接器)添加到maven依赖中,这样我们就可以在程序中直接调用了。编辑pom.xml中的dependencies模块,如下所示:
注意:flink-connector-wikiedits_2.11的依赖已经添加完毕。(这个例子都到Apache Samza的Hello Samza示例的启发)
编写一个Flink程序(Writing a Flink Program)
编码时间到,打开你喜欢的IDE并导入maven工程,或者打开文本编辑器并创建src/main/java/wikiedits/wikieditAnalysis.java文件
packagewikiedits;
public class WikipediaAnalysis{
public static void main(String [ ] args) throws Exception{
}
}
这段代码现在还很简单,但是我们后续会进一步的完善它。注意:我这里不会给出导入的语句,因为IDE会自动的去导入。在本节的最后我会展示包括导入语句在内的完整代码,如果你想跳过前面,可以将后面的完整代码写入你的编辑器中。
编写Flink程序的第一步就是创建一个StreamExecutionEnvironment(如果你写得是批处理的Job的话,就是ExecutionEnvironment)。StreamExecutionEnvironment(或者ExecutionEnvironment)将用来设置运行参数以及创建从外部系统读取数据的sources。接下来,我们将StreamExecutionEnvironment添加到main方法中:
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
接下来我们来创建一个读取Wikipedia IRC日志的source
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
这创建了WikipediaEvent元素的DataStream,我们将进一步处理。在这个例子中我们主要关心的是每个用户在一个给定的时间窗内添加了或者删除了多少字节数。为此我们首先通过用户名来指定这个流的key,也就是说流上的操作必须把用户名考虑进去。在这个例子中在windows中的编辑总字节数必须是每一个唯一的用户。为了给Stream加key,我们需要提供一个KeySelector类,如下:
KeyedStream<WikipediaEditEvent,String>keyedEdits=edits
.keyBy(new KeySelector<WikipediaEditEvent,String>(){
@Override
public String getKey(WikipediaEditEventevent){
return event.getUser();
}
});
这给了我们一个WikipediaEditEvent类型的流,并有一个String类型的key—用户名。现在我们可以在这个流上加一个window(窗口),并计算在这个窗口中的元素的结果。Window(窗口)指定了要执行计算的流的一个分片。Window在执行无限数据流元素的聚合操作时是很有必要的。在我们的例子中我们需要计算每5秒钟用户编辑的字节数的聚合
DataStream<Tuple2<String,Long>>result=keyedEdits
.timeWindow(Time.seconds(5))
.fold(new Tuple2<>("",0L),new FoldFunction<WikipediaEditEvent,Tuple2<String,Long>>(){
@Override
public Tuple2<String,Long>fold(Tuple2<String,Long>acc,WikipediaEditEventevent){
acc.f0 = event.getUser();
acc.f1 += event.getByteDiff();
return acc;
}
});
第一个调用函数.timeWindow()指定了我们需要一个5秒钟的翻滚窗口,第二个调用函数指定了一个根据每个用户在每个窗口分片中的Fold转换操作。在我们的例子中我们以一个初始值(“”,0L)开始,并将时间窗口中每个用户的每次编辑的字节流添加进去,现在的Stream结果包含了每个用户在5秒钟内的产生的字节流Tuple2(String, Long),String是用户名,Long是字节数。
剩下的最后一件事就是将Stream打印到控制台并启动执行程序:
result.print();
see.execute();
最后一个调用对于启动一个Flink作业来说是非常有必要的。所有的操作,例如创建sources、transformations以及sinks都只是建立了一个内部的图而已,只有当execute()被调用的时候,这些操作才会被提交到集群中去执行或者在本地的机器上开始执行。
截止到现在的所有代码如下:
package wikiedits;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
public class WikipediaAnalysis {
public static void main(String [] args)throws Exception{
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
KeyedStream<WikipediaEditEvent,String> keyedEdits = edits
.keyBy(new KeySelector<WikipediaEditEvent,String>(){
@Override
public String getKey( WikipediaEditEvent event ){
returnevent.getUser();
}
});
DataStream<Tuple2<String,Long>> result = keyedEdits
.timeWindow(Time.seconds(5))
.fold(new Tuple2<>("",0L),new FoldFunction<WikipediaEditEvent,Tuple2<String,Long>>(){
@Override
public Tuple2<String,Long>fold(Tuple2<String,Long> acc,WikipediaEditEvent event){
acc.f0 = event.getUser();
acc.f1 += event.getByteDiff();
returnacc;
}
});
result.print();
see.execute();
}
}
你可以在你的IDE中去执行这个例子或者在Maven在命令行中执行:
$mvn clean package
$mvn exec:java -Dexec.mainClass = wikiedits.WikipediaAnalysis
第一条命令是构建我们的项目,第二条命令是执行我们的main方法的类,输出的结果如下:
1>(Fenix down,114)
6>(AnomieBOT,155)
8>(BD2412bot,-3690)
7>(IgnorantArmies,49)
3>(Ckh3111,69)
5>(Slade360,0)
7>(Narutolovehinata5,2195)
6>(Vuyisa2001,79)
4>(Ms Sarah Welch,269)
4>(KasparBot,-245)
每一行前面的数字告诉你打印输出的操作来自哪个并行实例
额外的训练:运行在集群中并写入Kafka
请在我们开始之前首先参考Flink搭建快速入门在你的机器中搭建一个Flink集群,参考Kafka快速入门搭建一个Kafka环境。
第一步:我们需要将Flink Kafka的connector(连接器)作为maven依赖,这样的话我们就可以直接调用Kafka Sink了,将Flink Kafka Connector添加到pom.xml文件的依赖部分:
下一步,我们需要修改我们的程序,我们将移除print() sink并使用Kafka sink来替代。新的代码如下:
result
.map(new MapFunction<Tuple2<String,Long>,String>(){
@Override
public String map(Tuple2<String,Long> tuple){
return tuple.toString();
}
})
.addSink(new FlinkKafkaProducer08<>("localhost:9092","wiki-result",new SimpleStringSchema()));
导入相关的类:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.api.common.functions.MapFunction;
注意:我们首先得将Tuple2类型的Stream通过MapFunction转换成String类型的Stream。这样做主要是因为写String到Kafka会更加容易。接下来我们创建一个Kafka的sink。你可能还需要适配一下你集群的hostname和端口。”wiki-result”是Kafka stream的名称,这是我们接下来要创建的,在我们跑程序之前,通过Maven来构建我们的工程因为在集群中运行的时候我们需要用到jar包。
$mvn clean package
结果jar包保存在target的子目录中:target/wiki-edits-0.1.jar,接下来我们会用到这个jar包。现在我们准备启动Flink集群并在上面运行写Kafka数据的程序。切换到你安装Flink的目录并启动local集群。
$cd my/flink/directory
$bin/start-local.sh
同时我们还需要创建一个Kafka的Topic,这样我们才能往这个Topic中写数据:
$cd my/kafka/directory
$bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic wiki-results
现在我们准备在我们这个本地集群中运行我们的jar包:
$cd my/flink/directory
$bin/flink run -c wikiedits.WikipediaAnalysis path/to/wikiedits-0.1.jar
如果一切如期执行的话,我们将得到如下的结果输出:
03/08/2016 15:09:27 Job execution switched to status RUNNING.
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING
你可以看到各个操作之间如何开始执行,这里只有两个操作,因为window之后的操作因为性能需要,合并到一个操作里去了。在Flink中,我们称之为chaining(链式操作)。
你可以使用Kafka的控制台consumer来检查Kafka的topic以观察程序的输出。
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wiki-result
你可以通过运行在http://localhost:8081上的Flink仪表盘来查看你的集群资源信息及运行的任务的信息。
如果你点击你运行的任务的话,你还可以看到一个视图,在这个视图里你可以查看每一个操作,例如:查看已经处理过的元素的个数。