FuseSource MqttClient使用

之前使用过Eclipse Paho附上文章链接Eclipse Paho 实现Android推送

讲述一下两者的使用感受

功能 FuseSource Eclipse Paho
MQTT 3.1.1 支持 支持
SSL 支持 支持
TLS 不支持 支持
自动重连 支持 支持
Blocking API 支持 不支持
在线API 没有
连续publish 支持 不支持
使用 方便 麻烦

之前一直都是使用Eclipse Paho, 后来发现一个Bug, 每publish一次连接都会断开大约一秒后自动重连, 不能连续不间断的发送数据, 被迫无奈之下转向新的阵营(FuseSource), 经测试可以连续不间断发送数据。

MQTT是机器对机器(M2M)/“物联网”连接协议。它被设计成一个非常轻量级的发布/订阅消息传输。对于需要较小代码空间和/或网络带宽较高的远程位置进行连接非常有用。
mqtt-client为MQTT提供了ASL 2.0许可的API。它负责自动重新连接到您的MQTT服务器,并在出现任何网络故障时恢复您的客户端会话。应用程序可以使用blocking API风格、 一个futures based API 或callback/continuations passingAPI风格。

从Maven使用

将以下内容添加到您的maven pom.xml文件中

<dependency>
  <groupId>org.fusesource.mqtt-client</groupId>
  <artifactId>mqtt-client</artifactId>
  <version>1.12</version>
</dependency>

从Gradle使用

将以下内容添加到您的gradle文件中

implementation 'org.fusesource.mqtt-client:mqtt-client:1.12'

从任何其他构建系统使用

下载 uber jar文件 并将其添加到您的项目, uber包含mqtt客户端从其他项目所依赖的所有精简依赖项。

配置MQTT连接

blocking, future, 和callback APIs都共享相同的连接设置。您可以创建MQTT该类的新实例,并使用连接和套接字相关选项对其进行配置。在尝试连接之前调用该方法(setHost)。

MQTT mqtt = new MQTT();
mqtt.setHost("localhost", 1883);
// or 
mqtt.setHost("tcp://localhost:1883");

配置MQTT选项

  • setClientId:用于设置会话的客户端ID。这是MQTT服务器用来识别setCleanSession(false);正在使用的会话的内容。该ID必须为23个字符或更少。默认为自动生成的ID(基于您的套接字地址,端口和时间戳)。
  • setCleanSession:如果希望MQTT服务器在客户端会话中保留主题订阅和确认位置,则设置为false。默认为true。
  • setKeepAlive:在几秒钟内配置Keep Alive计时器。定义从客户端收到的消息之间的最大时间间隔。它使服务器能够检测到到客户端的网络连接已经丢失,而无需等待较长的TCP / IP超时。
  • setUserName :设置用于对服务器进行身份验证的用户名。
  • setPassword :设置用于对服务器进行身份验证的密码。
  • setWillTopic:如果设置,服务器将在客户端发生意外断开连接时将客户端的Will消息发布到指定主题。
  • setWillMessage:将发送的意愿消息。默认为零长度的消息。
  • setWillQos:设置用于Will消息的服务质量。默认为QoS.AT_MOST_ONCE。
  • setWillRetain:如果您希望使用保留选项发布遗嘱,请设置为true。
  • setVersion:设置为“3.1.1”以使用MQTT版本3.1.1。否则默认为3.1协议版本。

配置重新连接

如果发生网络错误,连接将自动重新连接并重新建立消息传递会话。您可以控制尝试重新连接的频率,并使用以下方法定义重新连接尝试的最大次数:

  • setConnectAttemptsMax:在客户端首次尝试连接到服务器时,在将错误报告给客户端之前,尝试重新连接的最大次数。设置为-1以使用无限尝试。默认为-1。
  • setReconnectAttemptsMax:在先前建立了服务器连接之后,在将错误报告给客户端之前,重新连接尝试的最大次数。设置为-1以使用无限尝试。默认为-1。
  • setReconnectDelay:在第一次重新连接尝试之前,等待多长时间ms。默认为10。
  • setReconnectDelayMax:重新连接尝试之间等待的最长时间(以毫秒为单位)。默认为30,000。
  • setReconnectBackOffMultiplier:在重新连接尝试之间使用指数退避。设置为1以禁用指数退避。默认为2。

配置套接字选项

您可以使用以下方法调整某些套接字选项:

  • setReceiveBufferSize:设置内部套接字接收缓冲区的大小。
    默认为65536(64k)
  • setSendBufferSize:设置内部套接字发送缓冲区的大小。
    默认为65536(64k)
  • setTrafficClass:为从传输器发送的数据包设置IP标头中的流量类别或服务类型八位字节。默认为8这意味着流量应该针对吞吐量进行优化。

节流连接

如果您想要降低连接的读取或写入速度,请使用以下方法:

  • setMaxReadRate:设置此传输将在每秒接收数据的最大字节数。此设置限制读取,以便不超过速率。默认为0,禁用限制。
  • setMaxWriteRate:设置此传输器将在其每秒发送数据的最大字节数。此设置会限制写入操作,以便不会超出速率。默认为0,禁用限制。

