背景
对于金融交易软件,行情信息的实时更新是一个重要需求。后端团队已经提供了 WebSocket api,现在需求在安卓客户端使用 WebSocket 来接收后端实时推送的行情更新。
平台
Android
语言环境
Kolin
目标
- 在异步线程里建立 WebSocket 连接
- WebSocket 所有事件回调到主线程处理
- 支持消息背压(行情可能在短时间内有剧烈波动,消息量会激增)
实现
建立连接
WebSocket 采用了 okHttp 的内置实现 - RealWebSocket
,其中 WebSocketListener
提供了相应事件回调:
public abstract class WebSocketListener {
/**
* Invoked when a web socket has been accepted by the remote peer and may begin transmitting
* messages.
*/
public void onOpen(WebSocket webSocket, Response response) {
}
/** Invoked when a text (type {@code 0x1}) message has been received. */
public void onMessage(WebSocket webSocket, String text) {
}
/** Invoked when a binary (type {@code 0x2}) message has been received. */
public void onMessage(WebSocket webSocket, ByteString bytes) {
}
/**
* Invoked when the remote peer has indicated that no more incoming messages will be
* transmitted.
*/
public void onClosing(WebSocket webSocket, int code, String reason) {
}
/**
* Invoked when both peers have indicated that no more messages will be transmitted and the
* connection has been successfully released. No further calls to this listener will be made.
*/
public void onClosed(WebSocket webSocket, int code, String reason) {
}
/**
* Invoked when a web socket has been closed due to an error reading from or writing to the
* network. Both outgoing and incoming messages may have been lost. No further calls to this
* listener will be made.
*/
public void onFailure(WebSocket webSocket, Throwable t, @Nullable Response response) {
}
}
这里有两个地方需要注意,onClosed
和 onFailure
,当两个对等方都指示不再传输消息并且连接已成功释放时,onClosed
会被调用,之后不会再有任何回调,而由于从网络读取或写入错误而关闭 WebSocket 时,onFailure
会被调用,同样之后不会再有任何回调,注意此时 WebSocket 会被关闭而且 onClosed
不会再被调用,这是 RealWebSocket
的实现:
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {
...
public void failWebSocket(Exception e, @Nullable Response response) {
Streams streamsToClose;
synchronized (this) {
if (failed) return; // Already failed.
failed = true;
streamsToClose = this.streams;
this.streams = null;
if (cancelFuture != null) cancelFuture.cancel(false);
if (executor != null) executor.shutdown(); //停止 executor
}
try {
listener.onFailure(this, e, response);
} finally {
//关闭流
closeQuietly(streamsToClose);
}
}
...
}
另外双方还需要一个心跳机制来检测连接状态,RealWebSocket
内心跳是一个空字节,即 0 byte,心跳逻辑与后端 api 一致,因此心跳不做修改,如果实际业务中心跳机制不同于此,就需要做修改了,这里暂且跳过。
下面我们来建立一个 WebSocket 连接(部分代码省略):
val okHttpClient = OkHttpClient.Builder()..........build()
val request = Request.Builder().url("这里输入连接地址").build()
val listener = object: WebSocketListener () {
...
}
val pingIntervalMillis = 1000 * 60 * 5 //心跳间隔 5 分钟
val realWss = RealWebSocket(request, listener, SecureRandom(), pingIntervalMillis)
realWss.connect(client) //建立连接
这样我们就能建立起一个 WebSocket 连接了,所有消息都会回调到我们定义的 listener
里。
异步回调
大家都知道 Android 网络操作一定不能放在主线程中,这一点就不做赘述了。这里我们使用了 RxJava2 的 Flowable
操作符以及 Scheduler
来进行线程调度,我们可以手动调用 onNext
来向订阅者发送 WebSocket 消息事件,同时 Flowable
也支持背压:
public enum BackpressureStrategy {
/**
* OnNext events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
*/
MISSING,
/**
* Signals a MissingBackpressureException in case the downstream can't keep up.
*/
ERROR,
/**
* Buffers <em>all</em> onNext values until the downstream consumes it.
*/
BUFFER,
/**
* Drops the most recent onNext value if the downstream can't keep up.
*/
DROP,
/**
* Keeps only the latest onNext value, overwriting any previous value if the
* downstream can't keep up.
*/
LATEST
}
以上是 RxJava2 提供的几种背压策略,根据业务这里选择了 BUFFER
缓存所有收到的消息直到被消费。同时 BUFFER
也是默认的背压策略:
public final class FlowableCreate<T> extends Flowable<T> {
...
@Override
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
switch (backpressure) {
...
default: {
//默认 BufferAsyncEmitter
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
...
}
...
}
这里我们注意到,BufferAsyncEmitter
实例化的过程中传入了 bufferSize()
,通过查看 Flowable
我们看到默认的 buffer size 是 128:
public abstract class Flowable<T> implements Publisher<T> {
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
}
...
}
这里先使用默认容量,不做修改。
接下来将 WebSocket 所有事件密封在一个类里,用作 Flowable 的消息类型:
sealed class WebSocketEvent {
class Open(webSocket: WebSocket?, val response: Response?) : WebSocketEvent()
class BinaryMessage(webSocket: WebSocket?, val bytes: ByteString?) : WebSocketEvent()
class StringMessage(webSocket: WebSocket?, val text: String?) : WebSocketEvent()
class Closing(webSocket: WebSocket?, val code: Int, val reason: String?) : WebSocketEvent()
class Closed(webSocket: WebSocket?, val code: Int, val reason: String?) : WebSocketEvent()
}
在 WebSocketListener
中向订阅者发送对应的事件:
class FlowableWebSocketListener(private val emitter: FlowableEmitter<WebSocketEvent>) : WebSocketListener() {
override fun onOpen(webSocket: WebSocket?, response: Response?) {
emitter.onNext(WebSocketEvent.Open(webSocket, response))
}
override fun onMessage(webSocket: WebSocket?, text: String?) {
emitter.onNext(WebSocketEvent.StringMessage(webSocket, text))
}
override fun onMessage(webSocket: WebSocket?, bytes: ByteString?) {
emitter.onNext(WebSocketEvent.BinaryMessage(webSocket, bytes))
}
override fun onClosing(webSocket: WebSocket?, code: Int, reason: String?) {
emitter.onNext(WebSocketEvent.Closing(webSocket, code, reason))
}
override fun onClosed(webSocket: WebSocket?, code: Int, reason: String?) {
emitter.onComplete()
}
override fun onFailure(webSocket: WebSocket?, t: Throwable?, response: Response?) {
if (!emitter.isCancelled) {
emitter.onError(t ?: IOException("WebSocket unknown error"))
emitter.onComplete()
}
}
}
接下来我们创建一个 Flowable
,在异步线程建立连接,消息发送到主线程做处理:
val flowable: Flowable<WebSocketEvent> = Flowable.create({ emitter ->
...
val listener = RxWebSocketListener(emitter)
val realWss = RealWebSocket(request, listener, SecureRandom(), pingIntervalMillis)
realWss.connect(client) //建立连接
}, BackpressureStrategy.BUFFER)
flowable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object: ResourceSubscriber<WebSocketEvent> {
...
override fun onStart() {
//ResourceSubscriber 会一次拉取 Long.MAX_VALUE 个消息,这里我们覆写先拉取一条
request(1)
}
override fun onNext(t: WebSocketEvent?) {
when(t){
WebSocketEvent.Open -> {...}
WebSocketEvent.BinaryMessage -> {...}
WebSocketEvent.StringMessage -> {...}
WebSocketEvent.Closing -> {...}
}
if (判断是否需要继续拉取消息) {
request(1)
}
}
override fun onComplete() {
onClosed()
dispose()
}
...
})
这里要注意 Schedulers.io()
创建工作线程的数量是没有上限的,因此在 WebSocket 关闭之后应该立即 dispose()
来释放,否则可能引发 OutOfMemory。
...
static final class CachedWorkerPool implements Runnable {
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
// 缓存中没有可复用的 ThreadWorker 时,创建一个新的 ThreadWorker
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
...
}
...
到此一个异步 WebSocket 连接就搭建完成了。
(转载请注明出处)