之前的项目需要用到mina,实现的功能主要是:服务端主动发送消息到客户端,这个的服务端为外网的tomcat,客户端为内网的tomcat,由于无法知道内网tomcat 的地址,也就不能直接通过http的方式发送信息回来,最后想来想去用mina实现了这个功能。
当然,我这里的服务端是整合的了spring 的,也可以直接把服务端独立出来,不整合spring,这个都一样,区别不大。
代码和配置如下:
---------------------------
1,jar包,我这里使用的是spring4.0.5,mina2.0.7
maven部分文件如下,这个包会自动也依赖进来mina-filter-ssl-1.1.7.jar
[java] view plain copy
[java] view plain copy
org.springframework
spring-context-support
${springframework-version}
org.springframework
spring-jdbc
${springframework-version}
org.springframework
spring-orm
${springframework-version}
org.springframework
spring-aop
${springframework-version}
org.springframework
spring-web
${springframework-version}
commons-logging
commons-logging
org.springframework
spring-webmvc
${springframework-version}
org.aspectj
aspectjrt
${aspectj-version}
org.aspectj
aspectjweaver
${aspectj-version}
org.apache.mina
mina-core
2.0.7
org.apache.mina
mina-integration-spring
1.1.7
org.apache.mina
mina-integration-beans
2.0.8
2,spring-ztc_app-mina.xml (spring与mina 的配置文件)
spring与mina 的配置文件,需要导入到spring 的总配置文件中,或者加入到web.xml的spring监听扫描中
[java] view plain copy
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
-->
value="org.apache.mina.integration.beans.InetSocketAddressEditor">
init-method="bind" destroy-method="unbind">
/> -->
class="org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder">
3,mina服务端业务处理类
[java] view plain copy
package cn.hydom.ztc.ztc_app.controller.mina;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
/**
* @Description: mina服务端业务处理类
* @author whl
* @date 2014-9-30 下午12:36:28
*
*/
public class ServerHandler extends IoHandlerAdapter {
private final static Log log = LogFactory.getLog(ServerHandler.class);
public ServerHandler() {
// TODO Auto-generated constructor stub
}
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
}
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
log.debug("服务端收到信息-------------");
//获取客户端发过来的key
String key = message.toString();
System.out.println("message :"+message.toString());
String carPark_id = key.substring(key.indexOf("=") + 1);
System.out.println("carPark_id :"+carPark_id);
//保存客户端的会话session
SessionMap sessionMap = SessionMap.newInstance();
sessionMap.addSession(carPark_id, session);
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
log.debug("------------服务端发消息到客户端---");
}
@Override
public void sessionClosed(IoSession session) throws Exception {
// TODO Auto-generated method stub
log.debug("远程session关闭了一个..." + session.getRemoteAddress().toString());
}
@Override
public void sessionCreated(IoSession session) throws Exception {
log.debug(session.getRemoteAddress().toString() +"----------------------create");
}
@Override
public void sessionIdle(IoSession session, IdleStatus status)
throws Exception {
log.debug(session.getServiceAddress() +"IDS");
}
@Override
public void sessionOpened(IoSession session) throws Exception {
log.debug("连接打开:"+session.getLocalAddress());
}
}
4,服务端缓存客户端socket连接的单例类
[html] view plain copy
package cn.hydom.ztc.ztc_app.controller.mina;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.core.session.IoSession;
/**
* @Description: 单例工具类,保存所有mina客户端连接
* @author whl
* @date 2014-9-29 上午10:09:15
*
*/
public class SessionMap {
private final static Loglog = LogFactory.getLog(SessionMap.class);
private static SessionMapsessionMap = null;
private Mapmap = new HashMap();
//构造私有化 单例
private SessionMap(){}
/**
* @Description: 获取唯一实例
* @author whl
* @date 2014-9-29 下午1:29:33
*/
public static SessionMap newInstance(){
log.debug("SessionMap单例获取---");
if(sessionMap == null){
sessionMap = new SessionMap();
}
return sessionMap;
}
/**
* @Description: 保存session会话
* @author whl
* @date 2014-9-29 下午1:31:05
*/
public void addSession(String key, IoSession session){
log.debug("保存会话到SessionMap单例---key=" + key);
this.map.put(key, session);
}
/**
* @Description: 根据key查找缓存的session
* @author whl
* @date 2014-9-29 下午1:31:55
*/
public IoSession getSession(String key){
log.debug("获取会话从SessionMap单例---key=" + key);
return this.map.get(key);
}
/**
* @Description: 发送消息到客户端
* @author whl
* @date 2014-9-29 下午1:57:51
*/
public void sendMessage(String[] keys, Object message){
for(String key : keys){
IoSessionsession = getSession(key);
log.debug("反向发送消息到客户端Session---key=" + key + "----------消息=" + message);
if(session == null){
return;
}
session.write(message);
}
}
}
5,编码解码器,
HCoderFactory.java
[java] view plain copy
package cn.hydom.ztc.ztc_app.controller.mina;
import java.nio.charset.Charset;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
/**
* @Description: 编码和解码器工厂类.
* @author whl
* @date 2014-9-30 下午12:34:59
*
*/
public class HCoderFactory implements ProtocolCodecFactory {
private final HEncoder encoder;
private final HDecoder decoder;
public HCoderFactory() {
//this(Charset.defaultCharset());
this(Charset.forName("UTF-8"));
}
public HCoderFactory(Charset charSet) {
this.encoder = new HEncoder(charSet);
this.decoder = new HDecoder(charSet);
}
@Override
public ProtocolDecoder getDecoder(IoSession arg0) throws Exception {
return decoder;
}
@Override
public ProtocolEncoder getEncoder(IoSession arg0) throws Exception {
return encoder;
}
}
HDecoder.java
[java] view plain copy
package cn.hydom.ztc.ztc_app.controller.mina;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
/**
* @Description: 解码工具类
* @author whl
* @date 2014-9-30 下午12:35:22
*
*/
public class HDecoder extends CumulativeProtocolDecoder {
private final Charset charset;
public HDecoder(Charset charset) {
this.charset = charset;
}
public boolean doDecode(IoSession session, IoBuffer in,
ProtocolDecoderOutput out)throws Exception {
//System.out.println("-------doDecode----------");
CharsetDecoder cd = charset.newDecoder();
String mes = in.getString(cd);
out.write(mes);
return true;
/*
if (in.remaining() > 4) {// 有数据时,读取字节判断消息长度
in.mark();// 标记当前位置,以便reset
int size = in.getInt();
// 如果消息内容不够,则重置,相当于不读取size
if (size > in.remaining()) {
in.reset();
return false;// 接收新数据,以拼凑成完整数据
} else if (size != 0 && (size - 4 >= 0)) {
byte[] bytes = new byte[size - 4];
//int protocol = in.getInt();
// 拿到客户端发过来的数据组装成基础包写出去
in.get(bytes, 0, size - 4);
//in.get(bytes, size - 4, size);
PackageBeanFactory beanFactory = (PackageBeanFactory) session
.getAttribute(ServerHandler.BEAN_FACTORY);
//out.write(beanFactory.getPackage(protocol, size, bytes));
String mes = in.getString(cd);
out.write(mes);
// 如果读取内容后还粘了包,就让父类再给读取进行下次解析
if (in.remaining() > 0) {
return true;
}
}
}
return false;// 处理成功,让父类进行接收下个包
*/
}
}
HEncoder.java
[java] view plain copy
package cn.hydom.ztc.ztc_app.controller.mina;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
/**
* @Description: 编码工具类
* @author whl
* @date 2014-9-30 下午12:35:35
*
*/
public class HEncoder implements ProtocolEncoder {
private final static Log log = LogFactory.getLog(HEncoder.class);
private final Charset charset;
public HEncoder(Charset charset) {
this.charset = charset;
}
@Override
public void encode(IoSession session, Object message,
ProtocolEncoderOutput out)throws Exception {
CharsetEncoder ce = charset.newEncoder();
String mes = (String) message;
IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);
buffer.putString(mes,ce);
buffer.flip();
out.write(buffer);
/*System.out.println("---------encode-------------");
String mes = (String) message;
byte[] data = mes.getBytes("UTF-8");
IoBuffer buffer = IoBuffer.allocate(data.length + 4);
buffer.putInt(data.length);
buffer.put(data);
buffer.flip();
out.write(buffer);
out.flush();*/
}
@Override
public void dispose(IoSession session) throws Exception {
log.info("Dispose called,session is " + session);
}
}
6,客户端程序
ClentMain.java
[java] view plain copy
package cn.hydom.ztc.ztc_app.controller.mina;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.core.filterchain.IoFilterAdapter;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
/**
* @Description: mina客户端,包含断线重连机制,空闲重连机制
* @author whl
* @date 2014-11-2
*/
public class ClentMain extends Thread{
private final static Log log = LogFactory.getLog(ClentMain.class);
@Override
public void run() {
//ip
String host ="192.168.0.38";
//端口
int port = 6007;
//停车场id
final String carPark_id = "1";
// 创建客户端连接器.
final NioSocketConnector connector = new NioSocketConnector();
//设置连接超时
connector.setConnectTimeoutMillis(30000);
// 设置默认访问地址
connector.setDefaultRemoteAddress(new InetSocketAddress(host, port));
//将IoSession的主键属性注入线程映射表MDC中
//connector.getFilterChain().addLast("mdc", new MdcInjectionFilter());
//日志过滤器
connector.getFilterChain().addLast("logger", new LoggingFilter());
// 设置编码过滤器
connector.getFilterChain().addLast("codec",new ProtocolCodecFilter(new HCoderFactory()));
//添加处理器
connector.setHandler(new ClintHandler());
// 设置接收缓冲区的大小
connector.getSessionConfig().setReceiveBufferSize(10240);
// 设置输出缓冲区的大小
connector.getSessionConfig().setSendBufferSize(10240);
/**
* 空闲重连的机制,根据需要选择相应的配置
*/
// 读写都空闲时间:30秒
//connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30);
// 读(接收通道)空闲时间:40秒
//connector.getSessionConfig().setIdleTime(IdleStatus.READER_IDLE, 40);
// 写(发送通道)空闲时间:50秒
//connector.getSessionConfig().setIdleTime(IdleStatus.WRITER_IDLE, 50);
//断线重连回调拦截器
connector.getFilterChain().addFirst("reconnection", new IoFilterAdapter() {
@Override
public void sessionClosed(NextFilter nextFilter, IoSession ioSession) throws Exception {
for(;;){
try{
Thread.sleep(3000);
ConnectFuture future = connector.connect();
future.awaitUninterruptibly();// 等待连接创建成功
IoSession session = future.getSession();// 获取会话
session.write("key="+carPark_id);
if(session.isConnected()){
log.info("断线重连["+ connector.getDefaultRemoteAddress().getHostName() +":"+ connector.getDefaultRemoteAddress().getPort()+"]成功");
//System.out.println("断线重连["+ connector.getDefaultRemoteAddress().getHostName() +":"+ connector.getDefaultRemoteAddress().getPort()+"]成功");
break;
}
}catch(Exception ex){
log.info("重连服务器登录失败,3秒再连接一次:" + ex.getMessage());
//System.out.println("重连服务器登录失败,3秒再连接一次:" + ex.getMessage());
}
}
}
});
//开始连接
for (;;) {
try {
ConnectFuture future = connector.connect();
// 等待连接创建成功
future.awaitUninterruptibly();
// 获取会话
IoSession session = future.getSession();
//发送消息
session.write("key=" + carPark_id);
log.error("连接服务端" + host + ":" + port + "[成功]" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
break;
}catch (Exception e) {
//System.out.println("连接服务端" + host + ":" + port + "失败" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 连接MSG异常,请检查MSG端口、IP是否正确,MSG服务是否启动,异常内容:" + e.getMessage());
log.error("连接服务端" + host + ":" + port + "失败" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 连接MSG异常,请检查MSG端口、IP是否正确,MSG服务是否启动,异常内容:" + e.getMessage(), e);
// 连接失败后,重连10次,间隔30s
try {
Thread.sleep(5000);
}catch (InterruptedException e1) {
e1.printStackTrace();
log.error("连接服务端失败后,睡眠5秒发生异常!");
}
}
}
// cf.getSession().write("quit");//发送消息
//cf.getSession().close();
//cf.getSession().getCloseFuture().awaitUninterruptibly();// 等待连接断开
//connector.dispose();
}
}
ClintHandler.java
[java] view plain copy
package cn.hydom.ztc.ztc_app.controller.mina;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
/**
* @Description: 客户端业务处理类
* @author whl
* @date 2014-11-2
*/
public class ClintHandler extends IoHandlerAdapter {
private final static Log log = LogFactory.getLog(ClintHandler.class);
/**
* 写处理服务端推送的信息的逻辑
*/
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
System.out.println("-----服务顿返回的json数据----");
String s = message.toString();
System.out.println("message :" + s);
System.out.println("message length:" + s.length());
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
log.info("-客户端与服务端连接[空闲] - " + status.toString());
System.out.println("-客户端与服务端连接[空闲] - " + status.toString());
if(session != null){
session.close(true);
}
}
}
7,测试
这里就不写实际的代码了,
1,首先部署web工程启动tomcat,让服务端先运行起来,
2,然后运行客户端
[java] view plain copy
public class ClintTest1 {
public static void main(String[] args) throws Exception {
ClentMain mina =new ClentMain();
mina.start();
}
}
3,想要服务端主动发回来信息,还得在服务端的web工程的action中写一个htpp访问的方法,方法中去sessionMap类中缓存的map中查找session,然后发送消息。
代码如下,我用的是springmvc
[java] view plain copy
/**
* @Description: 发送消息到客户端
* @author whl
* @date 2014-9-29 下午1:18:54
*/
@ResponseBody
@RequestMapping(value="/sendMessage")
public String sendMessage(HttpServletRequest request, String[] carPark_id){
try{
//
//获取链接的参数carPark_id
log.debug("carPark_id[].length--------- " + carPark_id.length);
for(String id : carPark_id){
log.debug("carPark_id --------- " + id);
}
//这里用的假数据 carPark_id = new String[]{"1"};
//发送的信息String jsonstr = "123";//反向发送信息log.debug("开始反向发送消息到客户端-------- ");SessionMap sessionMap = SessionMap.newInstance();sessionMap.sendMessage(carPark_id, jsonstr);//返回信息return "发送的信息为" + jsonstr;
}catch (Exception e) {log.error(e);return "出错了。。";}}
4,好了,现在重新发布工程,启动服务端和客户端,然后访问这个链接就可以了,当然springmvc需要自己配置,这也不是这里的重点。
注意:客户端发过来的carPark_id必须与服务端查找的一致,不让就找不到相应的客户端session连接,消息无法发送成功。