MINA客户端的应用应用体系结构
在上一篇文章中我们介绍了MINA服务器端的开发流程,这一章我给大家介绍MINA的客户端开发流程,客户端开发我们基于Android来实现相应的开发过程,首先让我们来看一下MINA客户端开发的应用体系结构图:
- 客户端首先创建一个IOConnector(MINA Construct (结构) 中连接Socket服务端的接口 ),初始化绑定Server
- 创建的一个会话,并且与Connection相关联
- 应用程序/客户端写入会话,遍历过过滤器链之后将数据发送到服务器
- 从服务器接收到的所有响应/消息都将遍历过滤器链,回调给IoHandler进行处理
Android端TCP代码示例
首先创建android项目,和普通的Android项目一致,引入对应的MINA客户端jar包:
按照客户端的体系结构,按顺序创建对应的对象:
/**
* 创建tcp客户端的连接
*/
public void createTcpConnect() throws InterruptedException {
if (ioSession == null) {
//首先创建对应的tcp连接
IoConnector ioConnector = new NioSocketConnector();
//设置超时时间
ioConnector.setConnectTimeoutMillis(CONNECT_TIMEOUT);
//添加过滤器
ioConnector.getFilterChain().addLast("logger", new LoggingFilter());//添加日志过滤器
ioConnector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));//设置字节处理过滤器
//添加IoHandler
ioConnector.setHandler(new ClientSessionHandler());
// IoSession ioSession;
//通过ConnectFuture来连接服务器
for (; ; ) {
try {
ConnectFuture future = ioConnector.connect(new InetSocketAddress(HOSTNAME, PORT));
future.awaitUninterruptibly();
ioSession = future.getSession();
break;//连接成功跳出死循环
} catch (Exception e) {
System.err.println("Failed to connect");
e.printStackTrace();
Thread.sleep(5000);//如果连接失败,5秒后继续连接直到连接成功
}
}
}
ioSession.write("tcp-ceshi");
}
和服务端不同的是,客户端创建了IoConnector用来连接服务器,其他设置属性,比如过滤器,Handler,等等设置都很相似。
客户端创建对应的IoSession来实现数据的发送,通过IoHandler来获取对应的服务端的数据。这里我们主要分析一下ConnectFuture来实现对应的连接,首先看看通过awaitUninterruptibly()方法实现连接成功:
//通过ConnectFuture来连接服务器
for (; ; ) {
try {
ConnectFuture future = ioConnector.connect(new InetSocketAddress(HOSTNAME, PORT));
future.awaitUninterruptibly();
ioSession = future.getSession();
break;//连接成功跳出死循环
} catch (Exception e) {
System.err.println("Failed to connect");
e.printStackTrace();
Thread.sleep(5000);//如果连接失败,5秒后继续连接直到连接成功
}
}
ConnectFuture是一个接口继承自IoFuture,而在使用过程中实际上使用的是默认实现类,我们通过源码来分析连接过程(AbstractPollingIoConnector.java):
/**
* {@inheritDoc}
*/
@Override
@SuppressWarnings("unchecked")
protected final ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress,
IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
H handle = null;
boolean success = false;
try {
handle = newHandle(localAddress);
if (connect(handle, remoteAddress)) {
ConnectFuture future = new DefaultConnectFuture();//(1)
T session = newSession(processor, handle);
initSession(session, future, sessionInitializer);
// Forward the remaining process to the IoProcessor.
session.getProcessor().add(session);
success = true;
return future;
}
success = true;
} catch (Exception e) {
return DefaultConnectFuture.newFailedFuture(e);
} finally {
if (!success && handle != null) {
try {
close(handle);
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
}
ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
connectQueue.add(request);
startupWorker();
wakeup();
return request;
}
我们可以看到,ConnectFuture的实现过程是,首先判断是否连接成功:
if (connect(handle, remoteAddress)) {
ConnectFuture future = new DefaultConnectFuture();//(1)
T session = newSession(processor, handle);
initSession(session, future, sessionInitializer);
// Forward the remaining process to the IoProcessor.
session.getProcessor().add(session);
success = true;
return future;
}
如果连接成功,创建默认的ConnectFuture对象,初始化创建有效连接的Session会话直接返回即可。但是如果尚没有连接成功,会创建ConnectionRequest 对象返回,我们该对象对应的源码 (AbstractPollingIoConnector.java内部类):
public final class ConnectionRequest extends DefaultConnectFuture {
/** The handle associated with this connection request */
private final H handle;
/** The time up to this connection request will be valid */
private final long deadline;
/** The callback to call when the session is initialized */
private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer;
public ConnectionRequest(H handle, IoSessionInitializer<? extends ConnectFuture> callback) {
this.handle = handle;
long timeout = getConnectTimeoutMillis();
if (timeout <= 0L) {
this.deadline = Long.MAX_VALUE;
} else {
this.deadline = System.currentTimeMillis() + timeout;
}
this.sessionInitializer = callback;
}
public H getHandle() {
return handle;
}
public long getDeadline() {
return deadline;
}
public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() {
return sessionInitializer;
}
@Override
public boolean cancel() {
if (!isDone()) {
boolean justCancelled = super.cancel();
// We haven't cancelled the request before, so add the future
// in the cancel queue.
if (justCancelled) {
cancelQueue.add(this);
startupWorker();
wakeup();
}
}
return true;
}
}
我们可以看到,ConnectionRequest继承自DefaultConnectFuture ;创建完ConnectionRequest这个对象之后,就会将对应的对象实例添加到队列中去:
ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
connectQueue.add(request);
startupWorker();
wakeup();
return request;
通过方法startupWorker()实现请求队列的轮询操作,具体是创建对应的Connector(实现Runnable),然后通过调用线程池去完成对应的操作,我们来看看Connector的实现过程 (AbstractPollingIoConnector.java内部类):
private class Connector implements Runnable {
public void run() {
assert (connectorRef.get() == this);
int nHandles = 0;
while (selectable) {
try {
// the timeout for select shall be smaller of the connect
// timeout or 1 second...
int timeout = (int) Math.min(getConnectTimeoutMillis(), 1000L);
int selected = select(timeout);
nHandles += registerNew();
// get a chance to get out of the connector loop, if we
// don't have any more handles
if (nHandles == 0) {
connectorRef.set(null);
if (connectQueue.isEmpty()) {
assert (connectorRef.get() != this);
break;
}
if (!connectorRef.compareAndSet(null, this)) {
assert (connectorRef.get() != this);
break;
}
assert (connectorRef.get() == this);
}
if (selected > 0) {
nHandles -= processConnections(selectedHandles());
}
processTimedOutSessions(allHandles());
nHandles -= cancelKeys();
} catch (ClosedSelectorException cse) {
// If the selector has been closed, we can exit the loop
ExceptionMonitor.getInstance().exceptionCaught(cse);
break;
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
ExceptionMonitor.getInstance().exceptionCaught(e1);
}
}
}
if (selectable && isDisposing()) {
selectable = false;
try {
if (createdProcessor) {
processor.dispose();
}
} finally {
try {
synchronized (disposalLock) {
if (isDisposing()) {
destroy();
}
}
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
} finally {
disposalFuture.setDone();
}
}
}
}
}
由源码我们可以看到具体的操作方法processConnections中,实现了主要操作:
/**
* Process the incoming connections, creating a new session for each valid
* connection.
*/
private int processConnections(Iterator<H> handlers) {
int nHandles = 0;
// Loop on each connection request
while (handlers.hasNext()) {
H handle = handlers.next();
handlers.remove();
ConnectionRequest connectionRequest = getConnectionRequest(handle);
if (connectionRequest == null) {
continue;
}
boolean success = false;
try {
if (finishConnect(handle)) {
T session = newSession(processor, handle);
initSession(session, connectionRequest, connectionRequest.getSessionInitializer());
// Forward the remaining process to the IoProcessor.
session.getProcessor().add(session);
nHandles++;
}
success = true;
} catch (Exception e) {
connectionRequest.setException(e);
} finally {
if (!success) {
// The connection failed, we have to cancel it.
cancelQueue.offer(connectionRequest);
}
}
}
return nHandles;
}
该方法判断连接是否完成,如果完成,创建初始化的Session对象;执行方法 initSession(session, connectionRequest, connectionRequest.getSessionInitializer())之后,就会将ConnectFuture中的状态码ready设置为true(这一块的代码暂时没有发现,但是通过打断点得到执行完该方法,ready=true),修改状态时,执行方法(ConnectFuture中的方法)setValue(...):
/**
* Sets the result of the asynchronous operation, and mark it as finished.
*
* @param newValue The result to store into the Future
* @return {@code true} if the value has been set, {@code false} if
* the future already has a value (thus is in ready state)
*/
public boolean setValue(Object newValue) {
synchronized (lock) {
// Allowed only once.
if (ready) {
return false;
}
result = newValue;
ready = true;
// Now, if we have waiters, notify them that the operation has completed
if (waiters > 0) {
lock.notifyAll();
}
}
// Last, not least, inform the listeners
notifyListeners();
return true;
}
执行该方法,就会调用notifyListeners方法来实现回调函数,由于awaitUninterruptibly()方法一直在阻塞等待ready状态变化来结束执行,接下来分别看看两种方式的执行:
1.awaitUninterruptibly():
/**
* {@inheritDoc}
*/
@Override
public ConnectFuture awaitUninterruptibly() {
return (ConnectFuture) super.awaitUninterruptibly();
}
awaitUninterruptibly方法最终实现方法:
/**
* Wait for the Future to be ready. If the requested delay is 0 or
* negative, this method immediately returns the value of the
* 'ready' flag.
* Every 5 second, the wait will be suspended to be able to check if
* there is a deadlock or not.
*
* @param timeoutMillis The delay we will wait for the Future to be ready
* @param interruptable Tells if the wait can be interrupted or not
* @return <tt>true</tt> if the Future is ready
* @throws InterruptedException If the thread has been interrupted
* when it's not allowed.
*/
private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
long endTime = System.currentTimeMillis() + timeoutMillis;
if (endTime < 0) {
endTime = Long.MAX_VALUE;
}
synchronized (lock) {
// We can quit if the ready flag is set to true, or if
// the timeout is set to 0 or below : we don't wait in this case.
if (ready||(timeoutMillis <= 0)) {
return ready;
}
// The operation is not completed : we have to wait
waiters++;
try {
for (;;) {
try {
long timeOut = Math.min(timeoutMillis, DEAD_LOCK_CHECK_INTERVAL);
// Wait for the requested period of time,
// but every DEAD_LOCK_CHECK_INTERVAL seconds, we will
// check that we aren't blocked.
lock.wait(timeOut);
} catch (InterruptedException e) {
if (interruptable) {
throw e;
}
}
if (ready || (endTime < System.currentTimeMillis())) {
return ready;
} else {
// Take a chance, detect a potential deadlock
checkDeadLock();
}
}
} finally {
// We get here for 3 possible reasons :
// 1) We have been notified (the operation has completed a way or another)
// 2) We have reached the timeout
// 3) The thread has been interrupted
// In any case, we decrement the number of waiters, and we get out.
waiters--;
if (!ready) {
checkDeadLock();
}
}
}
}
因此我们可以知道,通过阻塞的方式实现最终的连接成功,接着获取对应的IoSession;
2.通过回调来实现方法:notifyListeners():
/**
* Notify the listeners, if we have some.
*/
private void notifyListeners() {
// There won't be any visibility problem or concurrent modification
// because 'ready' flag will be checked against both addListener and
// removeListener calls.
if (firstListener != null) {
notifyListener(firstListener);
firstListener = null;
if (otherListeners != null) {
for (IoFutureListener<?> listener : otherListeners) {
notifyListener(listener);
}
otherListeners = null;
}
}
}
@SuppressWarnings("unchecked")
private void notifyListener(IoFutureListener listener) {
try {
listener.operationComplete(this);
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
ConnectFuture回调的方法来实现获取Session的代码:
connFuture.addListener( new IoFutureListener(){
public void operationComplete(IoFuture future) {
ConnectFuture connFuture = (ConnectFuture)future;
if( connFuture.isConnected() ){
session = future.getSession();
try {
sendData();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
log.error("Not connected...exiting");
}
}
});
我们来看一下addListener的源码:
/**
* {@inheritDoc}
*/
public IoFuture addListener(IoFutureListener<?> listener) {
if (listener == null) {
throw new IllegalArgumentException("listener");
}
synchronized (lock) {
if (ready) {
// Shortcut : if the operation has completed, no need to
// add a new listener, we just have to notify it. The existing
// listeners have already been notified anyway, when the
// 'ready' flag has been set.
notifyListener(listener);
} else {
if (firstListener == null) {
firstListener = listener;
} else {
if (otherListeners == null) {
otherListeners = new ArrayList<IoFutureListener<?>>(1);
}
otherListeners.add(listener);
}
}
}
return this;
}
至此,MINA客户端的核心连接IoFuture的连接操作流程已经结束;
参考官网以及对应的源码包
Android端UDP代码示例
MINA客户端的UDP连接和TCP的连接在代码方面没有太大的区别,毕竟MINA实现方面使用的是统一的API接口,至于不同点,主要对于IoConnector的实现类不一样:
/**
* 创建udp客户端的连接
*/
public void createUdpConnect() throws InterruptedException {
if (udpUoSession == null) {
//首先创建对应的tcp连接
udpIoConnector = new NioDatagramConnector();
//设置超时时间
udpIoConnector.setConnectTimeoutMillis(CONNECT_TIMEOUT);
//添加过滤器
udpIoConnector.getFilterChain().addLast("logger", new LoggingFilter());//添加日志过滤器
udpIoConnector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));//设置字节处理过滤器
//添加IoHandler
udpIoConnector.setHandler(new ClientSessionHandler());
//通过ConnectFuture来连接服务器
for (; ; ) {
try {
ConnectFuture future = udpIoConnector.connect(new InetSocketAddress(HOSTNAME, PORT + 1));
future.awaitUninterruptibly();
udpUoSession = future.getSession();
break;//连接成功跳出死循环
} catch (Exception e) {
System.err.println("Failed to connect");
e.printStackTrace();
Thread.sleep(5000);//如果连接失败,5秒后继续连接直到连接成功
}
}
}
udpUoSession.write("udp-ceshi");
}
由代码我们可以发现,API的调用方式基本一致;
注:Android使用过程中,Socket连接不应该在主线程中实现,否则会报错
至此我们的MINA的ANDROID客户端开发大体的流程已经完成,后续会继续MINA各个单独模块进行使用以及源码分析。