MINA原理详解

1. 通过SocketConnector同服务器端建立连接 

2. 链接建立之后I/O的读写交给了I/O Processor线程,I/O Processor是多线程的 

3. 通过I/O  Processor读取的数据经过IoFilterChain里所有配置的IoFilter,IoFilter进行消息的过滤,格式的转换,在这个层面可以制定一些自定义的协议 

4. 最后IoFilter将数据交给Handler进行业务处理,完成了整个读取的过程 

5. 写入过程也是类似,只是刚好倒过来,通过IoSession.write写出数据,然后Handler进行写入的业务处理,处理完成后交给IoFilterChain,进行消息过滤和协议的转换,最后通过I/O Processor将数据写出到socket通道 

IoFilterChain作为消息过滤链 

1. 读取的时候是从低级协议到高级协议的过程,一般来说从byte字节逐渐转换成业务对象的过程 

2. 写入的时候一般是从业务对象到字节byte的过程 

IoSession贯穿整个通信过程的始终 

整个过程可以用一个图来表现 


消息箭头都是有NioProcessor-N线程发起调用,默认情况下也在NioProcessor-N线程中执行 

Connector : 作为连接客户端,SocketConector用来和服务器端建立连接,连接成功,创建IoProcessor Thread(不能超过指定的processorCount),Thread由指定的线程池进行管理,IoProcessor 利用NIO框架对IO进行处理,同时创建IoSession。连接的建立是通过Nio的SocketChannel进行。 

NioSocketConnector connector = new NioSocketConnector(processorCount); 

ConnectFuture future = connector.connect(new InetSocketAddress(HOSTNAME, PORT));建立一个I/O通道 

Acceptor :作为服务器端的连接接受者,SocketAcceptor用来监听端口,同客户端建立连接,连接建立之后的I/O操作全部交给IoProcessor进行处理 

IoAcceptor acceptor = new NioSocketAcceptor(); 

acceptor.bind( new InetSocketAddress(PORT) ); 

Protocol : 利用IoFilter,对消息进行解码和编码,如以下代码通过 MyProtocolEncoder 将java对象转成byte串,通过MyProtocalDecoder 将byte串恢复成java对象

connector.getFilterChain().addLast("codec";,  new  ProtocolCodecFilter( new  MyProtocalFactory()));  

......  

public   class  MyProtocalFactory  implements  ProtocolCodecFactory {  

 ProtocolEncoderAdapter encoder = new  MyProtocolEncoder();

 ProtocolDecoder decoder = new  MyProtocalDecoder() ;

 public  ProtocolDecoder getDecoder(IoSession session)  throws  Exception {

 return  decoder;

 }

 public  ProtocolEncoder getEncoder(IoSession session)  throws  Exception {

 return  encoder;

 }

}  

......  

public   class  MyProtocalDecoder  extends  ProtocolDecoderAdapter  {  


 public   void  decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)  

   throws  Exception {  

 int   id  = in.getInt();

 int   len = in.getInt();

 byte []  dst =  new   byte [len];

 in.get(dst);

 String name = new  String(dst,"GBK");

 Item item = new  Item();

 item.setId(id);

 item.setName(name);

 out.write(item);

 }  

}  

......  

public   class  MyProtocolEncoder  extends  ProtocolEncoderAdapter {  


 public   void  encode(IoSession session, Object message,  

   ProtocolEncoderOutput out) throws  Exception {  

  Item item = (Item)message;  

  int  byteLen =  8  + item.getName().getBytes("GBK").length ;  

  IoBuffer buf = IoBuffer.allocate(byteLen);  

  buf.putInt(item.getId());  

  buf.putInt(item.getName().getBytes("GBK").length);  

  buf.put(item.getName().getBytes("GBK";));  

  buf.flip();  

  out.write(buf);  


 }  

}  

handler : 具体处理事件,事件包括:sessionCreated、sessionOpened、sessionClosed、sessionIdle、exceptionCaught、messageReceived、messageSent。 

connector.setHandler(new MyHandler());MyHandler继承IoHandlerAdapter类或者实现IoHandler接口.事件最终由IoProcessor线程发动调用。 

Processor : I/O处理器、允许多线程读写,开发过程中只需要指定线程数量,Processor通过Nio框架进行I/O的续写操作,Processor包含了Nio的Selector的引用。这点也正是mina的优势,如果直接用Nio编写,则需要自己编写代码来实现类似Processor的功能。正因为 I/O Processor是异步处理读写的,所以我们有时候需要识别同一个任务的消息,比如一个任务包括发送消息,接收消息,反馈消息,那么我们需要在制定消息格式的时候,消息头里能包含一个能识别是同一个任务的id。 

