zookeeper的watcher

利用zk的watcher功能实时监控zk节点的变化,可以利用这个功能做报警、监控,例如监控kafka的broker,otter的node等,如果有节点挂掉,立时通知

class MonitorZk(var zkHost: String, var zkPath: String) extends Watcher {
  var zoo: ZooKeeper = new ZooKeeper(zkHost, 10000, this)
  var brokerListStr = ""
  var brokers: util.List[String] = _
  private val logger: Logger = Logger.getLogger(MonitorZk.getClass)

  override def process(event: WatchedEvent): Unit = {
    if (event == null) return
    //取得连接状态
    val state = event.getState
    //取得事件类型
    val eventType = event.getType
    //哪一个节点路径发生变更
    val nodePath = event.getPath
    brokers = zoo.getChildren(zkPath, true)

    import scala.collection.JavaConversions._

    brokers.foreach(x => {
      brokerListStr += s"$x "
    })

    if (KeeperState.SyncConnected eq state) {
      //连接成功
      // 没有任何节点,表示创建连接成功(客户端与服务器端创建连接成功后没有任何节点信息)
      if (EventType.None eq eventType) {
        logger.warn(s"成功链接zookeeper服务器,节点数量:${
          brokers.size()
        }, 节点列表:$brokerListStr")
      } else if (EventType.NodeCreated eq eventType) {
        //当服务器端创建节点的时候触发
        logger.warn(s"zookeeper服务端创建新的节点, 节点数量:${
          brokers.size()
        }, 节点列表:$brokerListStr")
      } else if (EventType.NodeDataChanged eq eventType) {
        //被监控该节点的数据发生变更的时候触发
        logger.warn(s"节点的数据更新,节点数量:${
          brokers.size()
        }, 节点列表:$brokerListStr")
      } else if (EventType.NodeChildrenChanged eq eventType) {
        // 对应本代码而言只能监控根节点的一级节点变更。如:在根节点直接创建一级节点,
        //或者删除一级节点时触发。如修改一级节点的数据,不会触发,创建二级节点时也不会触发。
        logger.warn(s"子节点发生变更,节点数量:${
          brokers.size()
        }, 节点列表:$brokerListStr")
      } else if (EventType.NodeDeleted eq eventType) {
        logger.warn(s"节点:$nodePath 被删除,节点数量:${
          brokers.size()
        }, 节点列表:$brokerListStr")
      }
    } else if (KeeperState.Disconnected eq state) logger.warn("客户端连接zookeeper服务器端失败")
    else if (KeeperState.Expired eq state) logger.warn("客户端与zookeeper服务器端会话失败")
    else if (KeeperState.AuthFailed eq state) logger.warn("权限认证失败")

    if (brokers.length < nodeCount.toInt) {
      logger.warn(s"报警 $title, 节点列表:$brokerListStr")
      SendMail.sendMail(title, s"节点列表:$brokerListStr", "html")
    }
    brokerListStr = ""
  }

}

object MonitorZk {
  def main(args: Array[String]): Unit = {
    val parseParam = new ZkParam
    val jCommander = new JCommander(parseParam, args: _*)
    if (parseParam.help) {
      jCommander.usage()
      System.exit(0)
    }

    val zt = new MonitorZk(parseParam.zkHost, parseParam.nodePath)
    zt.logger.warn("程序启动....")
    //这句必须先执行,才能实现监控
    zt.brokers = zt.zoo.getChildren(parseParam.nodePath, true)

    scala.sys.addShutdownHook({
      zt.logger.warn(s"${parseParam.title} 程序关闭")
    })
    //让主线程一直在
    Thread.sleep(Integer.MAX_VALUE)
  }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 大数据技术框架 1. 简介 2. Hadoop框架2.1. Hadoop-MapReduce2.1.1. 简介:2...
    sunTengSt阅读 14,132评论 1 77
  • 开始 这里不从代码的角度讲述watcher机制,直接通过抽象的文字进行描述,需要首先说明的是在zookeeper里...
    ImushroomT阅读 2,790评论 0 0
  • 摘要 要学习系统构架,ZooKeeper (下文简称zk)是无法绕开的开源技术。大型网站后台成百上千的分布式服务节...
    老吴学技术阅读 5,221评论 0 4
  • 变量及类型 类型: Numbers(数字): int(有符号整型) long(长整型) float(浮点型) co...
    久壑阅读 2,241评论 0 0
  • 言泽抓起沉萧的手,放到他手臂上,凑到沉萧面前亲了亲,露出一个邪魅的笑,声音低沉惑人,“萧萧,你知道你跟我进去,就意...
    咸鱼惠翻身阅读 7,954评论 0 5