The best way to get familiar with a framework is to start from its examples and understand how they work.
So let's follow along and write the wordcount example that ships with Structured Streaming:
import java.util.Arrays;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class WordCount {
    public static void main(String[] args) throws StreamingQueryException {
        // Hard-code host/port for local testing (replace with your own host and port)
        args = new String[]{"localhost", "9999"};
        if (args.length < 2) {
            System.err.println("Usage: JavaStructuredNetworkWordCount <hostname> <port>");
            System.exit(1);
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);

        SparkSession spark = SparkSession
            .builder()
            .master("local") // the Spark master: usually local, local[N], or a cluster address
            .appName("JavaStructuredNetworkWordCount")
            .getOrCreate();

        // Create DataFrame representing the stream of input lines from connection to host:port
        Dataset<Row> lines = spark
            .readStream()
            .format("socket") // at the moment the built-in sources are File and Socket;
                              // for a file source, set format to the file type, e.g. .format("json")
            .option("host", host)
            .option("port", port)
            .load();

        // Split the lines into words
        Dataset<String> words = lines
            .as(Encoders.STRING())
            .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());

        // Generate running word count
        Dataset<Row> wordCounts = words.groupBy("value").count();

        // Start running the query that prints the running counts to the console
        StreamingQuery query = wordCounts.writeStream()
            .outputMode("complete") // output modes are complete, append, update;
                                    // only the first two are supported so far.
                                    // complete: the full result table is emitted after every trigger.
                                    // append: only the rows added since the last trigger are emitted.
            .format("console") // built-in sinks: console, parquet, memory, foreach;
                               // foreach is the one you can extend without limit
            //.trigger(Trigger.ProcessingTime("5 seconds"))
            .start();

        query.awaitTermination();
    }
}
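To see what the flatMap → groupBy → count pipeline actually computes, here is a minimal plain-Java sketch (no Spark, just java.util.stream) of the same logic applied to one micro-batch of lines; the class and method names are illustrative, not Spark API:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WordCountSketch {
    // Mirrors lines.flatMap(split).groupBy("value").count() for a single micro-batch
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
            .flatMap(line -> Arrays.stream(line.split(" ")))        // flatMap: one line -> many words
            .collect(Collectors.groupingBy(Function.identity(),     // groupBy("value")
                                           Collectors.counting())); // count()
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countWords(Arrays.asList("apache spark", "spark streaming"));
        System.out.println(counts.get("spark")); // prints 2
    }
}
```

The difference in the streaming job is that Spark keeps this grouped state across micro-batches, so in complete mode the counts keep accumulating as new lines arrive.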
To feed it some input, start a netcat server first with nc -lk 9999 and type a few lines; the running counts show up on the console. Once it runs successfully, get excited and set off on the next leg of the journey~