发送数据到 socket

Spark Streaming 有时候需要使用 nc -lk 9999 开启一个终端来手动键入一些数据供 Streaming 来拉取数据,这种方式不能很好地模拟实时流,所以使用 Perl 6 的 react .. wheneverPromise 来搞很合适:

sub MAIN(Str :$host = 'localhost', Int :$port = 3333) {

    my $vin = 'LSJA0000000000091';
    my $last_meter = 0;
    
    react {
        whenever IO::Socket::Async.listen($host, $port) -> $conn {
            react {
                my Bool:D $ignore = True;
    
                whenever Supply.interval(5).rotor(1, 1 => 1) {
                    $ignore = !$ignore;
                }
        
                whenever Supply.interval(1) {
                    next if $ignore;
                    print sprintf("\{'vin':'%s','createTime':%s,'mileage':%s}\n", $vin, DateTime.now.posix, $last_meter);
                    $conn.print: sprintf("\{'vin':'%s','createTime':%s,'mileage':%s}\n", $vin, DateTime.now.posix, $last_meter++);
                }
            
                whenever signal(SIGINT) {
                    say "Done.";
                    done;
                }
            } 
        }
        CATCH {
            default {
                say .^name, ': ', .Str;
                say "handled in $?LINE";
            }
        }
    }
}

上面的代码会不断发送

{'vin':'LSJA0000000000091','createTime':1542358572,'mileage':0}
socket

这样的带当前时间戳的数据。怎么验证能不能接收到数据呢?使用 telnet

telnet 0.0.0.0 3333

或者拷贝一个 Streaming Demo:

package com.github.yuvalitzchakov.structuredstateful

import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}


/**
  * 
  */
object readSocket {

  def main(args: Array[String]): Unit = {

    val host = "127.0.0.1"
    val port = 3333

    val spark: SparkSession = SparkSession.builder
      .master("local[*]")
      .appName("Stateful Structured Streaming")
      .getOrCreate()

    import spark.implicits._

    val ds: Dataset[String] = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()
      .as[String]

    ds.writeStream
      .outputMode(OutputMode.Append())
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .format("console")
      .option("truncate", "false") // 不截断显示
      .start()
      .awaitTermination()
  }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容