MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(Publish/Subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上,和HTTP、FTP等协议一样属于应用层协议。Mqtt协议以其极少代码和有限的带宽为远程设备提供实时可靠消息服务的能力,使得其在物联网、小型设备和移动应用等方面有广泛的应用。
不同于WebSocket协议的C/S通信模式,MQTT是发布/订阅(Publish/Subscribe)通信模式,这样做能更好的解耦通信的双方,发布者不必知道订阅者的存在,同时发布者也可以是订阅者,这种模式中间件称之为broker(代理),通信的双方通过代理转发消息。同时mqtt协议中开发者还可以定义消息质量等级,灵活配置消息质量等级,确保消息发送和接收。此外客户端还可以设置遗嘱消息,在意外断开连接时可以给另外一方发送消息,让对方知道你出现了意外好做出响应的处理。具体介绍这里不展开,本篇文章主要介绍如何在安卓端基于Netty搭建Broker并实现端到端的通信。
Netty也是一个在物联网领域使用比较广泛的S/C框架,网上介绍相关文章也比较多,这里不做过多的介绍。下面直接上代码吧:
一、添加Netty依赖
直接导入jar包到模块lib目录下并添加依赖implementation files('libs/netty-all-4.1.24.Final.jar')
,jar包这里提供4.1.24
版本,其他版本可以在自行寻找下载下来。
链接:https://pan.baidu.com/s/1vwZRge47OLTh-qXv-o000g
提取码:tx4b
二、创建Broker服务
class MqttServer : Service(){
private var mBossGroup: NioEventLoopGroup? = null
private var mWorkerGroup: NioEventLoopGroup? = null
private var mChannel: Channel? = null
private val mHandlerMap = HashMap<String,MqttTransportHandler>()
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
startUpServer()
return super.onStartCommand(intent, flags, startId)
}
private fun startUpServer(){
try {
mBossGroup = NioEventLoopGroup(1)
mWorkerGroup = NioEventLoopGroup(20)
val bootstrap = ServerBootstrap().apply {
channel(NioServerSocketChannel::class.java).group(mBossGroup, mWorkerGroup)
option(ChannelOption.TCP_NODELAY, true)//无阻塞 内部用LinkedHashMap存储 第一个参数为key 第二个参数为value
option(ChannelOption.SO_BACKLOG,1024)
option(ChannelOption.SO_REUSEADDR,true)
option(ChannelOption.SO_KEEPALIVE, true)//长连接
option(ChannelOption.SO_TIMEOUT, 15)
option(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT)
//配置连接的Channel
childHandler(object : ChannelInitializer<SocketChannel>() {
override fun initChannel(p0: SocketChannel?) {
//核心业务处理对象
val handler = MqttTransportHandler(mHandlerMap)
val pipeline = p0?.pipeline()
//配置日志打印
pipeline?.addLast(LoggingHandler(LogLevel.INFO))
pipeline?.addLast(IdleStateHandler(180,0,0))
pipeline?.addLast("decoder",MqttDecoder())//添加数据发送解码器,netty内置了Mqtt协议的编解码器
pipeline?.addLast("encoder",MqttEncoder.INSTANCE)//添加数据接收编码器
//添加SSL验证
// val engine = ContextSSLFactory.getSSLContextServer()?.newEngine(p0?.alloc())
// engine?.run {
// useClientMode = false
//设置为true表示双向验证,服务端和客户端互相验证通过才能发起请求,false为单向验证,客户端验证服务端通过即可通信
// needClientAuth = false
// }
// pipeline?.addFirst("ssl",SslHandler(engine))
pipeline?.addLast(handler)//添加数据处理器
p0?.closeFuture()?.addListeners(handler)
}
})
}
mChannel = bootstrap.bind(11883).addListener {
if (it.isSuccess){
Log.e("Server","服务器启动成功")
}else{
Log.e("Server","服务器启动失败")
}
}.sync().channel()
}catch (e: Exception){
e.printStackTrace()
}
}
override fun onBind(intent: Intent?): IBinder? {
return null
}
override fun onDestroy() {
super.onDestroy()
try {
mChannel?.close()?.sync()
}finally {
mWorkerGroup?.shutdownGracefully()
mBossGroup?.shutdownGracefully()
}
}
}
三、创建核心业务处理对象(ChannelHandler)
class MqttTransportHandler(val map: HashMap<String,MqttTransportHandler>) : ChannelInboundHandlerAdapter(),GenericFutureListener<Future<in Void>>{
val MAX_SUPPORTED_QOS_LVL = MqttQoS.AT_LEAST_ONCE
@Volatile
private var connected = false
//传输数据的对象
private var mCtx: ChannelHandlerContext? = null
@Volatile
private var address: InetSocketAddress? = null
/**
* 当服务器收到请求时触发这个回调
* 此时在这里进行消息的分发
*/
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
mCtx = ctx
if (msg is MqttMessage) {
processMqttMsg(ctx, msg as MqttMessage)
} else {
ctx.close()
}
}
private fun processMqttMsg(ctx: ChannelHandlerContext, msg: MqttMessage) {
address = ctx.channel().remoteAddress() as InetSocketAddress
if (msg.fixedHeader() == null) {
processDisconnect(ctx)
return
}
//根绝mqtt协议里定义的消息类型分发消息 对不同的请求做出不同的响应
when (msg.fixedHeader().messageType()) {
MqttMessageType.CONNECT -> processConnect(ctx, msg as MqttConnectMessage)//连接
MqttMessageType.PUBLISH -> processPublish(ctx, msg as MqttPublishMessage)//推送
MqttMessageType.SUBSCRIBE -> processSubscribe(ctx, msg as MqttSubscribeMessage)//订阅
MqttMessageType.UNSUBSCRIBE -> processUnsubscribe(ctx, msg as MqttUnsubscribeMessage)//取消订阅
MqttMessageType.PINGREQ -> if (checkConnected(ctx)) {//心跳
ctx.writeAndFlush(
MqttMessage(
MqttFixedHeader(
MqttMessageType.PINGRESP,
false,
MqttQoS.AT_MOST_ONCE,
false,
0
)
)
)
}
MqttMessageType.DISCONNECT -> if (checkConnected(ctx)) {//断开连接
processDisconnect(ctx)
}
else -> {
}
}
}
/**
* 处理服务器收到推送类型的消息
*/
private fun processPublish(ctx: ChannelHandlerContext, mqttMsg: MqttPublishMessage) {
if (!checkConnected(ctx)) {
return
}
//推送来的消息都会携带一个topic 取出这个topic 服务器根据谁订阅了这个topic推送到对方去
val topicName = mqttMsg.variableHeader().topicName()
val msgId = mqttMsg.variableHeader().packetId()
//map是以topic为key,ChannelHandler对象为value,这里根据topic取出ChannelHandler做消息的发送。前面讲过每一个连接就是一个ChannelHandler 每个连接都会订阅一个topic
map[topicName]?.processDevicePublish(ctx, mqttMsg, topicName, msgId)
}
private fun processDevicePublish(
ctx: ChannelHandlerContext,
mqttMsg: MqttPublishMessage,
topicName: String,
msgId: Int
) {
//固定消息头 注意此处的消息类型PUBLISH mqtt协议
val fixedHeader = MqttFixedHeader(MqttMessageType.PUBLISH,false,MqttQoS.AT_LEAST_ONCE,false,0)
//可变头
val variableHeader = MqttPublishVariableHeader(topicName,msgId)
val publishMessage = MqttPublishMessage(fixedHeader,variableHeader,mqttMsg.content())
//ChannelHandlerContext作为消息传输的对象 由它来完成消息的write和flush
mCtx?.writeAndFlush(publishMessage)
}
/**
* 处理订阅消息
*/
private fun processSubscribe(ctx: ChannelHandlerContext, mqttMsg: MqttSubscribeMessage) {
if (!checkConnected(ctx)) {
return
}
val grantedQoSList: MutableList<Int> = ArrayList()
for (subscription in mqttMsg.payload().topicSubscriptions()) {
val topic = subscription.topicName()
val reqQoS = subscription.qualityOfService()
//根据topic存储ChannelHandler 后面服务器做消息的转发时需要取出对应的ChannelHandler才能保证消息的送达
map[topic] = this
}
//ChannelHandlerContext发送一个SubAck消息 相当于服务器给客户端的一个响应 客户端可以根据ack消息做出处理
ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList))
}
/**
* 创建SubAck消息
*/
private fun createSubAckMessage(msgId: Int, grantedQoSList: List<Int>): MqttSubAckMessage? {
val mqttFixedHeader = MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0)
val mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId)
val mqttSubAckPayload = MqttSubAckPayload(grantedQoSList)
return MqttSubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubAckPayload)
}
private fun registerSubQoS(topic: String, grantedQoSList: MutableList<Int>, reqQoS: MqttQoS) {
grantedQoSList.add(getMinSupportedQos(reqQoS))
}
private fun getMinSupportedQos(reqQoS: MqttQoS): Int {
return Math.min(reqQoS.value(), MAX_SUPPORTED_QOS_LVL.value())
}
private fun processUnsubscribe(ctx: ChannelHandlerContext, mqttMsg: MqttUnsubscribeMessage) {
if (!checkConnected(ctx)) {
return
}
//todo 具体业务逻辑
ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()))
}
private fun processDisconnect(ctx: ChannelHandlerContext) {
ctx.close()
}
private fun processConnect(ctx: ChannelHandlerContext, msg: MqttConnectMessage) {
ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_ACCEPTED))
connected = true
}
/**
* 创建PubAck消息 客户端发送来消息 Broker做了转发后可以给publish的客户端发送一个ack消息作为响应
*/
private fun createMqttPubAckMsg(requestId: Int): MqttPubAckMessage? {
val mqttFixedHeader = MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0)
val mqttMsgIdVariableHeader = MqttMessageIdVariableHeader.from(requestId)
return MqttPubAckMessage(mqttFixedHeader, mqttMsgIdVariableHeader)
}
private fun createUnSubAckMessage(msgId: Int): MqttMessage? {
val mqttFixedHeader = MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0)
val mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId)
return MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader)
}
/**
* ConnectAck 连接请求的响应 让客户端知道连接服务器是否成功
*/
private fun createMqttConnAckMsg(returnCode: MqttConnectReturnCode): MqttConnAckMessage? {
val mqttFixedHeader = MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0)
val mqttConnAckVariableHeader = MqttConnAckVariableHeader(returnCode, true)
return MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader)
}
private fun checkConnected(ctx: ChannelHandlerContext): Boolean {
return if (connected) {
true
} else {
ctx.close()
false
}
}
override fun operationComplete(future: Future<in Void>) {
}
}
至此代码就完结了,整个过程还是比较简单,这其中要注意两点:
1、明确netty框架中ChannelHandler的概念,mqtt协议中每个连到broker的客户端都是一个ChannelHandler;
2、明确ChannelHandlerContext的概念,此对象在netty中主要承担消息传输的职责,mqtt协议中broker作为一个消息的中转站,这其中就是靠ctx传输数据,实现端到端的消息互通。
以上两点就能保证mqtt协议中连接到broker的publisher和subscriber实现消息互通。
以上就是在安卓端基于netty框架搭建mqtt协议中的broker,网上介绍的大部分都是使用第三方平台EMQ搭建Broker或者后端开发者开发这块,希望这篇文章能帮助到有需求的人,当然上面代码只是一个简单的验证,更多复杂场景需要开发者根据自己需求拓展。