Spark的ip编程题

部分数据如下

1.26.32.0|1.26.39.255|18489344|18491391|亚洲|中国|内蒙古|通辽||联通|150500|China|CN|122.263119|43.617429

1.26.40.0|1.26.43.255|18491392|18492415|亚洲|中国|内蒙古|赤峰||联通|150400|China|CN|118.956806|42.275317

1.26.44.0|1.26.95.255|18492416|18505727|亚洲|中国|内蒙古|通辽||联通|150500|China|CN|122.263119|43.617429

1.26.96.0|1.26.143.255|18505728|18518015|亚洲|中国|内蒙古|赤峰||联通|150400|China|CN|118.956806|42.275317

1.26.144.0|1.26.147.255|18518016|18519039|亚洲|中国|内蒙古|呼伦贝尔||联通|150700|China|CN|119.758168|49.215333

1.26.148.0|1.26.159.255|18519040|18522111|亚洲|中国|内蒙古|兴安盟||联通|152200|China|CN|122.070317|46.076268

1.26.160.0|1.26.171.255|18522112|18525183|亚洲|中国|内蒙古|通辽||联通|150500|China|CN|122.263119|43.617429

1.26.172.0|1.26.179.255|18525184|18527231|亚洲|中国|内蒙古|兴安盟||联通|152200|China|CN|122.070317|46.076268

1.26.180.0|1.26.195.255|18527232|18531327|亚洲|中国|内蒙古|赤峰||联通|150400|China|CN|118.956806|42.275317

1.26.196.0|1.26.207.255|18531328|18534399|亚洲|中国|内蒙古|呼伦贝尔||联通|150700|China|CN|119.758168|49.215333

将数据ip进行统计并写入mysql中

object IpCount {

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()

      .setAppName(this.getClass.getSimpleName)

      .setMaster("local[*]")

    val sc = new SparkContext(conf)

    //ip的规则数据

    val ipData: RDD[String] = sc.textFile("D:\\data\\data\\ip\\ip.txt")

    //用户数据

    val ipaccess: RDD[String] = sc.textFile("D:\\data\\data\\ip\\ipaccess.log")

    //获取用户的ip信息的十进制Long

    val rddUserIP: RDD[Long] = ipaccess.map(t => {

      val userIpStr: String = t.split("\\|")(1)

      ip2Long(userIpStr)

    })

    //不能在一个RDD中,操作另外一个RDD

  /* ipData.map(t=>{

      rddUserIP.map(m=>{

      })

    })*/

    //把规则数据提取到内存中

    val ipArr: Array[(Long, Long, String)] = ipData.map(t => {

      val str: Array[String] = t.split("\\|")

      val startIp: Long = str(2).toLong

      val endIp: Long = str(3).toLong

      val addr: String = str(6)

      (startIp, endIp, addr)

    }).coalesce(1).collect()

    val addrAndOne: RDD[(String, Int)] = rddUserIP.map(t => {

      //将用户的ip地址转换成省份和1

      val addr: String = binarySearch(t, ipArr)

      (addr, 1)

    })

    //对结果进行归并,并排序:按上网用户的数量降序

    val res: RDD[(String, Int)] = addrAndOne.reduceByKey(_+_).sortBy(t=>(-t._2,t._1))

  //将结果写入到SQL中

    //不能再driver端catch executor的异常

    res.foreachPartition(it=>{

      var connection : Connection = null

      var ps: PreparedStatement = null

      try{

        Class.forName("com.mysql.jdbc.Driver")

        connection= DriverManager.

          getConnection("jdbc:mysql://hdp03:3306/bdpro02?useUnicode=true&characterEncoding=utf-8","root","root")

        val sql = "insert into ipLog values(?,?)"

        ps= connection.prepareStatement(sql)

        it.foreach(t=>{

          ps.setString(1,t._1)

          ps.setInt(2,t._2)

          ps.executeUpdate()

        })

      }catch {

        case e:Exception => e.printStackTrace()

      }finally {

        if(connection!=null) connection.close()

        if(ps!=null) ps.close()

      }

    })

/*    res.foreach(t=>{

      //如果要在集群环境中运行,需要--jar

      //也可以,把mysqljar手动添加到lib中

      try{

        //try catch

        val statement: PreparedStatement = connection.prepareStatement("insert into ipLog values(?,?)")

        statement.setString(1,t._1)

        statement.setInt(2,t._2)

        statement.execute()

      }catch {

        case e:Exception=>{e.printStackTrace()}

      }

    })*/

    res.collect().foreach(println)

  }

  //从给定的数组中二分查找给定值

  //二分查找的数组必须是有序的

  def binarySearch(ipLong:Long,arr:Array[(Long, Long, String)]): String ={

    var start: Int = 0

    var end: Int = arr.length-1

    var addr:String= null;

    //这里必须有=,如果没有,就会缺少很多值

    while(end>=start){

      var midd: Int = (start+end)/2

      //如果给定的ip在该数组的中间值的起始ip和结束ip之间,则反回省份份

      if(ipLong>=arr(midd)._1&&ipLong<=arr(midd)._2){

        return arr(midd)._3

      }else if(ipLong<arr(midd)._1){

        end=midd-1

      }else{

        start=midd+1

      }

    }

    addr

  }

  def ip2Long(ip:String): Long ={

    val fragments = ip.split("[.]")

    var ipNum = 0L

    for (i <- 0 until fragments.length) {

      ipNum = fragments(i).toLong | ipNum << 8L

    }

    ipNum

  }

}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,287评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,346评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,277评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,132评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,147评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,106评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,019评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,862评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,301评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,521评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,682评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,405评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,996评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,651评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,803评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,674评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,563评论 2 352

推荐阅读更多精彩内容