这是我的第一篇博客,写的不好之处欢迎指正!
最近公司要做个聊天功能,使用到了stomp协议,参考了几位前辈的文章,做个总结,需要的小伙伴请拿走不谢。
关于stomp是什么可参考:https://www.jianshu.com/p/cee941ca0c09。
项目里之前用的okhttp3,但是okhttp3不支持stomp,无奈,我只好另引入了一个库。
StompProtocol官网地址:https://github.com/NaikSoftware/StompProtocolAndroid
1,引入依赖
在项目app目录下的build.gradle文件中添加
dependencies{
api"com.github.NaikSoftware:StompProtocolAndroid:1.6.4"
api"io.reactivex.rxjava2:rxjava:2.2.5"
}
2,创建连接
publicclassMainActivityextendsAppCompatActivity{
privateStompClientmStompClient;
//存放订阅的容器类、用来做资源回收,以免订阅没有被取消造成内存泄漏。
privateCompositeDisposablecompositeDisposable;
@Override
protectedvoidonCreate(BundlesavedInstanceState){
super.onCreate(savedInstanceState);
//隐藏标题栏
supportRequestWindowFeature(Window.FEATURE_NO_TITLE);
setContentView(R.layout.activity_main);
//初始化mStompClient,第二个参数填入你预连接的服务器地址。
mStompClient=Stomp.over(Stomp.ConnectionProvider.OKHTTP,url);
resetSubscriptions();
connectStomp();
}
publicvoidconnectStomp(){
//头信息集合,根据需要传入
List<StompHeader>stompHeaderList=newArrayList<>();
mStompClient.withClientHeartbeat(1000).withServerHeartbeat(1000);
resetSubscriptions();
//注意以下两个Disposable对象,第一个是创建连接的,第二个是创建订阅的,订阅可根据实际需求确定是否添加。
DisposabledispLifecycle=mStompClient.lifecycle()
.subscribeOn(Schedulers.io())
//.observeOn(AndroidSchedulers.mainThread())
.subscribe(lifecycleEvent->{
switch(lifecycleEvent.getType()){
caseOPENED:
//连接成功
break;
caseERROR:
Log.e(TAG,"Stompconnectionerror",lifecycleEvent.getException());
//出错了
break;
caseCLOSED:
//连接被关闭,可以在这里做重连操作。
resetSubscriptions();
connectStomp();
break;
caseFAILED_SERVER_HEARTBEAT:
break;
}
});
//将连接的Disposable对象加入compositeDisposable
compositeDisposable.add(dispLifecycle);
//订阅
DisposabledispTopic=mStompClient.topic("填入你的订阅地址")
.subscribeOn(Schedulers.io())
//.observeOn(AndroidSchedulers.mainThread())
.subscribe((StompMessagetopicMessage)->{
Log.d(TAG,"Received"+topicMessage.getPayload());
//取出消息,做逻辑处理
Stringbody=topicMessage.getPayload();
},throwable->{
//出错了
Log.e(TAG,"连接错误",throwable);
});
//将订阅的Disposable对象加入compositeDisposable
compositeDisposable.add(dispTopic);
//连接
mStompClient.connect(stompHeaderList);
}
privatevoidresetSubscriptions(){
if(compositeDisposable!=null){
compositeDisposable.dispose();
}
compositeDisposable=newCompositeDisposable();
}
3,发送消息
//第一个参数传订阅的路径,第二个传要发送的消息
//注意必须调用subscribe()函数。
mStompClient.send(path,object.toString()).subscribe();
4,资源回收
@Override
publicvoidonDestroy(){
//关闭连接和资源回收
mStompClient.disconnect();
if(compositeDisposable!=null){
compositeDisposable.dispose();
}
handler.removeCallbacksAndMessages(null);
super.onDestroy();
}
5,添加全局异常补货,否则连接失败后会闪退
RxJavaPlugins.setErrorHandler(newConsumer<Throwable>(){
@Override
publicvoidaccept(Throwablethrowable)throwsException{
}
});
最后别忘了在AndroidManifest.xml文件中添加网络访问权限
<uses-permissionandroid:name="android.permission.INTERNET"/>