I/O Porcessor线程数的设置 :如果是SocketConnector,则可以在构造方法中指定,如:new SocketConnector(processorCount, Executors.newCachedThreadPool());如果是SocketAcceptor,也是一样的:SocketAcceptor acceptor = new SocketAcceptor(ProcessorCount, Executors.newCachedThreadPool()); 

processorCount为最大Porcessor线程数,这个值可以通过性能测试进行调优,默认值是cpu核数量+1(Runtime.getRuntime().availableProcessors() + 1)。 

比较奇怪的是,每个IoProcessor在创建的时候会本地自己和自己建立一个连接? 

IoSession : IoSession是用来保持IoService的上下文,一个IoService在建立Connect之后建立一个IoSession(一个连接一个session),IoSession的生命周期从Connection建立到断开为止 

IoSession做两件事情: 

1.通过IoSession可以获取IoService的所有相关配置对象(持有对IoService,Processor池,SocketChannel,SessionConfig和IoService.IoHandler的引用) 

2.通过IoSession.write 是数据写出的入口 

关于线程 

ThreadModel 1.x版本的mina还有线程模式选项在2.x之后就没有了 

1.x版本指定线程模式 

SocketConnectorConfig cfg = new SocketConnectorConfig();

cfg.setThreadModel(ThreadModel.MANUAL);

MINA有3种worker线程 

Acceptor、Connector、I/O processor 线程 

Acceptor Thread: 一般作为服务器端链接的接收线程,实现了接口IoService,线程的数量就是创建SocketAcceptor 的数量 

Connector Thread :一般作为客户端的请求建立链接线程,实现了接口IoService,维持了一个和服务器端Acceptor的一个链接,线程数量就是创建SocketConnector 的数量 

Mina的SocketAcceptor和SocketConnector均是继承了BaseIoService,是对IoService的两种不同的实现 

I/O processor Thread :作为I/O真正处理的线程,存在于服务器端和客户端,用来处理I/O的读写操作,线程的数量是可以配置的,默认最大数量是CPU个数+1 

服务器端:在创建SocketAcceptor的时候指定ProcessorCount 

SocketAcceptor acceptor = new SocketAcceptor(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool()); 

客户端:在创建SocketConnector 的时候指定ProcessorCount 

SocketConnector connector = new SocketConnector(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool());


I/O Processor Thread,是依附于IoService,类似上面的例子SocketConnector connector = new SocketConnector(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool());是指SocketConnector这个线程允许CPU+1个I/O Processor Thread 

NioProcessor虽然是多线程,但是对与一个连接的时候业务处理只会使用一个线程进行处理(Processor线程对于一个客户端连接只使用一个线程NioProcessor-n)如果handler的业务比较耗时,会导致NioProcessor线程堵塞 ,在2个客户端同时连接上来的时候会创建第2个(前提是第1个NioProcessor正在忙),创建的最大数量由Acceptor构造方法的时候指定。如果:一个客户端连接同服务器端有很多通信,并且I/O的开销不大,但是Handler处理的业务时间比较长,那么需要采用独立的线程模式,在 FilterChain的最后增加一个ExecutorFitler : 

acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool())); 

这样可以保证processor和handler的线程是分开的,否则:客户端发送3个消息,而服务器对于每个消息要处理10s左右,那么这3个消息是被串行处理,在处理第一个消息的时候,后面的消息将被堵塞,同样反过来客户端也有同样的问题。 

客户端Porcessor堵塞测试情况: 

1. 以下代码在建立连接后连续发送了5个消息(item)

ConnectFuture future = connector.connect( new  InetSocketAddress(HOSTNAME, PORT));

future.awaitUninterruptibly();

session = future.getSession();

Item item = new  Item();

item.setId(12345 );

item.setName("hi");

session.write(item);

session.write(item);

session.write(item);

session.write(item);

session.write(item);

2. 在handle的messageSent方法进行了延时处理,延时3秒

public   void  messageSent(IoSession session, Object message)  throws  Exception {

 Thread.sleep(3000 );

 System.out.println(message);

}

3. 测试结果 

5个消息是串行发送,都由同一个IoPorcessor线程处理

session.write(item);

session.write(item);

session.write(item);

session.write(item);

session.write(item);

服务器端每隔3秒收到一个消息。因为调用是由IoProcessor触发,而一个connector只会使用一个IoProcessor线程 

4. 增加ExecutorFilter,ExecutorFilter保证在处理handler的时候是独立线程 

connector.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool()));

5. 测试结果 

4个session.wirte变成了并行处理,服务器端同时收到了5条消息

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,039评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,223评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,916评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,009评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,030评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,011评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,934评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,754评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,202评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,433评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,590评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,321评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,917评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,568评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,738评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,583评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,482评论 2 352

推荐阅读更多精彩内容