在Flink example中,有两个Wordcount的example特别类似,一个是batch下的WordCount一个是streaming下的WordCount,从用法上来讲也比较类似。
WordCount example
- batch下的WordCount样例
- 用法
在Flink on Yarn模式下,配置好HADOOP_CONF_DIR和YARN_CONF_DIR后执行
bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 -yqu exampleQ examples/batch/WordCount.jar
默认情况下如果不加input和output参数,会读取默认的数据WordCountData进行count,然后将结果输出到std out。执行完结果可以看到client端会有输出结果如下。
(wish,1)
(with,3)
(would,2)
(wrong,1)
(you,1)
Program execution finished
Job with JobID 21eca2a01dbbc9594525824e9590c453 has finished.
Job Runtime: 12435 ms
Accumulator Results:
- 71de74f85dc472654c31b6df79701cf5 (java.util.ArrayList) [170 elements]
- streaming下的WordCount样例
- 用法
对比batch的WordCount,执行命令如下
bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 -yqu exampleQ examples/streaming/WordCount.jar
默认情况下如果不加input和output参数,会读取默认的流数据WordCountData进行count,然后将结果输出到std out。执行完结果可以看到client端会有输出结果如下。
但是和batch样例不同的是在流处理中,client端并没有输出count结果的二元组。
问题出在哪了?
我们进入example的源码比较
batch下的WordCount
// get input data
DataSet<String> text;
if (params.has("input")) {
// read the text file from given input path
text = env.readTextFile(params.get("input"));
} else {
// get default test text data
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
text = WordCountData.getDefaultTextLineDataSet(env);
}
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0)
.sum(1);
// emit result
if (params.has("output")) {
counts.writeAsCsv(params.get("output"), "\n", " ");
// execute program
env.execute("WordCount Example");
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
counts.print();
}
可以看到从默认的数据读到DataSet text中,然后对text进行map和reduce操作统计出结果counts,然后将counts结果print到std out中。
streaming下的WordCount
// get input data
DataStream<String> text;
if (params.has("input")) {
// read the text file from given input path
text = env.readTextFile(params.get("input"));
} else {
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
// get default test text data
text = env.fromElements(WordCountData.WORDS);
}
DataStream<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(0).sum(1);
// emit result
if (params.has("output")) {
counts.writeAsText(params.get("output"));
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
counts.print();
}
// execute program
env.execute("Streaming WordCount");
可以看到整个流程和batch没有太大区别,不过用的数据类型是DataStream,那么问题来了,为什么流处理下print没有将结果输出到client.
我们对比下DataSet的print和DataSteam的print区别。
/**
* Prints the elements in a DataSet to the standard output stream {@link System#out} of the JVM that calls
* the print() method. For programs that are executed in a cluster, this method needs
* to gather the contents of the DataSet back to the client, to print it there.
*
* <p>The string written for each element is defined by the {@link Object#toString()} method.
*
* <p>This method immediately triggers the program execution, similar to the
* {@link #collect()} and {@link #count()} methods.
*
* @see #printToErr()
* @see #printOnTaskManager(String)
*/
public void print() throws Exception {
List<T> elements = collect();
for (T e: elements) {
System.out.println(e);
}
}
可以看到首先是collect操作,客户端获得了DataSet的list,然后在客户端输出结果。由于存在数据收集传输的过程,所以也建议对大规模数据不要轻易使用print方法。
DataStream的print.
/**
* Writes a DataStream to the standard output stream (stdout).
*
* <p>For each element of the DataStream the result of {@link Object#toString()} is written.
*
* <p>NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink
* worker.
*
* @return The closed DataStream.
*/
@PublicEvolving
public DataStreamSink<T> print() {
PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
return addSink(printFunction).name("Print to Std. Out");
}
可以看到只是增加了printsink,并没有把数据收集到client端,因此std out也不会在客户端进行,而是在这段代码的执行机器上进行,也就是Flink的TaskManager上。
结论
正是因为DataSet和DataStream关于print方法的实现不同,也导致了其行为不一致,所以出现了本文开始讲的两种WordCount输出行为的区别。