使用 IDEA 运行 Spark Streaming 程序时报错:Exception in thread "main" java.lang.NullPointerException
Exception in thread "main" java.lang.NullPointerException
at java.io.Reader.<init>(Reader.java:78)
at java.io.InputStreamReader.<init>(InputStreamReader.java:129)
at scala.io.BufferedSource.reader(BufferedSource.scala:24)
at scala.io.BufferedSource.bu
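原因:IDEA 中没有把存放 names.csv 的目录标记为资源根目录(Resources Root),该文件不在 classpath 上,下面这行代码中的 getResourceAsStream 返回 null,导致后续读取时抛出 NullPointerException: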
val namesResource = this.getClass.getResourceAsStream("/names.csv")
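解决办法是:在 IDEA 中把存放 names.csv 的目录(例如 src/main/resources)标记为资源根目录——右键该目录 → Mark Directory as → Resources Root,或在 Project Structure → Modules → Sources 中将其设置为 Resources。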
具体设置如下:
源代码:
import java.io.PrintWriter
import java.net.ServerSocket
import org.apache.log4j.{Level, Logger}//设置打印内容的级别
import scala.util.Random
/**
 * Stand-alone producer that emits random "product purchase" events over TCP.
 *
 * It listens on port 9999 and, for every client that connects, starts a thread that
 * once per second writes a random number of `user,product,price` lines to that client.
 */
object StreamingProducer {

  /** TCP port the producer listens on. */
  private val Port = 9999

  /**
   * Exclusive upper bound handed to `Random.nextInt` when choosing how many events to
   * emit per second, so every batch contains between 0 and `MaxEvents - 1` events.
   */
  private val MaxEvents = 6

  /** Classpath location of the comma-separated user names; only its first line is read. */
  private val NamesResource = "/names.csv"

  /** The (product name, price) pairs that events are drawn from. */
  private val products: Vector[(String, Double)] = Vector(
    "iPhone Cover" -> 9.99,
    "Headphones" -> 5.49,
    "Samsung Galaxy Cover" -> 8.95,
    "iPad Cover" -> 7.49
  )

  private val random = new Random()

  /**
   * Entry point: loads the user names, opens the server socket and hands every accepted
   * connection to its own thread. Runs until the process is killed or accepting fails.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR) // only print ERROR-level logs

    // Fail fast, before opening the socket, if the names cannot be loaded.
    val names = loadNames(NamesResource)

    val listener = new ServerSocket(Port)
    try {
      println(s"Listening on port: $Port")
      while (true) {
        val socket = listener.accept()
        new Thread() {
          override def run(): Unit = serveClient(socket, names)
        }.start()
      }
    } finally {
      // Only reached when accept() throws; release the port in that case.
      listener.close()
    }
  }

  /**
   * Reads the first line of a classpath resource and splits it on commas.
   *
   * @param resourcePath absolute classpath path of the resource, e.g. "/names.csv"
   * @return the non-empty list of names found on the first line
   * @throws java.io.FileNotFoundException if the resource is not on the classpath
   * @throws IllegalArgumentException if the resource yields no names
   */
  private def loadNames(resourcePath: String): Vector[String] = {
    // getResourceAsStream signals "not found" with null; turn that into a clear error
    // instead of a NullPointerException deep inside scala.io.Source.
    val stream = Option(getClass.getResourceAsStream(resourcePath)).getOrElse(
      throw new java.io.FileNotFoundException(
        s"Resource '$resourcePath' was not found on the classpath; " +
          "make sure the directory that contains it is marked as a resources root"
      )
    )
    val source = scala.io.Source.fromInputStream(stream)
    val names =
      try {
        val lines = source.getLines()
        if (lines.hasNext) lines.next().split(",").toVector else Vector.empty[String]
      } finally {
        source.close()
      }
    require(names.nonEmpty, s"Resource '$resourcePath' does not contain any names")
    names
  }

  /**
   * Generates `n` random product events.
   *
   * @param n     number of events to generate; zero or less yields an empty sequence
   * @param names non-empty pool of user names to pick from
   * @return `n` tuples of (user, product, price)
   */
  private def generateProductEvents(
      n: Int,
      names: Vector[String]
  ): Seq[(String, String, Double)] =
    (1 to n).map { _ =>
      val (product, price) = products(random.nextInt(products.size))
      // Uniform pick by index; avoids shuffling the whole list for every event.
      val user = names(random.nextInt(names.size))
      (user, product, price)
    }

  /**
   * Streams events to one connected client and closes the socket once streaming stops,
   * whether because the client went away or because an exception was thrown.
   *
   * @param socket the accepted client connection; closed by this method
   * @param names  non-empty pool of user names to pick from
   */
  private def serveClient(socket: java.net.Socket, names: Vector[String]): Unit =
    try {
      println("Got client connected from: " + socket.getInetAddress)
      val out = new PrintWriter(socket.getOutputStream(), true)
      streamEvents(out, names)
      println("Client disconnected: " + socket.getInetAddress)
    } finally {
      socket.close()
    }

  /**
   * Writes one batch of events per second to `out` until the writer reports an error.
   *
   * @param out   writer wrapping the client's output stream
   * @param names non-empty pool of user names to pick from
   */
  @scala.annotation.tailrec
  private def streamEvents(out: PrintWriter, names: Vector[String]): Unit = {
    Thread.sleep(1000)
    val num = random.nextInt(MaxEvents)
    generateProductEvents(num, names).foreach { case (user, product, price) =>
      out.write(s"$user,$product,$price")
      out.write("\n")
    }
    // PrintWriter never throws on I/O failure; checkError() flushes the stream and
    // reports whether any write has failed, which is how a closed connection shows up
    // (possibly one or more batches after the client actually went away).
    if (!out.checkError()) {
      println(s"Created $num events...")
      streamEvents(out, names)
    }
  }
}