What is Spark?
An engine used for large-scale data computation.
Key feature: it is very fast, because it iterates over data held in memory (see the sketch below).
Easy to use.
General purpose.
What can it not do?
It does not store data itself; it relies on storage systems such as HDFS and HBase.
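To make the "fast because of in-memory iteration" point concrete, here is a minimal, self-contained sketch: the dataset is cached once and then scanned twice without re-reading HDFS. The object name CacheSketch and the "error" filter are made up for illustration; the file path is the one used in step 1 below.

import org.apache.spark.{SparkConf, SparkContext}

object CacheSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CacheSketch").setMaster("local"))
    // Cache the dataset once, then run several passes over it in memory
    val data = sc.textFile("hdfs://hadoop1:9000/user/test.txt").cache() // same test file as step 1
    val lineCount = data.count()                               // the first action reads HDFS and fills the cache
    val errorCount = data.filter(_.contains("error")).count()  // later passes reuse the in-memory copy
    println(s"lines=$lineCount, errors=$errorCount")
    sc.stop()
  }
}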
Getting started, step 1: WordCount
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ScalaWordCount")
    // Set the master. The spark:// prefix is required; hadoop1 is the cluster master host and 7077 its port.
    // The cluster web UI is at hadoop1:8080, provided the Spark cluster is already running.
    conf.set("spark.master", "spark://hadoop1:7077")
    // SparkContext is the entry point to the cluster
    val sc = new SparkContext(conf)
    // Read the input file from HDFS
    val textFile = sc.textFile("hdfs://hadoop1:9000/user/test.txt")
    val counts = textFile.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    // saveAsTextFile creates a directory of part files at this path
    counts.saveAsTextFile("hdfs://hadoop1:9000/user/testResult.txt")
    sc.stop()
  }
}
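Before writing the result out, it can be handy to peek at the most frequent words. A minimal sketch of the extra lines, meant to sit just before counts.saveAsTextFile(...) in the main method above; the top-10 cut-off is an arbitrary choice.

// Sketch: inspect the 10 most frequent words from the counts RDD above
counts
  .sortBy(_._2, ascending = false)                    // order by count, descending
  .take(10)                                           // bring a small sample back to the driver
  .foreach { case (word, n) => println(s"$word\t$n") }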
Getting started, step 2: reading and writing Hive
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object SparkHive {
  // Run locally; setJars ships the packaged jar to the executors
  val conf = new SparkConf().setAppName("HiveApp").setMaster("local")
    .setJars(List("F:\\ideaWorkSpace\\spark\\target\\scalaDemo.jar"))
  // Lets Hadoop find winutils when running on Windows
  System.setProperty("hadoop.home.dir", "E:/hadoops")
  val sc = new SparkContext(conf)
  val sqlContext = new HiveContext(sc)
  // Load a local file into a Hive table
  def loadData(filePath: String, tableName: String): Unit = {
    sqlContext.sql("load data local inpath '" + filePath + "' into table " + tableName)
  }

  // Query a single person by name
  def getPeopleByName(name: String): Person = {
    val row = sqlContext.sql("select * from people t where t.name='" + name + "'").collect().apply(0)
    Person(row.getAs[String](0), row.getAs[Int](1))
  }
  // Shut down the SparkContext
  def destroy(): Unit = {
    sc.stop()
  }
  def main(args: Array[String]): Unit = {
    // Smoke test: recreate the table, load a file, query it
    val filePath = "D:/a.txt"
    val tableName = "people"
    sqlContext.sql("drop table if exists people")
    sqlContext.sql("create table people (name string, account int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE")
    sqlContext.sql("show tables").show()
    loadData(filePath, tableName)
    sqlContext.sql("select * from people").collect().foreach(println)
    getPeopleByName("1")
    sc.stop()
  }
}
case class Person(name: String, var account: Int)
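HiveContext is the Spark 1.x entry point. On Spark 2.x and later the same read/write flow is usually expressed through SparkSession with Hive support enabled; a minimal sketch of that variant, reusing the people table from above (the object name is made up, and hive-site.xml is assumed to be on the classpath):

import org.apache.spark.sql.SparkSession

object SparkHiveSessionSketch {
  def main(args: Array[String]): Unit = {
    // SparkSession replaces SQLContext/HiveContext in Spark 2.x+
    val spark = SparkSession.builder()
      .appName("HiveApp")
      .master("local")
      .enableHiveSupport()
      .getOrCreate()
    spark.sql("select * from people").show()
    spark.stop()
  }
}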
Getting started, step 3: reading and writing HBase
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkHbse {
  System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  private val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
  private val sc = new SparkContext(sparkConf)
  val conf = HBaseConfiguration.create()
  // ZooKeeper quorum; this could also come from an hbase-site.xml on the classpath, but setting it in code is explicit
  conf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3")
  // ZooKeeper client port, 2181 by default
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  def main(args: Array[String]): Unit = {
    // Test insert, update and query
    val people1 = Person("1", 1000)
    /* val people2 = Person("2", 800)
       val persons = List(people1, people2)
       putRdd(persons, "table1") */
    var hBaseRDD = getTableRdd("table1")
    // Keep only the row whose row key equals people1.name
    hBaseRDD = hBaseRDD.filter { case (_, result) => people1.name.equals(Bytes.toString(result.getRow)) }
    hBaseRDD.foreach { case (_, result) =>
      // Row key
      val key = Bytes.toString(result.getRow)
      println(key + ":" + people1.name + "==" + key.equals(people1.name))
      // Read a column value by column family and qualifier
      val cid = Bytes.toInt(result.getValue("cf".getBytes, "cid".getBytes))
      println("Row key:" + key + " cid:" + cid)
    }
    sc.stop()
  }
  def getTableRdd(tableName: String): RDD[(ImmutableBytesWritable, org.apache.hadoop.hbase.client.Result)] = {
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    // Read the table through TableInputFormat and expose it as an RDD
    sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
  }
  def putRdd(persons: List[Person], tableName: String): Unit = {
    // Write through the old (mapred) TableOutputFormat via saveAsHadoopDataset
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    val indataRdd = sc.makeRDD(persons)
    val rdd = indataRdd.map { person =>
      // Row key = name, column cf:cid = account
      val put = new Put(Bytes.toBytes(person.name))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cid"), Bytes.toBytes(person.account))
      (new ImmutableBytesWritable, put)
    }
    rdd.saveAsHadoopDataset(jobConf)
  }
  def destroy(): Unit = {
    sc.stop()
  }
}
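To double-check what putRdd wrote without going through Spark, the plain HBase client API can read a single row back. A minimal sketch, assuming the same table1 / cf:cid layout and ZooKeeper quorum as above (the object name is made up):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

object HBaseGetSketch {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    // Plain HBase client, no Spark involved
    val connection = ConnectionFactory.createConnection(conf)
    val table = connection.getTable(TableName.valueOf("table1"))
    try {
      val result = table.get(new Get(Bytes.toBytes("1")))
      val cid = Bytes.toInt(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("cid")))
      println("cid for row '1': " + cid)
    } finally {
      table.close()
      connection.close()
    }
  }
}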
Getting started, step 4: Spark Streaming reading data from Kafka
import kafka.api.OffsetRequest
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LogStream {
  def main(args: Array[String]): Unit = {
    // Run locally
    val sparkConf = new SparkConf().setAppName("logStream").setMaster("local")
    println("start")
    // Receiver thread count must be greater than 0; only the receiver-based createStream API uses it,
    // the direct stream below does not
    val numThreads = 1
    // Consumer group id; if it does not exist in Kafka yet, any new name can be used
    val groupId = "groupid"
    // One micro-batch every 6 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(6))
    val topics = Set("log-flume")
    val brokers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    val kafkaParam = Map[String, String](
      "metadata.broker.list" -> brokers,
      "serializer.class" -> "kafka.serializer.StringEncoder",
      "group.id" -> groupId,
      // Always start from the earliest offset; handy for testing, drop it to read only live data
      "auto.offset.reset" -> OffsetRequest.SmallestTimeString)
    // Direct stream over all partitions; keep only the message value
    val message = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics)
      .map(_._2)
    message.foreachRDD { lines =>
      // Print the records of each micro-batch on the driver
      lines.collect().foreach(println)
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
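Once the value stream is in hand, the usual next step is per-batch aggregation rather than just printing raw lines. A minimal sketch that word-counts every 6-second batch of the message DStream above; these lines would sit just before ssc.start() in the main method above.

// Sketch: per-batch word count over the message DStream defined above
val wordCounts = message
  .flatMap(_.split(" "))          // split each Kafka record into words
  .map(word => (word, 1))
  .reduceByKey(_ + _)             // aggregate within the current 6-second batch
wordCounts.print()                // print up to the first 10 results of every batch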