使用SSL连接

如果要通过SSL / TLS而不是TCP进行连接,请为host字段使用“ssl://”或“tls://”URI前缀而不是“tcp://” 。用于更细粒度地控制使用哪种算法。支持的协议值是:

  • ssl:// - 使用SSL算法的JVM默认版本。
  • sslv*:// - 使用特定的SSL版本,其中*是您的JVM支持的版本。例:sslv3
  • tls:// - 使用JVM默认版本的TLS算法。
  • tlsv*:// - 在*您的JVM支持的版本中使用特定的TLS版本。例:tlsv1.1

SSLContext除非使用setSslContext方法配置MQTT实例,否则客户端将使用通过JVM系统属性配置 的默认JVM 。

SSL连接对内部线程池执行阻塞操作,除非您调用setBlockingExecutor方法来配置它们将使用的执行程序。

选择调度队列

一个HawtDispatch调度队列来同步访问连接。如果未通过该setDispatchQueue方法配置显式队列,则将为该连接创建一个新队列。如果您希望多个连接共享同一队列进行同步,那么设置显式队列可能会很方便。

使用BlockingAPI

MQTT.connectBlocking方法建立连接并为您提供与阻塞API的连接。

BlockingConnection connection = mqtt.blockingConnection();
connection.connect();

使用以下publish方法将消息发布到主题:

connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false);

您可以使用该subscribe方法订阅多个主题:

Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
byte[] qoses = connection.subscribe(topics);

然后使用receiveack 方法接收并确认消息的消耗:

Message message = connection.receive();
System.out.println(message.getTopic());
byte[] payload = message.getPayload();
// process the message then:
message.ack();

最后断开:

connection.disconnect();

使用FutureAPI

MQTT.connectFuture方法建立连接并为您提供与期货样式API的连接。所有针对连接的操作都是非阻塞的,并通过Future返回结果。

FutureConnection connection = mqtt.futureConnection();
Future<Void> f1 = connection.connect();
f1.await();

Future<byte[]> f2 = connection.subscribe(new Topic[]{new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)});
byte[] qoses = f2.await();

// We can start future receive..
Future<Message> receive = connection.receive();

// send the message..
Future<Void> f3 = connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false);

// Then the receive will get the message.
Message message = receive.await();
message.ack();

Future<Void> f4 = connection.disconnect();
f4.await();

使用基于回调/继续传递的API

MQTT.connectCallback方法建立一个连接并为您提供与回调风格API的连接。这是使用API​​风格最复杂的,但可以提供最佳性能。未来和阻塞API使用封面下的回调API。连接上的所有操作都是非阻塞的,操作的结果将传递给您实现的回调接口。

例:

final CallbackConnection connection = mqtt.callbackConnection();
connection.listener(new Listener() {

    public void onDisconnected() {
    }
    public void onConnected() {
    }

    public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) {
        // You can now process a received message from a topic.
        // Once process execute the ack runnable.
        ack.run();
    }
    public void onFailure(Throwable value) {
        connection.close(null); // a connection failure occured.
    }
})
connection.connect(new Callback<Void>() {
    public void onFailure(Throwable value) {
        result.failure(value); // If we could not connect to the server.
    }

    // Once we connect..
    public void onSuccess(Void v) {

        // Subscribe to a topic
        Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
        connection.subscribe(topics, new Callback<byte[]>() {
            public void onSuccess(byte[] qoses) {
                // The result of the subcribe request.
            }
            public void onFailure(Throwable value) {
                connection.close(null); // subscribe failed.
            }
        });

        // Send a message to a topic
        connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
            public void onSuccess(Void v) {
              // the pubish operation completed successfully.
            }
            public void onFailure(Throwable value) {
                connection.close(null); // publish failed.
            }
        });

        // To disconnect..
        connection.disconnect(new Callback<Void>() {
            public void onSuccess(Void v) {
              // called once the connection is disconnected.
            }
            public void onFailure(Throwable value) {
              // Disconnects never fail.
            }
        });
    }
});

每个连接都有一个HawtDispatch调度队列,它用于处理套接字的IO事件。派发队列是一个执行器,提供IO的串行执行和处理事件,并用于确保连接的同步访问。

回调将执行与连接相关联的调度队列,因此可以安全地使用来自回调的连接,但不能在回调中执行任何阻塞操作。如果您需要执行可能会阻塞的某些处理,则必须将其发送到另一个线程池进行处理。此外,如果另一个线程需要与连接进行交互,它只能通过使用提交给连接调度队列的Runnable来完成。

在连接的调度队列上执行Runnable的示例:

connection.getDispatchQueue().execute(new Runnable(){
    public void run() {
      connection.publish( ..... );
    }
});
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,816评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,729评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,300评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,780评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,890评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,084评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,151评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,912评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,355评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,666评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,809评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,504评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,150评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,121评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,628评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,724评论 2 351

推荐阅读更多精彩内容