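When testing Spark Streaming, you sometimes run nc -lk 9999 in a terminal and type data in by hand for Streaming to pull. That does not simulate a real-time stream very well, so Perl 6's react .. whenever and Promise are a good fit for the job: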
sub MAIN(Str :$host = 'localhost', Int :$port = 3333) {
    my $vin = 'LSJA0000000000091';
    my $last_meter = 0;

    react {
        # Each accepted connection gets its own inner react block.
        whenever IO::Socket::Async.listen($host, $port) -> $conn {
            react {
                my Bool:D $ignore = True;

                # Flip the send/pause switch on every tick that rotor lets through;
                # rotor(1, 1 => 1) keeps two ticks and then drops one.
                whenever Supply.interval(5).rotor(1, 1 => 1) {
                    $ignore = !$ignore;
                }

                # Once per second, emit one record unless sending is paused.
                whenever Supply.interval(1) {
                    next if $ignore;
                    my $record = sprintf("\{'vin':'%s','createTime':%s,'mileage':%s}\n", $vin, DateTime.now.posix, $last_meter++);
                    print $record;         # echo locally
                    $conn.print: $record;  # send to the connected client
                }

                # Ctrl-C ends this connection's react block.
                whenever signal(SIGINT) {
                    say "Done.";
                    done;
                }
            }
        }

        CATCH {
            default {
                say .^name, ': ', .Str;
                say "handled in $?LINE";
            }
        }
    }
}
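The pause logic in the script is easy to misread, so here is a minimal Scala sketch that reproduces only the toggle schedule, not the networking. It assumes that rotor(1, 1 => 1) follows the usual take-one, take-one, skip-one cycle and that the 5-second interval fires its first tick at time 0. ToggleSchedule, keptTicks, and flips are hypothetical names used purely for illustration.

```scala
object ToggleSchedule {
  // Tick indices that survive a "take 1, take 1, skip 1" cycle
  // (assumed to match rotor(1, 1 => 1) in the Perl 6 script).
  def keptTicks(n: Int): Seq[Int] =
    (0 until n).filter(_ % 3 != 2)

  // For the first `n` ticks of a timer with the given period in seconds,
  // return (time, sending) pairs: the moment of each flip and whether
  // the sender is active afterwards. The sender starts out paused.
  def flips(n: Int, periodSeconds: Int): Seq[(Int, Boolean)] = {
    var ignore = true
    keptTicks(n).map { i =>
      ignore = !ignore
      (i * periodSeconds, !ignore)
    }
  }

  def main(args: Array[String]): Unit =
    flips(6, 5).foreach { case (t, sending) =>
      println(s"t=${t}s sending=$sending")
    }
}
```

Under those assumptions the sender is active for about 5 seconds, then silent for about 10 seconds, and the pattern repeats, which gives the bursty traffic that typing into nc by hand cannot.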
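While it is not paused, the Perl 6 script writes one record per second to the socket, each stamped with the current time, like this:
{'vin':'LSJA0000000000091','createTime':1542358572,'mileage':0}

How do you check that the data can actually be received? Use telnet: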
telnet localhost 3333
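If telnet is not installed, the same check can be sketched in a few lines of Scala using only java.net.Socket from the JDK. This is an illustrative client, not part of the original setup: SocketPeek and readLines are hypothetical names, and the host and port are assumed to match the defaults of the Perl 6 script.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

object SocketPeek {
  // Connect to host:port and read up to `maxLines` newline-terminated
  // records. Stops early if the server closes the connection.
  def readLines(host: String, port: Int, maxLines: Int): List[String] = {
    val socket = new Socket(host, port)
    try {
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      Iterator
        .continually(reader.readLine())
        .takeWhile(_ != null)
        .take(maxLines)
        .toList
    } finally socket.close()
  }

  def main(args: Array[String]): Unit =
    // Assumed defaults: the generator listening on localhost:3333.
    readLines("localhost", 3333, 5).foreach(println)
}
```

Because the generator pauses between bursts, the call may block for a while before the requested number of lines has arrived.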
Or copy a Streaming demo:
package com.github.yuvalitzchakov.structuredstateful

import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

/**
 * Reads newline-delimited text from a socket and prints each micro-batch to the console.
 */
object readSocket {
  def main(args: Array[String]): Unit = {
    val host = "127.0.0.1"
    val port = 3333

    val spark: SparkSession = SparkSession.builder
      .master("local[*]")
      .appName("Stateful Structured Streaming")
      .getOrCreate()

    import spark.implicits._

    // Each line received from the socket becomes one String row.
    val ds: Dataset[String] = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()
      .as[String]

    ds.writeStream
      .outputMode(OutputMode.Append())
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .format("console")
      .option("truncate", "false") // do not truncate displayed values
      .start()
      .awaitTermination()
  }
}
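The demo prints each record as a raw string. As a possible next step, the sketch below turns one line into typed fields with a regular expression, since the generator's single-quoted format is not strict JSON. It is only an illustration under that assumption: VehicleRecord, VehicleRecordParser, and parse are hypothetical names that do not appear in the original code.

```scala
object VehicleRecordParser {
  // Typed view of one generated record (hypothetical name).
  final case class VehicleRecord(vin: String, createTime: Long, mileage: Long)

  // Matches exactly the single-quoted layout the generator emits.
  private val Pattern =
    """\{'vin':'([^']*)','createTime':(\d+),'mileage':(\d+)\}""".r

  // Returns None for anything that does not match the expected layout.
  def parse(line: String): Option[VehicleRecord] = line.trim match {
    case Pattern(vin, ts, km) => Some(VehicleRecord(vin, ts.toLong, km.toLong))
    case _                    => None
  }

  def main(args: Array[String]): Unit =
    println(parse("{'vin':'LSJA0000000000091','createTime':1542358572,'mileage':0}"))
}
```

In the streaming job, something along the lines of ds.flatMap(line => VehicleRecordParser.parse(line)) should then yield a typed Dataset, provided spark.implicits._ is in scope; treat that as a starting point to verify against your Spark version rather than a tested recipe.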