安卓基于Netty搭建Mqtt Broker实现端对端通信

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(Publish/Subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上,和HTTP、FTP等协议一样属于应用层协议。Mqtt协议以其极少代码和有限的带宽为远程设备提供实时可靠消息服务的能力,使得其在物联网、小型设备和移动应用等方面有广泛的应用。
不同于WebSocket协议的C/S通信模式,MQTT是发布/订阅(Publish/Subscribe)通信模式,这样做能更好的解耦通信的双方,发布者不必知道订阅者的存在,同时发布者也可以是订阅者,这种模式中间件称之为broker(代理),通信的双方通过代理转发消息。同时mqtt协议中开发者还可以定义消息质量等级,灵活配置消息质量等级,确保消息发送和接收。此外客户端还可以设置遗嘱消息,在意外断开连接时可以给另外一方发送消息,让对方知道你出现了意外好做出响应的处理。具体介绍这里不展开,本篇文章主要介绍如何在安卓端基于Netty搭建Broker并实现端到端的通信。
Netty也是一个在物联网领域使用比较广泛的S/C框架,网上介绍相关文章也比较多,这里不做过多的介绍。下面直接上代码吧:

一、添加Netty依赖

直接导入jar包到模块lib目录下并添加依赖implementation files('libs/netty-all-4.1.24.Final.jar'),jar包这里提供4.1.24版本,其他版本可以在自行寻找下载下来。
链接:https://pan.baidu.com/s/1vwZRge47OLTh-qXv-o000g
提取码:tx4b

二、创建Broker服务

class MqttServer : Service(){

    private var mBossGroup: NioEventLoopGroup? = null
    private var mWorkerGroup: NioEventLoopGroup? = null

    private var mChannel: Channel? = null

    private val mHandlerMap = HashMap<String,MqttTransportHandler>()

    override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {

        startUpServer()
        return super.onStartCommand(intent, flags, startId)
    }

    private fun startUpServer(){
        try {
            mBossGroup = NioEventLoopGroup(1)
            mWorkerGroup = NioEventLoopGroup(20)
            val bootstrap = ServerBootstrap().apply {
                channel(NioServerSocketChannel::class.java).group(mBossGroup, mWorkerGroup)
                option(ChannelOption.TCP_NODELAY, true)//无阻塞 内部用LinkedHashMap存储 第一个参数为key 第二个参数为value
                option(ChannelOption.SO_BACKLOG,1024)
                option(ChannelOption.SO_REUSEADDR,true)
                option(ChannelOption.SO_KEEPALIVE, true)//长连接
                option(ChannelOption.SO_TIMEOUT, 15)
                option(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT)
                //配置连接的Channel
                childHandler(object : ChannelInitializer<SocketChannel>() {
                    override fun initChannel(p0: SocketChannel?) {
                        //核心业务处理对象
                        val handler = MqttTransportHandler(mHandlerMap)
                        val pipeline = p0?.pipeline()
                        //配置日志打印
                        pipeline?.addLast(LoggingHandler(LogLevel.INFO))
                        pipeline?.addLast(IdleStateHandler(180,0,0))
                        pipeline?.addLast("decoder",MqttDecoder())//添加数据发送解码器,netty内置了Mqtt协议的编解码器
                        pipeline?.addLast("encoder",MqttEncoder.INSTANCE)//添加数据接收编码器
                        //添加SSL验证
//                        val engine = ContextSSLFactory.getSSLContextServer()?.newEngine(p0?.alloc())
//                        engine?.run {
//                            useClientMode = false
                              //设置为true表示双向验证,服务端和客户端互相验证通过才能发起请求,false为单向验证,客户端验证服务端通过即可通信
//                            needClientAuth = false
//                        }
//                        pipeline?.addFirst("ssl",SslHandler(engine))
                        pipeline?.addLast(handler)//添加数据处理器

                        p0?.closeFuture()?.addListeners(handler)
                    }

                })
            }
            mChannel = bootstrap.bind(11883).addListener {
                if (it.isSuccess){
                    Log.e("Server","服务器启动成功")
                }else{
                    Log.e("Server","服务器启动失败")
                }
            }.sync().channel()
        }catch (e: Exception){
            e.printStackTrace()
        }
    }

    override fun onBind(intent: Intent?): IBinder? {
        return null
    }

    override fun onDestroy() {
        super.onDestroy()

        try {
            mChannel?.close()?.sync()
        }finally {
            mWorkerGroup?.shutdownGracefully()
            mBossGroup?.shutdownGracefully()
        }
    }
}

三、创建核心业务处理对象(ChannelHandler)

class MqttTransportHandler(val map: HashMap<String,MqttTransportHandler>) : ChannelInboundHandlerAdapter(),GenericFutureListener<Future<in Void>>{

    val MAX_SUPPORTED_QOS_LVL = MqttQoS.AT_LEAST_ONCE

    @Volatile
    private var connected = false
    //传输数据的对象
    private var mCtx: ChannelHandlerContext? = null

    @Volatile
    private var address: InetSocketAddress? = null

    /**
     * 当服务器收到请求时触发这个回调
     * 此时在这里进行消息的分发
     */
    override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
        mCtx = ctx
        if (msg is MqttMessage) {
            processMqttMsg(ctx, msg as MqttMessage)
        } else {
            ctx.close()
        }
    }

    private fun processMqttMsg(ctx: ChannelHandlerContext, msg: MqttMessage) {
        address = ctx.channel().remoteAddress() as InetSocketAddress
        if (msg.fixedHeader() == null) {
            processDisconnect(ctx)
            return
        }


        //根绝mqtt协议里定义的消息类型分发消息  对不同的请求做出不同的响应
        when (msg.fixedHeader().messageType()) {
            MqttMessageType.CONNECT -> processConnect(ctx, msg as MqttConnectMessage)//连接
            MqttMessageType.PUBLISH -> processPublish(ctx, msg as MqttPublishMessage)//推送
            MqttMessageType.SUBSCRIBE -> processSubscribe(ctx, msg as MqttSubscribeMessage)//订阅
            MqttMessageType.UNSUBSCRIBE -> processUnsubscribe(ctx, msg as MqttUnsubscribeMessage)//取消订阅
            MqttMessageType.PINGREQ -> if (checkConnected(ctx)) {//心跳
                ctx.writeAndFlush(
                    MqttMessage(
                        MqttFixedHeader(
                            MqttMessageType.PINGRESP,
                            false,
                            MqttQoS.AT_MOST_ONCE,
                            false,
                            0
                        )
                    )
                )
            }
            MqttMessageType.DISCONNECT -> if (checkConnected(ctx)) {//断开连接
                processDisconnect(ctx)
            }
            else -> {
            }
        }
    }
    
    /**
     * 处理服务器收到推送类型的消息
     */
    private fun processPublish(ctx: ChannelHandlerContext, mqttMsg: MqttPublishMessage) {
        if (!checkConnected(ctx)) {
            return
        }
        //推送来的消息都会携带一个topic 取出这个topic  服务器根据谁订阅了这个topic推送到对方去 
        val topicName = mqttMsg.variableHeader().topicName()
        val msgId = mqttMsg.variableHeader().packetId()
        //map是以topic为key,ChannelHandler对象为value,这里根据topic取出ChannelHandler做消息的发送。前面讲过每一个连接就是一个ChannelHandler 每个连接都会订阅一个topic
        map[topicName]?.processDevicePublish(ctx, mqttMsg, topicName, msgId)
    }

    private fun processDevicePublish(
        ctx: ChannelHandlerContext,
        mqttMsg: MqttPublishMessage,
        topicName: String,
        msgId: Int
    ) {
        //固定消息头 注意此处的消息类型PUBLISH mqtt协议
        val fixedHeader = MqttFixedHeader(MqttMessageType.PUBLISH,false,MqttQoS.AT_LEAST_ONCE,false,0)
        //可变头
        val variableHeader = MqttPublishVariableHeader(topicName,msgId)
        val publishMessage = MqttPublishMessage(fixedHeader,variableHeader,mqttMsg.content())
        //ChannelHandlerContext作为消息传输的对象 由它来完成消息的write和flush
        mCtx?.writeAndFlush(publishMessage)
    }

    /**
      * 处理订阅消息
      */
    private fun processSubscribe(ctx: ChannelHandlerContext, mqttMsg: MqttSubscribeMessage) {
        if (!checkConnected(ctx)) {
            return
        }
        val grantedQoSList: MutableList<Int> = ArrayList()
        for (subscription in mqttMsg.payload().topicSubscriptions()) {
            val topic = subscription.topicName()
            val reqQoS = subscription.qualityOfService()
            //根据topic存储ChannelHandler   后面服务器做消息的转发时需要取出对应的ChannelHandler才能保证消息的送达
            map[topic] = this
        }
        //ChannelHandlerContext发送一个SubAck消息  相当于服务器给客户端的一个响应  客户端可以根据ack消息做出处理
        ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList))
    }
    
    /**
      * 创建SubAck消息
      */
    private fun createSubAckMessage(msgId: Int, grantedQoSList: List<Int>): MqttSubAckMessage? {
        val mqttFixedHeader = MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0)
        val mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId)
        val mqttSubAckPayload = MqttSubAckPayload(grantedQoSList)
        return MqttSubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubAckPayload)
    }

    private fun registerSubQoS(topic: String, grantedQoSList: MutableList<Int>, reqQoS: MqttQoS) {
        grantedQoSList.add(getMinSupportedQos(reqQoS))

    }

    private fun getMinSupportedQos(reqQoS: MqttQoS): Int {
        return Math.min(reqQoS.value(), MAX_SUPPORTED_QOS_LVL.value())
    }

    private fun processUnsubscribe(ctx: ChannelHandlerContext, mqttMsg: MqttUnsubscribeMessage) {
        if (!checkConnected(ctx)) {
            return
        }
        //todo 具体业务逻辑
        ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()))
    }

    private fun processDisconnect(ctx: ChannelHandlerContext) {
        ctx.close()
    }

    private fun processConnect(ctx: ChannelHandlerContext, msg: MqttConnectMessage) {
        ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_ACCEPTED))
        connected = true
    }

    /**
      * 创建PubAck消息 客户端发送来消息 Broker做了转发后可以给publish的客户端发送一个ack消息作为响应
      */
    private fun createMqttPubAckMsg(requestId: Int): MqttPubAckMessage? {
        val mqttFixedHeader = MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0)
        val mqttMsgIdVariableHeader = MqttMessageIdVariableHeader.from(requestId)
        return MqttPubAckMessage(mqttFixedHeader, mqttMsgIdVariableHeader)
    }

    private fun createUnSubAckMessage(msgId: Int): MqttMessage? {
        val mqttFixedHeader = MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0)
        val mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId)
        return MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader)
    }
    
    /**
      * ConnectAck  连接请求的响应 让客户端知道连接服务器是否成功
      */
    private fun createMqttConnAckMsg(returnCode: MqttConnectReturnCode): MqttConnAckMessage? {
        val mqttFixedHeader = MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0)
        val mqttConnAckVariableHeader = MqttConnAckVariableHeader(returnCode, true)
        return MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader)
    }

    private fun checkConnected(ctx: ChannelHandlerContext): Boolean {
        return if (connected) {
            true
        } else {
            ctx.close()
            false
        }
    }


    override fun operationComplete(future: Future<in Void>) {
    }
}

至此代码就完结了,整个过程还是比较简单,这其中要注意两点:
1、明确netty框架中ChannelHandler的概念,mqtt协议中每个连到broker的客户端都是一个ChannelHandler;
2、明确ChannelHandlerContext的概念,此对象在netty中主要承担消息传输的职责,mqtt协议中broker作为一个消息的中转站,这其中就是靠ctx传输数据,实现端到端的消息互通。

以上两点就能保证mqtt协议中连接到broker的publisher和subscriber实现消息互通。

以上就是在安卓端基于netty框架搭建mqtt协议中的broker,网上介绍的大部分都是使用第三方平台EMQ搭建Broker或者后端开发者开发这块,希望这篇文章能帮助到有需求的人,当然上面代码只是一个简单的验证,更多复杂场景需要开发者根据自己需求拓展。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,711评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,079评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,194评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,089评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,197评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,306评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,338评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,119评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,541评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,846评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,014评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,694评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,322评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,026评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,257评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,863评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,895评论 2 351

推荐阅读更多精彩内容