Spark Streaming is a stream-processing framework built on Spark. It provides high-throughput, fault-tolerant processing of real-time data streams.

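Spark Streaming splits the incoming real-time stream into batches at a fixed time interval and processes each batch in turn, so the results also arrive batch by batch.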
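To make the batching idea concrete, here is a minimal plain-Java sketch that groups timestamped records into fixed-interval batches. It is only an illustration of the concept, not how Spark implements it; the class name `MicroBatcher`, the `Record` type, and the millisecond timestamps are all assumptions introduced for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MicroBatcher {

    /** A record tagged with its arrival time in milliseconds (hypothetical type). */
    public static final class Record {
        final long timestampMs;
        final String value;

        public Record(long timestampMs, String value) {
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    /**
     * Groups records by batch index, where index = floor(timestamp / interval).
     * The TreeMap keeps the batches sorted by index, i.e. in time order.
     */
    public static Map<Long, List<String>> batch(List<Record> records, long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        Map<Long, List<String>> batches = new TreeMap<>();
        for (Record r : records) {
            long batchIndex = Math.floorDiv(r.timestampMs, intervalMs);
            batches.computeIfAbsent(batchIndex, k -> new ArrayList<>()).add(r.value);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Record> records = Arrays.asList(
                new Record(100, "a"), new Record(900, "b"), new Record(1500, "c"));
        // With a 1000 ms interval, "a" and "b" fall into batch 0 and "c" into batch 1.
        System.out.println(batch(records, 1000));
    }
}
```

In Spark Streaming the interval corresponds to the batch duration passed to `StreamingContext`, which is `Seconds(1)` in the word count program later in this article.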
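A DStream (discretized stream) is Spark Streaming's high-level abstraction for a continuous stream of data. Internally it is represented as a sequence of RDDs, and input DStreams can be created from sources such as Kafka, Flume, HDFS, files, or sockets.
This article uses Java NIO to build a simple word count example: a socket server segments a text file into words and streams them to Spark Streaming, which counts them.
First, on the server side, the start method opens a NioSocketServer: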

    public void start() throws IOException {
        Selector select = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(port));
        ssc.configureBlocking(false);
        ssc.register(select, SelectionKey.OP_ACCEPT);
        System.out.println("start NioSocketServer...");
        while (running) {
            // Wait up to 10 ms for ready channels; select returns 0 on timeout.
            int n = select.select(10);
            if (n == 0) {
                continue;
            }
            Iterator<SelectionKey> iter = select.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                System.out.println(key.toString() + "\t" + key.isAcceptable() + "\t" + key.isValid()
                        + "\t" + key.isReadable() + "\t" + key.isWritable());
                // Spark Streaming's socketTextStream keeps producing selection keys.
                // keyFilter ensures each key is handled once: after the file has been
                // sent and the channel closed, it is not written again.
                if (keyFilter.contains(key.toString())) {
                    // Skip only this key; the rest of the selected keys are still processed.
                    continue;
                }
                keyFilter.add(key.toString());
                if (key.isValid() && key.isAcceptable()) {
                    acceptkey(key);
                } else if (key.isValid() && key.isWritable()) {
                    writekey(key);
                }
            }
        }
    }

In the acceptkey method, the accepted SocketChannel only needs to be registered with the selector for OP_WRITE, because the server only sends data:

    public void acceptkey(SelectionKey key) throws IOException {
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        SocketChannel channel = ssc.accept();
        channel.configureBlocking(false);
        // Disable Nagle's algorithm so each line is sent without delay.
        channel.socket().setTcpNoDelay(true);
        channel.register(key.selector(), SelectionKey.OP_WRITE);
    }

The writekey method reads the file line by line, segments each line into words, and writes the result to the SocketChannel:

    public void writekey(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        Segment seg = HanLP.newSegment().enablePartOfSpeechTagging(true);
        // try-with-resources closes the reader; the finally block closes the channel.
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(new FileInputStream(filepath), "gbk"))) {
            String line;
            while ((line = br.readLine()) != null) {
                List<Term> terms = seg.seg(line);
                if (terms.isEmpty()) {
                    continue;
                }
                StringBuilder builder = new StringBuilder();
                for (Term t : terms) {
                    String word = t.word.trim();
                    // Keep only multi-character tokens that contain Chinese characters.
                    if (word.length() > 1 && chinese.matcher(word).find()) {
                        builder.append(word).append(" ");
                    }
                }
                if (builder.length() > 0) {
                    builder.append("\n");
                    ByteBuffer byteBuf = ByteBuffer.wrap(builder.toString().getBytes("utf-8"));
                    // A non-blocking write may be partial, so loop until the buffer is drained.
                    while (byteBuf.hasRemaining()) {
                        channel.write(byteBuf);
                    }
                    try {
                        Thread.sleep(timeDelay);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop sending.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        } finally {
            channel.close();
        }
    }

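The `chinese` field used in writekey is not shown in this article. The sketch below isolates the filtering step under the assumption that `chinese` is a `Pattern` matching any character in the common CJK ideograph range; the class name `WordFilter`, the method `toLine`, and the pattern itself are hypothetical and only illustrate the idea.

```java
import java.util.List;
import java.util.regex.Pattern;

public class WordFilter {

    // Assumption: the article's "chinese" field is a pattern like this one,
    // matching a single character in the common CJK ideograph range.
    static final Pattern CHINESE = Pattern.compile("[\\u4e00-\\u9fa5]");

    /**
     * Keeps tokens that are longer than one character and contain at least one
     * Chinese character, and joins them with a trailing space after each token,
     * mirroring how writekey builds a line.
     */
    public static String toLine(List<String> words) {
        StringBuilder builder = new StringBuilder();
        for (String raw : words) {
            String word = raw.trim();
            if (word.length() > 1 && CHINESE.matcher(word).find()) {
                builder.append(word).append(' ');
            }
        }
        return builder.toString();
    }
}
```

With this filter, single-character tokens such as 的 and purely Latin tokens such as abc are dropped, while tokens such as 我们 are kept.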
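The NioSocketServer#main method:

    public static void main(String[] args) throws IOException {
        // 39.txt is loaded from the root of the classpath.
        String filepath = NioSocketServer.class.getResource("/39.txt").getPath();
        // Arguments: file path, delay between lines (passed to Thread.sleep), port.
        new NioSocketServer(filepath, 1000L, 9999).start();
    }
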
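The methods above rely on several fields that this article does not show (`running`, `keyFilter`, `port`, `filepath`, `timeDelay`, plus the `chinese` pattern). One possible shape, inferred only from how the fields are used and from the constructor call in main, is sketched below. The class name `NioSocketServerSkeleton`, the field types, and the `stop` method are assumptions, not the author's actual code.

```java
import java.util.HashSet;
import java.util.Set;

public class NioSocketServerSkeleton {

    private final String filepath;   // text file whose lines are streamed
    private final long timeDelay;    // pause between lines, in milliseconds
    private final int port;          // TCP port the server listens on

    // volatile so that a stop() call from another thread is seen by the select loop
    private volatile boolean running = true;

    // string forms of the selection keys that have already been handled
    private final Set<String> keyFilter = new HashSet<>();

    // Argument order follows the call in main: (filepath, 1000L, 9999).
    public NioSocketServerSkeleton(String filepath, long timeDelay, int port) {
        this.filepath = filepath;
        this.timeDelay = timeDelay;
        this.port = port;
    }

    /** Hypothetical helper: asks the select loop to exit after its current iteration. */
    public void stop() {
        running = false;
    }

    public boolean isRunning() { return running; }
    public String getFilepath() { return filepath; }
    public long getTimeDelay() { return timeDelay; }
    public int getPort() { return port; }
    public Set<String> getKeyFilter() { return keyFilter; }
}
```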
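On the Spark Streaming side, just run the word count program:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
        // Batch interval of 1 second.
        val sc = new StreamingContext(conf, Seconds(1))
        sc.sparkContext.setLogLevel("OFF")
        // Connect to the NioSocketServer started above.
        val lines = sc.socketTextStream("127.0.0.1", 9999)
        val words = lines.flatMap(_.split("\\s+"))
        // Count the occurrences of each word within each batch.
        val count = words.map((_, 1)).reduceByKey(_ + _)
        count.print()
        sc.start()
        sc.awaitTermination()
    }
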
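For readers less familiar with the DStream operators, the following plain-Java sketch shows roughly what the flatMap, map, and reduceByKey chain computes for the lines of a single batch. It is not Spark code; the class name `BatchWordCount` is an assumption, and unlike the Scala version it skips empty tokens.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BatchWordCount {

    /** Counts the words in the lines of one batch, splitting on runs of whitespace. */
    public static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            // Equivalent of flatMap(_.split("\\s+")).
            for (String word : line.split("\\s+")) {
                if (word.isEmpty()) {
                    continue; // split yields an empty token for blank or space-led lines
                }
                // Equivalent of map((_, 1)).reduceByKey(_ + _).
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("a b a", "b c");
        System.out.println(count(batch)); // prints {a=2, b=2, c=1}
    }
}
```

Because the counts are computed per batch, a word that appears in two different one-second batches is reported separately in each.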
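Add the Maven dependencies. StreamingContext lives in the spark-streaming module, so it is listed alongside spark-core:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.3</version>
    </dependency>
    <dependency>
        <groupId>com.hankcs</groupId>
        <artifactId>hanlp</artifactId>
        <version>portable-1.3.4</version>
    </dependency>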