SparkStreaming的幂等操作者

#### 幂等操作

​ **所谓的幂等操作,简单点,就是说,无论你执行多少次的操作,对于用户而言都是一次操作**

```

/**

  */

object _04KafkaOffsetIdempotent {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("test").setMaster("local[2]")

    val processingInterval = 2

    val brokers = "bigdata01:9092,bigdata02:9092,bigdata03:9092"

    val topic = "mytopic1"

    val groupName =  "myspark"

    // Create direct kafka stream with brokers and topics

    val topicsSet = topic.split(",").toSet

    val kafkaParams = Map[String, String](

      "metadata.broker.list" -> brokers,

      "auto.offset.reset" -> "smallest",

      "group.id" -> groupName

    )

    /*

      1. 创建测试的mysql数据库

      create database mytest;

      2. 建表

      CREATE TABLE myorders(NAME VARCHAR(20), orderid VARCHAR(100) PRIMARY KEY);

      3. 新建topic: mytopic1

        kafka-topics.sh --zookeeper bigdata01:2181/kafka --create --topic mytopic1 --partitions 3 --replication-factor 1

      4. 往mytopic1发送数据, 数据格式为 "字符,数字"  比如  abc,3

    */

    val ssc = new StreamingContext(sparkConf, Seconds(processingInterval))

//读取kafka中信息

    val messages = KafkaManager.createMessage(ssc, kafkaParams, topicsSet, curator)

    val jdbcUrl =  "jdbc:mysql://localhost:3306/mytest"

    val jdbcUser = "root"

    val jdbcPassword = "sorry"

//判断是否有信息

    messages.foreachRDD(rdd=>{

      if(!rdd.isEmpty()) {

//获取偏移量

        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        rdd.map(x => x._2).foreachPartition(partition => {

//获取数库连接

          val dbConn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword)

          partition.foreach(msg => {

            val name = msg.split(",")(0)

            val orderid = msg.split(",")(1)

            val sql = s"insert into myorders(name, orderid) values ('$name', '$orderid') ON DUPLICATE KEY UPDATE name='${name}'"

//进行预编译sql

            val pstmt = dbConn.prepareStatement(sql)

            pstmt.execute()

            pstmt.close()

          })

          dbConn.close()

        })

//将偏移量写回redis

        KafkaManager.storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, groupName, curator)

      }

    })

    ssc.start()

    ssc.awaitTermination()

  }

  val curator = {

//创建zookeeper连接(通过builder创建zookeeper对象)

val client = CuratorFrameworkFactory.builder()

// connectString参数是ZooKeeper服务的地址和端口号,对于集群情况下的多个ZooKeeper示例,之间使用逗号分隔.比如

        .connectString("bigdata01:2181,bigdata02:2181,bigdata03:2181")

retryPolicy参数是指在连接ZK服务过程中重新连接测策略.在它的实现类ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)中,baseSleepTimeMs参数代表两次连接的等待时间,maxRetries参数表示最大的尝试连接次数

        .retryPolicy(new ExponentialBackoffRetry(1000, 3))

        .namespace("kafka/consumers")//需要协商确定存储的zk的路径

相比于使用newClient()方法创建连接外,还可以使用builder()方法来控制更多的参数,如        .build()

    client.start()

    client

  }

}

**

## 用redis进行direct的偏移量offset管理对读取偏移量和写入

具体详情看[用redis进行direct的偏移量offset管理](https://mp.csdn.net/mdeditor/97967077#)

**

import kafka.common.TopicAndPartition

import kafka.message.MessageAndMetadata

import kafka.serializer.StringDecoder

import org.apache.curator.framework.CuratorFramework

import org.apache.spark.streaming.StreamingContext

import org.apache.spark.streaming.dstream.InputDStream

import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

import org.apache.zookeeper.data.Stat

import scala.collection.mutable

object KafkaManager {

    /**

      * 更新offset

      *  /kafka/consumers/offsets/${topic}/${group}/${partition}

      */

    def storeOffsets(offsetRanges: Array[OffsetRange], group:String, curator:CuratorFramework) = {

        for (offsetRange <- offsetRanges) {

            val topic = offsetRange.topic

            val partition = offsetRange.partition

            val offset = offsetRange.untilOffset

            val path = s"/offset/${topic}/${group}/${partition}"

            checkExist(path, curator)

            curator.setData().forPath(path, (offset + "").getBytes())

        }

    }

    def createMessage(ssc: StreamingContext, kafkaParams: Map[String, String],

                      topics:Set[String], curator:CuratorFramework): InputDStream[(String, String)] = {

        //step 1 读取偏移量

        val fromOffsets:Map[TopicAndPartition, Long] = getFromOffsets(topics, kafkaParams("group.id"), curator)

        /*

            step 2 拉取kafka数据

                有offset

                    从相关偏移量的位置拉取数据

                无offset

                    从指定的offset.auto.reset对应的位置开始拉取数据

        */

        var messages:InputDStream[(String, String)] = null

        if(!fromOffsets.isEmpty) {

            //you偏移量

            val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

            messages = KafkaUtils.createDirectStream[String, String,

                StringDecoder, StringDecoder,

                (String, String)](ssc,

                kafkaParams, fromOffsets,

                messageHandler)

        } else {

            //无偏移量

            messages = KafkaUtils.createDirectStream[String, String,

                StringDecoder, StringDecoder](ssc, kafkaParams, topics)

        }

        messages

    }

    /*

        获取topic和partition对应的偏移量

        1、既然是基于zk的方式来管理offset,那么就需要在zk中的某个目录对应节点中存储offset信息,每次再更新

        2、所以我们可以模仿基于receiver的方式来保存的offset

        3、操作zk得需要zk的client

        约定数据存储的目录

        /kafka/consumers/bd-1901-group-1/offsets/spark/0 -->基于receiver的方式

        模仿

        /kafka/consumers/offsets/${topic}/${group}/${partition}

                                                    |data-offset

    */

    def getFromOffsets(topics:Set[String], group:String, curator:CuratorFramework):Map[TopicAndPartition, Long] = {

        val fromOffset = mutable.Map[TopicAndPartition, Long]()

        import scala.collection.JavaConversions._

        for(topic <- topics) {

            val basePath = s"/offset/${topic}/${group}"

            //basepath可能存在,可能不存在

            checkExist(basePath, curator)

            val partitions = curator.getChildren.forPath(basePath)//partitions是一个java的集合

            for(partition <- partitions) {

                val path = s"${basePath}/${partition}"

                val offset = new String(curator.getData.forPath(path)).toLong

                fromOffset.put(TopicAndPartition(topic, partition.toInt), offset)

            }

        }

        fromOffset.toMap

    }

    def checkExist(path:String, curator:CuratorFramework): Unit = {

        val stat:Stat = curator.checkExists().forPath(path)

        if(stat == null) {

            //创建该目录

            curator.create()

                .creatingParentsIfNeeded()//如果目录中存在多级未创建的目录,需要指定递归创建目录的方式

                .forPath(path)

        }

    }

}

```

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容