This program is written in Java, so it uses the Java classes and APIs throughout, e.g. JavaStreamingContext, JavaReceiverInputDStream, and JavaDStream. Note that the class names start with Java.
This example demonstrates Spark Streaming receiving data from a socket.
public static void main(String[] args) throws InterruptedException {
    SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("test");
    JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
    JavaReceiverInputDStream<String> lines = jsc.socketTextStream("127.0.0.1", 9999);
    JavaDStream<String> flatMap = lines.flatMap((FlatMapFunction<String, String>) s -> {
        String[] split = s.split(" ");
        return Arrays.asList(split).iterator();
    });
    JavaPairDStream<String, Integer> mapToPair = flatMap.mapToPair(
            (PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1)).reduceByKey(
            (Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2);
    mapToPair.print();
    jsc.start();
    jsc.awaitTermination();
}
Then use the nc command to send data to port 9999: run nc -lk 9999 and type some lines. The Spark Streaming program will receive the data. Spark Streaming treats the data received within each interval as one batch, turns that batch into an RDD, and then processes it with the RDD API. The same RDD transformation-and-output flow is executed once per batch.
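The flatMap → mapToPair → reduceByKey pipeline above is a per-batch word count. As a plain-Java sketch of what each batch computes (no Spark needed; processBatch is a hypothetical helper standing in for the RDD operations):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class BatchWordCount {
    // Hypothetical helper: the result one micro-batch's pipeline produces.
    static Map<String, Integer> processBatch(Iterable<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            // flatMap: split each received line into words
            for (String word : line.split(" ")) {
                // mapToPair + reduceByKey: emit (word, 1) and sum per key
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // One "batch": everything received within the interval
        System.out.println(processBatch(Arrays.asList("a b a", "c b"))); // {a=2, b=2, c=1}
    }
}
```

The next batch starts from an empty map again, which mirrors the fact that each batch becomes an independent RDD unless you add stateful operations.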
Note the meaning of .setMaster("local[4]") in the code: this test runs locally, i.e. in local mode. Since the socket receiver permanently occupies one thread, local mode needs at least two threads (local[n] with n >= 2), or no data will ever be processed. The following article explains this well:
https://blog.csdn.net/zpf336/article/details/82152286
StreamingListener
is the event-listener trait that Spark provides; you only need to implement its interface. Let's look at which methods the source code lets us override:
trait StreamingListener {

  /** Called when the streaming has been started */
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { }

  /** Called when a receiver has been started */
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }

  /** Called when a receiver has reported an error */
  def onReceiverError(receiverError: StreamingListenerReceiverError) { }

  /** Called when a receiver has been stopped */
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }

  /** Called when a batch of jobs has been submitted for processing. */
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }

  /** Called when processing of a batch of jobs has started. */
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }

  /** Called when processing of a batch of jobs has completed. */
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }

  /** Called when processing of a job of a batch has started. */
  def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted) { }

  /** Called when processing of a job of a batch has completed. */
  def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
}
Below is a Java implementation of this interface, along with a test of these methods:
public static void main(String[] args) throws InterruptedException {
    SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("test");
    JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(15));
    jsc.addStreamingListener(new TestListener());
    JavaReceiverInputDStream<String> lines = jsc.socketTextStream("127.0.0.1", 9998);
    JavaDStream<String> flatMap = lines.flatMap((FlatMapFunction<String, String>) s -> {
        String[] split = s.split(" ");
        return Arrays.asList(split).iterator();
    });
    JavaPairDStream<String, Integer> mapToPair = flatMap.mapToPair(
            (PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1)).reduceByKey(
            (Function2<Integer, Integer, Integer>) Integer::sum);
    mapToPair.foreachRDD((VoidFunction2<JavaPairRDD<String, Integer>, Time>) (rdd, time) ->
            System.out.println("pair:: " + rdd.collect() + " time: " + time));
    // mapToPair.print();
    jsc.start();
    jsc.awaitTermination();
}
static class TestListener implements StreamingListener {
    @Override
    public void onStreamingStarted(StreamingListenerStreamingStarted streamingStarted) {
        System.out.println("onStreamingStarted");
    }

    @Override
    public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {
        System.out.println("onReceiverStarted");
    }

    @Override
    public void onReceiverError(StreamingListenerReceiverError receiverError) {
        System.out.println("onReceiverError");
    }

    @Override
    public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
        System.out.println("onReceiverStopped");
    }

    @Override
    public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {
        System.out.println("onBatchSubmitted");
    }

    @Override
    public void onBatchStarted(StreamingListenerBatchStarted batchStarted) {
        System.out.println("onBatchStarted");
    }

    @Override
    public void onOutputOperationStarted(
            StreamingListenerOutputOperationStarted outputOperationStarted) {
        System.out.println("onOutputOperationStarted");
    }

    @Override
    public void onOutputOperationCompleted(
            StreamingListenerOutputOperationCompleted outputOperationCompleted) {
        System.out.println("onOutputOperationCompleted");
    }

    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
        BatchInfo batchInfo = batchCompleted.batchInfo();
        // Individual metrics can be read off BatchInfo. The delay fields are
        // Scala Options, so guard against None instead of getOrElse(null),
        // which would throw a NullPointerException when the value is absent.
        long batchTime = batchInfo.batchTime().milliseconds();
        long numRecords = batchInfo.numRecords();
        long submissionTime = batchInfo.submissionTime();
        long totalDelay = batchInfo.totalDelay().isDefined()
                ? (Long) batchInfo.totalDelay().get() : -1L;
        long processingDelay = batchInfo.processingDelay().isDefined()
                ? (Long) batchInfo.processingDelay().get() : -1L;
        System.out.println("batchInfo :: " + batchInfo);
    }
}
Output:
19:04:08: onStreamingStarted
19:04:08: onReceiverStarted
19:04:15: onBatchSubmitted
19:04:15: onBatchStarted
19:04:15: onOutputOperationStarted
19:04:15: pair:: [(,1), (a,2), (v,1), (c,2)] time:
19:04:15: onOutputOperationCompleted
19:04:15: batchInfo :: BatchInfo(1592996655000 ms,Map(0 -> StreamInputInfo(0,6,Map())),1592996655039,Some(1592996655044),Some(1592996655271),Map(0 -> OutputOperationInfo(1592996655000 ms,0,foreachRDD at DStreamingTest.java:56,org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaDStreamLike.scala:42)
SparkLockTest.DStreamingTest.main(DStreamingTest.java:56),Some(1592996655044),Some(1592996655271),None)))
19:04:30: onBatchSubmitted
19:04:30: onBatchStarted
19:04:30: onOutputOperationStarted
19:04:30: pair:: [(d,1), (,1), (a,1), (b,1), (c,1)] time:
19:04:30: onOutputOperationCompleted
19:04:30: batchInfo :: BatchInfo(1592996670000 ms,Map(0 -> StreamInputInfo(0,5,Map())),1592996670014,Some(1592996670015),Some(1592996670068),Map(0 -> OutputOperationInfo(1592996670000 ms,0,foreachRDD at DStreamingTest.java:56,org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaDStreamLike.scala:42)
SparkLockTest.DStreamingTest.main(DStreamingTest.java:56),Some(1592996670015),Some(1592996670068),None)))
19:04:38: onReceiverStopped
From the output: when Spark starts, onStreamingStarted and onReceiverStarted are called, marking startup and the start of data reception. At the end of each batch interval (every 15 seconds in this example), onBatchSubmitted and onBatchStarted fire, marking batch submission and the start of batch execution. onOutputOperationStarted and onOutputOperationCompleted bracket each output operation, i.e. the actual data processing, which in this example is the print. When the whole batch finishes, onBatchCompleted is called; when the program shuts down, onReceiverStopped is called.
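The delay fields in BatchInfo follow directly from its timestamps (per Spark's BatchInfo source): schedulingDelay = processingStartTime - submissionTime, processingDelay = processingEndTime - processingStartTime, and totalDelay is their sum. A small sketch checking this against the first batch in the output above (submission 1592996655039, processing start 1592996655044, processing end 1592996655271):

```java
public class BatchDelays {
    // Timestamps taken from the first BatchInfo line in the output above (ms)
    static final long SUBMISSION_TIME  = 1592996655039L;
    static final long PROCESSING_START = 1592996655044L;
    static final long PROCESSING_END   = 1592996655271L;

    // Time the batch sat in the queue before the scheduler picked it up
    static long schedulingDelay(long submission, long start) { return start - submission; }

    // Time spent actually running the batch's jobs
    static long processingDelay(long start, long end) { return end - start; }

    // End-to-end delay: queueing plus processing
    static long totalDelay(long submission, long start, long end) {
        return schedulingDelay(submission, start) + processingDelay(start, end);
    }

    public static void main(String[] args) {
        System.out.println("scheduling: "
                + schedulingDelay(SUBMISSION_TIME, PROCESSING_START) + " ms"); // 5 ms
        System.out.println("processing: "
                + processingDelay(PROCESSING_START, PROCESSING_END) + " ms");  // 227 ms
        System.out.println("total:      "
                + totalDelay(SUBMISSION_TIME, PROCESSING_START, PROCESSING_END) + " ms"); // 232 ms
    }
}
```

Watching totalDelay in onBatchCompleted is a quick way to spot a falling-behind stream: if it keeps growing across batches, processing is slower than the batch interval.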