2018-06-04 初识spark

什么是spark:
用户大数据计算的引擎
特点:非常快 原因:内存迭代运算
易用
通用
不能做什么?
不能做数据存储,依赖于hbase,hdfs

入门第一步 wordcount

object WordCount{
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ScalaWordCount")
    //描述下mast spark://必带,集群master的ip:hadoop1, 端口号7077 可以在浏览器ip:8080查看界面,当然得是已经正常启动的spark  
    conf.set("spark.master","spark://hadoop1:7077")
    //非常重要的一个对象SparkContext
    val sc = new SparkContext(conf)
//hdfs 
    val textFile = sc.textFile("hdfs://hadoop1:9000/user/test.txt")
    val counts = textFile.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.saveAsTextFile("hdfs://hadoop1:9000/user/testResult.txt")
  }
}

入门第二步:读写hive

object SparkHive {
//配置本地启动
  var conf = new SparkConf().setAppName("HiveApp").setMaster("local").setJars(List("F:\\ideaWorkSpace\\spark\\target\\scalaDemo.jar"))
  System.setProperty("hadoop.home.dir", "E:/hadoops")
  val sc = new SparkContext(conf);
  val sqlContext = new HiveContext(sc);
  
//加载数据
  def loadDate(filePath:String,tableName:String):Unit={
    sqlContext.sql("load data local inpath '"+filePath+"' into table "+tableName);
  }
//查询数据
  def getPeopleByName(name: String):Person={
    val row = sqlContext.sql("select * from people t where t.name='"+name+"'").collect().apply(0);
    return new Person(row.getAs[String](0),row.getAs[Int](1));
  }
//关闭连接
  def destory():Unit={
    conf = null;
    sc.stop();
  }
  def main(args: Array[String]): Unit = {
//测试
    var filePath = "D:/a.txt";
    val tableName = "people";
    sqlContext.sql("drop table if exists people");
    sqlContext.sql("create table people (name string,account int)  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE");
    sqlContext.sql("show tables").show();
    loadDate(filePath,tableName);
    sqlContext.sql("select * from people").collect().foreach(println)
    getPeopleByName("1");
    sc.stop()
  }
}
case class Person (name: String, var account: Int){

}

入门第三步:读写hbse

object SparkHbse {
  System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
  private val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local");
  private val sc = new SparkContext(sparkConf);
  val conf = HBaseConfiguration.create()
  //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
  conf.set("hbase.zookeeper.quorum","hadoop1,hadoop2,hadoop3")
  //设置zookeeper连接端口,默认2181
  conf.set("hbase.zookeeper.property.clientPort", "2181")

  def main(args: Array[String]): Unit = {
    //测试 新增,修改,查询
    val people1 = new Person("1",1000);
   /* val people2 = new Person("2",800);
    val persons =  List(people1,people2);*/
   /* SparkHbse.putRdd(persons,"table1")*/
    var hBaseRDD=SparkHbse.getTableRdd("table1");
    hBaseRDD = hBaseRDD.filter(peopleRdd => people1.name.equals(Bytes.toString(peopleRdd._2.getRow)));
    hBaseRDD.foreach{case (_,result) =>{
      //获取行键
      val key = Bytes.toString(result.getRow)
      println(key +":"+ people1.name +"=="+key.equals(people1.name))
      //通过列族和列名获取列
      val cid = Bytes.toInt(result.getValue("cf".getBytes,"cid".getBytes))
      println("Row key:"+key+" cid:"+cid)
    }}
    sc.stop();
  }
  def getTableRdd(tableName: String):RDD[(ImmutableBytesWritable,org.apache.hadoop.hbase.client.Result)]={
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    val admin = new HBaseAdmin(conf)
    //读取数据并转化成rdd
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    admin.close();
    return  hBaseRDD;
  }
  def putRdd(persons: List[Person],tableName: String):Unit={
    val jobConf = new JobConf(conf);
    jobConf.setOutputFormat(classOf[TableOutputFormat]);
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
    val indataRdd = sc.makeRDD(persons);
    val rdd = indataRdd.map{person=>{
      val put = new Put(Bytes.toBytes(person.name));
      put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("cid"),Bytes.toBytes(person.account));
      (new ImmutableBytesWritable,put);
    }}
    rdd.saveAsHadoopDataset(jobConf);
  }
  def destory():Unit = {
    sc.stop()
  }
}

入门第四步:spark stream 读取kafka数据

object LogStream {
  def main(args: Array[String]): Unit = {
//设置本地启动
    val sparkConf = new SparkConf().setAppName("logStream").setMaster("local");
    println("start")
//线程必须大于0
    val numThreads = 1;
  //组id若未在kafka种设置可随意添加
    val groupId = "groupid"
//设置每6秒执行一次
    val ssc = new StreamingContext(sparkConf,Seconds(6))
    val topics = Set("log-flume");
    val brokers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    val kafkaParam = Map[String,String]("metadata.broker.list" -> brokers,
      "serializer.class" -> "kafka.serializer.StringEncoder",
      "group.id" -> groupId,
//这个参数表示每次从头开始获取。。如果获取实时数据可不添加,此处用于测试
      "auto.offset.reset" -> OffsetRequest.SmallestTimeString);
    //获取所有数据
    val message = KafkaUtils.createDirectStream[String,String,StringDecoder
      ,StringDecoder](ssc,kafkaParam,topics).map(_._2);
    message.foreachRDD(lines=>{
        println(lines);
      })

    })
    ssc.start();
    ssc.awaitTermination();
  }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容