Spark Streaming WordCount: Segmenting and Counting Text with a NioSocketServer

Spark Streaming is a stream-processing framework built on Spark. It provides high-throughput, fault-tolerant processing of real-time data streams.


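Spark Streaming divides the incoming live data stream into batches at a fixed time interval (the batch interval). It then processes each batch in turn, so the results are produced as a series of batches.
A DStream (discretized stream) is Spark Streaming's high-level abstraction of a continuous stream. Internally it is a sequence of RDDs, one per batch. Its data can come from sources such as Kafka, Flume, HDFS, plain files, and sockets.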
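The plain-Java sketch below is a rough illustration of the batching idea and is not Spark code. It assigns timestamped lines to fixed 1-second batches and counts words separately within each batch. That is approximately what a 1-second StreamingContext followed by reduceByKey computes.

The TimedLine class and the timeMs / intervalMs batching rule are assumptions made for the example. In Spark, batches are formed by the receiver and the batch interval, not by timestamps inside the data.

```java
import java.util.*;

// Hypothetical stream element: one line of text plus its arrival time in milliseconds.
final class TimedLine {
    final long timeMs;
    final String text;

    TimedLine(long timeMs, String text) {
        this.timeMs = timeMs;
        this.text = text;
    }
}

final class MicroBatchSketch {
    /**
     * Puts each line into batch (timeMs / intervalMs) and counts whitespace-separated
     * words inside each batch independently, like a per-batch reduceByKey.
     * Returns batchIndex -> (word -> count); TreeMaps keep the output ordered.
     */
    static SortedMap<Long, SortedMap<String, Integer>> countPerBatch(List<TimedLine> stream, long intervalMs) {
        SortedMap<Long, SortedMap<String, Integer>> result = new TreeMap<>();
        for (TimedLine line : stream) {
            long batch = line.timeMs / intervalMs;
            SortedMap<String, Integer> counts = result.computeIfAbsent(batch, b -> new TreeMap<>());
            for (String word : line.text.trim().split("\\s+")) {
                if (!word.isEmpty()) {          // an empty line splits into a single ""
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<TimedLine> stream = Arrays.asList(
                new TimedLine(100, "spark streaming"),
                new TimedLine(900, "spark"),
                new TimedLine(1200, "streaming batch"));
        // 1000 ms interval: the first two lines land in batch 0, the third in batch 1
        System.out.println(countPerBatch(stream, 1000));
    }
}
```

Running main prints `{0={spark=2, streaming=1}, 1={batch=1, streaming=1}}`. The word streaming is counted once in each batch, because counts are not carried across batch boundaries.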
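This article uses Java NIO to build a small word-count example. A NioSocketServer segments a text file into words and streams them over a socket, and a Spark Streaming job counts them.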

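First, on the server side, the start method launches the NioSocketServer. This is a single-threaded selector loop that accepts Spark's connection and streams the file to it.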
public void start() throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.bind(new InetSocketAddress(port));
    ssc.configureBlocking(false);
    ssc.register(selector, SelectionKey.OP_ACCEPT);
    System.out.println("start NioSocketServer...");
    while (running) {
        // wait at most 10 ms; select() returns the number of ready keys and is never negative
        int n = selector.select(10);
        if (n == 0) {
            continue;
        }
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
        while (iter.hasNext()) {
            SelectionKey key = iter.next();
            // the selector never clears selectedKeys() by itself
            iter.remove();
            System.out.println(key + "\t" + key.isAcceptable() + "\t" + key.isValid() + "\t" + key.isReadable() + "\t" + key.isWritable());
            // Spark's socketTextStream receiver reconnects whenever the server closes the connection,
            // so the same selection keys keep coming back. keyFilter lets each key run only once:
            // the file is sent a single time, the channel is closed, and nothing is written again.
            if (keyFilter.contains(key.toString())) {
                continue;   // skip this key but still handle the remaining selected keys
            }
            keyFilter.add(key.toString());
            if (key.isValid() && key.isAcceptable()) {
                acceptkey(key);
            } else if (key.isValid() && key.isWritable()) {
                writekey(key);
            }
        }
    }
}
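The accept half of this loop can be tried in isolation. The self-contained sketch below is an illustration, not the author's server:

- It binds a non-blocking ServerSocketChannel to an OS-chosen port on 127.0.0.1. This is an assumption for the demo; the article uses port 9999.
- It connects a client from the same process.
- It checks that select() reports the OP_ACCEPT key.
- It removes that key from selectedKeys() by hand, as start() does with iter.remove().

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.Iterator;

final class AcceptDemo {
    /** Runs one round of a selector accept loop and returns the number of accepted connections. */
    static int acceptOnce() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            // port 0: let the OS choose a free port (demo assumption)
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            int accepted = 0;
            // a blocking client connect returns once the connection sits in the server's backlog
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                if (selector.select(1000) > 0) {          // wait up to 1 s for the accept event
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();                    // must be removed manually
                        if (key.isValid() && key.isAcceptable()) {
                            SocketChannel conn = ((ServerSocketChannel) key.channel()).accept();
                            if (conn != null) {           // non-blocking accept() may return null
                                accepted++;
                                conn.close();
                            }
                        }
                    }
                }
                if (!selector.selectedKeys().isEmpty()) {
                    throw new IllegalStateException("selected keys were not removed");
                }
            }
            return accepted;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("accepted: " + acceptOnce());
    }
}
```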
In acceptkey, the accepted SocketChannel only needs to be registered for OP_WRITE, since the server only sends data and never reads from Spark.
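public void acceptkey(SelectionKey key) throws IOException {
    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
    SocketChannel channel = ssc.accept();
    if (channel == null) {
        return;   // in non-blocking mode accept() returns null when no connection is pending
    }
    channel.configureBlocking(false);
    channel.socket().setTcpNoDelay(true);   // send each short line immediately (Nagle disabled)
    // the server only writes, so OP_WRITE is the only interest needed
    channel.register(key.selector(), SelectionKey.OP_WRITE);
}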
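Registering only OP_WRITE is enough because a freshly connected socket has free send-buffer space. The next select() therefore reports it as writable, and writekey runs right away.

The sketch below checks this with an in-process client standing in for Spark's receiver. The class name, the ephemeral port and the blocking accept() are simplifications chosen for the demo, not part of the article.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

final class WriteReadyDemo {
    /** Accepts one in-process client, waits for OP_WRITE, sends one line and returns what the client read. */
    static String sendOneLine(String text) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));   // ephemeral port for the demo
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel conn = server.accept()) {          // blocking accept, for brevity
                conn.configureBlocking(false);
                conn.register(selector, SelectionKey.OP_WRITE);   // like acceptkey(): OP_WRITE only
                if (selector.select(1000) == 0) {
                    throw new IllegalStateException("connection never became writable");
                }
                SelectionKey key = selector.selectedKeys().iterator().next();
                selector.selectedKeys().clear();
                if (!key.isWritable()) {
                    throw new IllegalStateException("unexpected ready set");
                }
                // newline-terminated UTF-8 line, the record format socketTextStream reads
                ByteBuffer buf = ByteBuffer.wrap((text + "\n").getBytes(StandardCharsets.UTF_8));
                while (buf.hasRemaining()) {                      // non-blocking writes may be partial
                    conn.write(buf);
                }
                BufferedReader in = new BufferedReader(
                        new InputStreamReader(Channels.newInputStream(client), StandardCharsets.UTF_8));
                return in.readLine();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(sendOneLine("spark streaming"));
    }
}
```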
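writekey reads the file line by line, segments each line into words with HanLP, and writes the kept words to the SocketChannel. It closes the channel once the whole file has been sent.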
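public void writekey(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    // HanLP segmenter with part-of-speech tagging enabled
    Segment seg = HanLP.newSegment().enablePartOfSpeechTagging(true);
    // the sample file is GBK-encoded; try-with-resources closes the reader even on errors
    try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filepath), "gbk"))) {
        String line;
        while ((line = br.readLine()) != null) {
            List<Term> terms = seg.seg(line);
            if (terms.isEmpty())
                continue;
            // keep words longer than one character that contain a Chinese character
            StringBuilder builder = new StringBuilder();
            for (Term t : terms) {
                String word = t.word.trim();
                if (word.length() > 1 && chinese.matcher(word).find()) {
                    builder.append(word).append(" ");
                }
            }
            if (builder.length() > 0) {
                // socketTextStream decodes UTF-8 and treats each newline-terminated line as one record
                builder.append("\n");
                ByteBuffer byteBuf = ByteBuffer.wrap(builder.toString().getBytes("utf-8"));
                // a non-blocking write may send only part of the buffer, so loop until it is drained
                while (byteBuf.hasRemaining()) {
                    channel.write(byteBuf);
                }
                try {
                    // throttle to one line per timeDelay ms to imitate a live stream
                    Thread.sleep(timeDelay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    } finally {
        // closing the connection tells the receiver that the stream has ended
        channel.close();
    }
}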
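The per-line filter in writekey can be exercised without HanLP. This sketch takes already-segmented words, standing in for HanLP's List<Term>, and applies the same rule:

1. Keep trimmed words that are longer than one character and contain a Chinese character.
2. Join the kept words with spaces.
3. End the line with a newline.

The article does not show how the chinese field is defined. The regex [\u4e00-\u9fa5] used here (the basic CJK Unified Ideographs range) is an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

final class LineFormatter {
    // Assumed definition of the `chinese` field: any character in the basic CJK ideograph range.
    static final Pattern CHINESE = Pattern.compile("[\\u4e00-\\u9fa5]");

    /** Builds the line writekey would send, or returns null when no word passes the filter. */
    static String format(List<String> words) {
        StringBuilder builder = new StringBuilder();
        for (String w : words) {
            String word = w.trim();
            if (word.length() > 1 && CHINESE.matcher(word).find()) {
                builder.append(word).append(' ');
            }
        }
        if (builder.length() == 0) {
            return null;   // writekey sends nothing for such lines
        }
        return builder.append('\n').toString();
    }

    public static void main(String[] args) {
        // single-character words and purely Latin words are dropped
        List<String> words = Arrays.asList("\u6211", "\u559c\u6b22", "Spark", "\u6d41", "\u8ba1\u7b97");
        System.out.print(format(words));
    }
}
```

With the sample words 我, 喜欢, Spark, 流, 计算, main prints 喜欢 计算 followed by a trailing space and a newline. The single-character words and the Latin word Spark are dropped.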
The NioSocketServer#main method:
public static void main(String[] args) throws IOException {
    // 39.txt is the sample text on the classpath; send one line every 1000 ms on port 9999
    String filepath = NioSocketServer.class.getResource("/39.txt").getPath();
    new NioSocketServer(filepath, 1000L, 9999).start();
}
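The snippets use several fields that the article never declares: port, running, keyFilter, filepath, timeDelay and chinese. One plausible declaration is sketched below. All of the following are assumptions, not the author's code:

- the field types;
- the constructor parameter order, inferred from the call new NioSocketServer(filepath, 1000L, 9999);
- the stop() and getter methods;
- the chinese regex.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical field layout for NioSocketServer; start(), acceptkey() and writekey() would live here too.
class NioSocketServer {
    private final String filepath;                 // text file to stream
    private final long timeDelay;                  // pause between lines, in ms
    private final int port;                        // port the Spark receiver connects to
    private volatile boolean running = true;       // loop flag checked by start()
    // string forms of keys already handled by the select loop
    private final Set<String> keyFilter = new HashSet<>();
    // assumed pattern for "contains a Chinese character"
    private final Pattern chinese = Pattern.compile("[\\u4e00-\\u9fa5]");

    NioSocketServer(String filepath, long timeDelay, int port) {
        this.filepath = filepath;
        this.timeDelay = timeDelay;
        this.port = port;
    }

    /** Hypothetical helper: lets another thread end the select loop. */
    void stop() {
        running = false;
    }

    boolean isRunning() { return running; }
    int getPort() { return port; }
    long getTimeDelay() { return timeDelay; }
}
```

Declaring running as volatile is a design choice of this sketch. It lets a stop() call from another thread become visible to the select loop.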

On the Spark Streaming side, run the following word-count program:
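import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // the socket receiver permanently occupies one core, so local mode needs at least 2 cores
    val conf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
    // 1-second batch interval
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.sparkContext.setLogLevel("OFF")

    // connect to the NioSocketServer; each newline-terminated line is one record
    val lines = ssc.socketTextStream("127.0.0.1", 9999)
    val words = lines.flatMap(_.split("\\s+"))
    // count each word within the current batch
    val count = words.map((_, 1)).reduceByKey(_ + _)
    count.print()

    ssc.start()
    ssc.awaitTermination()
  }
}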
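Within a single batch, the flatMap / map / reduceByKey chain is an ordinary word count. The plain-Java sketch below involves no Spark, and its class name is made up. It repeats the same three steps on a list of lines shaped like the server's output, which can help predict what count.print() shows for a batch.

The server appends a trailing space after the last word of each line. Because split drops trailing empty strings, that space does not produce an empty token.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class BatchWordCount {
    /** Same steps as the Scala job for one batch: split on whitespace, pair each word with 1, sum per word. */
    static Map<String, Long> count(List<String> batch) {
        return batch.stream()
                // flatMap(_.split("\\s+"))
                .flatMap(line -> Arrays.stream(line.split("\\s+")))
                // map((_, 1)).reduceByKey(_ + _), collected into a sorted map
                .collect(Collectors.groupingBy(w -> w, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // two records as the server sends them; the receiver has already stripped the newline
        List<String> batch = Arrays.asList("spark streaming ", "spark nio ");
        System.out.println(count(batch));
    }
}
```

main prints `{nio=1, spark=2, streaming=1}`.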
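Add the Maven dependencies. StreamingContext and socketTextStream are provided by spark-streaming, so that artifact is needed in addition to spark-core:
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.3</version>
        </dependency>
        <dependency>
            <groupId>com.hankcs</groupId>
            <artifactId>hanlp</artifactId>
            <version>portable-1.3.4</version>
        </dependency>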
Data and code: https://github.com/qianzhengyang/SparkStreamingProj/tree/master/WordCountNioSocketServer