Reference: http://spark.apache.org/docs/latest/streaming-programming-guide.html
1. Running the demo
First, start Netcat as a simple data server and type some input:
$ nc -lk 9999
hello world
Then, in another terminal, run the NetworkWordCount example:
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
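The same computation can be written as a standalone Scala application: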
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
import org.apache.log4j.{Level, Logger}

object SparkStreaming {
  def main(args: Array[String]): Unit = {
    // Silence Spark's INFO logging so the word counts are easy to read
    Logger.getLogger("org").setLevel(Level.ERROR)
    // local[2]: one thread for the receiver, one for processing
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    // Batch the stream into 1-second intervals
    val ssc = new StreamingContext(conf, Seconds(1))
    // Create a DStream that will connect to hostname:port, like localhost:9999
    val lines = ssc.socketTextStream("localhost", 9999)
    // Split each line into words
    val words = lines.flatMap(_.split(" "))
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()
    ssc.start()            // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}
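To build this outside of the Spark examples tree, a minimal sbt setup along the following lines should work. The project name, version, and the Scala/Spark versions (Scala 2.10 with Spark 1.6.x, which matches the old createDirectStream signature used in the Kafka example below) are assumptions; align them with your cluster:
name := "spark-streaming-demo"          // hypothetical project name
version := "0.1"
scalaVersion := "2.10.6"                // assumed; match your Spark build
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.6.3" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.6.3" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
)
Package and submit it (the jar path follows from the assumed name and version above):
$ sbt package
$ spark-submit --class SparkStreaming target/scala-2.10/spark-streaming-demo_2.10-0.1.jar
For the Kafka example that follows, add --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.3 to spark-submit (or build an assembly jar) so the connector classes are on the classpath.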
2. Spark Streaming and Kafka integration: word count example
This variant uses the direct (receiver-less) API, where Spark reads topic partitions straight from the Kafka brokers:
import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.log4j.{Level, Logger}

object KafkaStreaming {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    // Batch the stream into 10-second intervals
    val ssc = new StreamingContext(conf, Seconds(10))
    // Direct approach: list the Kafka brokers to read from
    val kafkaParams = Map("metadata.broker.list" -> "master:9092,slave1:9092,slave2:9092")
    // The set of topics to consume
    val topics = Set("test")
    // Each Kafka record is a (key, value) pair; keep only the message value
    val kafkaStream = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
      .map(_._2)
    kafkaStream.print()
    val wordCountDStream = kafkaStream.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    wordCountDStream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
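To feed the job some data, create the topic and start a console producer with the standard Kafka CLI tools. The broker host comes from the code's metadata.broker.list; the ZooKeeper address master:2181 is an assumption about your cluster layout:
$ bin/kafka-topics.sh --create --zookeeper master:2181 \
    --replication-factor 1 --partitions 1 --topic test
$ bin/kafka-console-producer.sh --broker-list master:9092 --topic test
hello world
hello spark
Each line typed into the producer arrives as one Kafka message; every 10-second batch then prints the raw lines followed by the per-batch word counts.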