本文为原创文章,转载请注明出处
查看[easy-rpc]系列内容请点击:https://www.jianshu.com/nb/47424623
源码地址:这里
大家可以用在自己小型的项目上了,我自己测了,针对200个并发量,每个线程500个数字的数组排序的任务,在多线程情况下,仅需1.414秒~
默认情况下,easy-rpc的server端是以单线程模式启动的,启动后事务执行具有原子性。我们可以通过server的configuration
来配置,使其以Reactor主从多线程方式运行,只需要按照如下代码:
server.getConfiguration().setThreadType(ThreadType.MULTI);
系统就会以多线程模式运行啦~
另外,这次对项目的结构做了一次优化。主要分为四个包:
server
:服务器相关内容client
:客户端相关内容core
:服务器和客户端都需要的工具类samples
:一些示例可以从这里找哦~
在server中,IO多路复用机制主要定义在com.codelifeliwan.rpc.server.nio
包中,入口类就是·MultiDataAcceptEventLoop·类,对应的单线程模式入口类是:SingleDataAcceptEventLoop
在该包下有一个子包reactor
,其中包含了以下几个文件:
Reactor.java
:Reactor模式的主类Acceptor.java
:理论上的读写IO主类(这里暂时读写IO不放在这里,而是放在了SocketHandler.java
中)SocketHandler.java
:数据处理主类
下面一一说明。
Reactor.java
Reactor模式的主类,只包含一个线程,主要负责对服务器端口的监听,其中有一个主要的对象ServerSocketChannel
对象,以单线程模式监听服务器端口,一旦监听到事件,就会发送到Acceptor中进行处理。
一个Reactor包含了多个Acceptor,分别运行在不同的线程中,在Reactor中通过数组和线程池来管理Acceptor
除了负责监听端口外,Reactor还负责对于Accestor线程池和SocketHandler线程池的初始化工作。
// Reactor.java
package com.codelifeliwan.rpc.server.nio.reactor;
import com.codelifeliwan.rpc.server.config.Configuration;
import org.apache.log4j.Logger;
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.concurrent.*;
/**
* @author LiWan
* <p>
* 主从Reactor多线程 模式处理,本类是主Reactor,只有一个线程
* 主Reactor线程只负责连接的建立工作,具体的服务器通讯和IO操作放在Acceptor里面实现
* <p>
* 默认的Acceptor工作线程数(即Acceptor个数)为CPU线程数*2,可自主设置
*/
public class Reactor extends Thread {
private static final Logger log = Logger.getLogger(Reactor.class);
private volatile boolean started = false;
/**
* 业务处理的线程池
* 默认为自动伸缩的线程池
*/
private Executor processThreadPool;
/**
* 执行Accestor的线程池
*/
private Executor accestorThreadPool;
/**
* 服务器配置信息
*/
private Configuration configuration;
private Acceptor[] acceptors;
private volatile int acceptorCount = 1;
/**
* 服务器事件监听,只监听 ACCEPT 事件
*/
private ServerSocketChannel channel;
public Reactor(Configuration configuration, int acceptorCount) throws Exception {
this(null, configuration, acceptorCount);
}
public Reactor(Configuration configuration) throws Exception {
this(null, configuration);
}
public Reactor(Executor processThreadPool, Configuration configuration) throws Exception {
this(processThreadPool, configuration, Runtime.getRuntime().availableProcessors() * 2);
}
public Reactor(Executor processThreadPool, Configuration configuration, int acceptorCount) throws Exception {
if (processThreadPool != null) {
this.processThreadPool = processThreadPool;
} else {
// 默认创建线程池中的线程个数从 cpu线程数 到 cpu线程数*5,线程过期时间1分钟
this.processThreadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 5,
1,
TimeUnit.MINUTES,
new LinkedBlockingDeque<>());
}
this.configuration = configuration;
this.acceptorCount = acceptorCount;
}
/**
* 初始化Reactor
* 初始化IO线程池(Acceptor线程池)并开始执行
*
* @throws Exception
*/
private synchronized void init() throws Exception {
channel = ServerSocketChannel.open();
// channel.configureBlocking(false);
channel.socket().bind(new InetSocketAddress(configuration.getListeningPort()));
acceptors = new Acceptor[this.acceptorCount];
for (int i = 0; i < acceptors.length; i++) {
acceptors[i] = new Acceptor(processThreadPool, configuration);
}
// 初始化并执行IO线程池
accestorThreadPool = Executors.newFixedThreadPool(acceptors.length);
for (Acceptor acceptor : acceptors) {
accestorThreadPool.execute(acceptor);
}
}
/**
* 复写run方法
*/
public synchronized void run() {
if (started) {
log.info("Thread " + Thread.currentThread().getId() + " already started.");
return;
}
started = true;
try {
init();
// 轮流使用Acceptor执行IO任务
int round = 0; // 下一个要使用的Acceptor指针
acceptorCount = acceptors.length;
while (started && (!Thread.currentThread().isInterrupted())) {
// 此处会阻塞到有连接请求为止
SocketChannel c = channel.accept();
// 将该请求转发到对应的线程上执行IO操作
c.configureBlocking(false);
acceptors[round++].registerConnectChannel(c);
round = round % acceptorCount;
}
} catch (Exception e) {
e.printStackTrace();
log.error(e.getMessage());
}
}
public void shutdown() throws Exception {
for (Acceptor acceptor : acceptors) acceptor.close();
started = false;
channel.close();
Thread.currentThread().interrupt();
}
}
Acceptor.java
理论上来说,Accestor是对网络IO读写的操作,但是这里因为直接使用Socket连接来进行,所以目前Acceptor中只负责对于不同网络事件的轮询、转发和状态转换工作。
// Acceptor.java
package com.codelifeliwan.rpc.server.nio.reactor;
import com.codelifeliwan.rpc.server.config.Configuration;
import com.codelifeliwan.rpc.core.RPCByteBuffer;
import lombok.Getter;
import org.apache.log4j.Logger;
import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;
/**
* @author LiWan
* <p>
* 负责具体的网络IO操作,并将执行下发到线程池中运行
*/
@Getter
public class Acceptor implements Closeable, Runnable {
private static final Logger log = Logger.getLogger(Acceptor.class);
/**
* 具体处理的线程池,在一个Server内共享该线程池
*/
private Executor processThreadPool;
private volatile Selector selector;
private Configuration configuration;
private volatile boolean closed = true;
public Acceptor(Executor processThreadPool, Configuration configuration) throws Exception {
this.processThreadPool = processThreadPool;
this.configuration = configuration;
this.selector = Selector.open();
}
@Override
public void close() throws IOException {
selector.close();
closed = true;
Thread.currentThread().interrupt();
}
/**
* 从主Reactor注册channel,这里只监听OP_CONNECT和OP_READ请求
*
* @param sc
* @throws Exception
*/
public void registerConnectChannel(SocketChannel sc) throws Exception {
registerConnectChannel(sc, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
}
public void registerConnectChannel(SocketChannel sc, int status) throws Exception {
sc.register(selector, status, this);
selector.wakeup();
}
/**
* 读取并处理消息
*/
public void handleExecutor(SocketChannel channel, RPCByteBuffer buffer) throws Exception {
processThreadPool.execute(new SocketHandler(configuration, channel, buffer));
}
private void handleKey(SelectionKey key) throws Exception {
if (key.isConnectable()) {
handleConnectEvent(key);
} else if (key.isAcceptable()) {
handleAcceptEvent(key);
} else if (key.isReadable()) {
handleReadEvent(key);
} else if (key.isWritable()) {
handleOtherEvent(key);
} else {
handleOtherEvent(key);
}
}
public void run() {
closed = false;
while (!closed && (!Thread.currentThread().isInterrupted())) {
try {
int eventCount = selector.select();
if (eventCount == 0) continue;
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
try {
if (!key.isValid()) {
key.channel().close();
key.cancel();
continue;
}
handleKey(key);
} catch (Exception e1) {
e1.printStackTrace();
log.error(e1.getMessage());
}
}
} catch (Exception e) {
e.printStackTrace();
log.error(e.getMessage());
}
}
}
/**
* 处理CONNECT事件
*
* @param key
* @throws Exception
*/
private void handleConnectEvent(SelectionKey key) throws Exception {
log.info("*** connectable");
SocketChannel ch = (SocketChannel) key.channel();
ch.finishConnect();
key.interestOps(SelectionKey.OP_READ);
}
/**
* 读取数据
*
* @param key
* @throws Exception
*/
private void handleReadEvent(SelectionKey key) throws Exception {
log.info("*** readable");
key.cancel();
SocketChannel ch = (SocketChannel) key.channel();
RPCByteBuffer buffer = RPCByteBuffer.fromChannel(ch);
ch.shutdownInput();
handleExecutor(ch, buffer);
}
/**
* 测试此键的通道是否已准备好接受新的套接字连接
* 本项目中,应该是在Reactor中使用,这里不使用
*
* @param key
* @throws Exception
*/
private void handleAcceptEvent(SelectionKey key) throws Exception {
log.error("*** acceptable, this is impossiable");
throw new Exception("no such event : accept");
}
/**
* 写事件等,暂时不使用
*
* @param key
* @throws Exception
*/
private void handleOtherEvent(SelectionKey key) throws Exception {
log.error("*** not used");
throw new Exception("no such event : other");
}
}
SocketHandler.java
SocketHandler负责对于服务器的方法执行操作,也是通过线程池来管理SocketHandler的:
// SocketHandler.java
package com.codelifeliwan.rpc.server.nio.reactor;
import com.codelifeliwan.rpc.core.RPCDefaultMessage;
import com.codelifeliwan.rpc.core.RPCStatus;
import com.codelifeliwan.rpc.core.serializer.MessageSerializer;
import com.codelifeliwan.rpc.server.config.BeanScope;
import com.codelifeliwan.rpc.server.config.ClassInfo;
import com.codelifeliwan.rpc.server.config.Configuration;
import com.codelifeliwan.rpc.core.RPCByteBuffer;
import com.google.gson.Gson;
import org.apache.log4j.Logger;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author LiWan
* <p>
* 负责具体的消息序列化、方法调用等操作
*/
public class SocketHandler implements Runnable {
private static final Logger log = Logger.getLogger(SocketHandler.class);
private Configuration configuration;
private Map<String, ClassInfo<?>> beans;
private RPCByteBuffer buffer;
/**
* Method的缓存
* key = beanName-methodName
*/
private static Map<String, Method> methodsCache = new ConcurrentHashMap<>();
/**
* 消息会从该socket解析,channel.socket()
* 并通过该socket写回消息,用完后关闭
*/
private SocketChannel channel;
public SocketHandler(Configuration configuration, SocketChannel channel, RPCByteBuffer buffer) {
this.configuration = configuration;
this.channel = channel;
this.beans = configuration.getBeanClasses();
this.buffer = buffer;
}
@Override
public void run() {
MessageSerializer serializer = configuration.getMessageSerializer();
try {
byte[] bytes = this.buffer.getByteArray();
RPCDefaultMessage fromMessage = (RPCDefaultMessage) serializer.unSerializeMessage(new String(bytes), null, null);
if (!beans.containsKey(fromMessage.getBeanName())) {
String warn = "unknown message : " + fromMessage.getBeanName();
log.error(warn);
throw new Exception(warn);
}
ClassInfo bean = beans.get(fromMessage.getBeanName());
Object beanObj = null;
if (bean.getScope() == BeanScope.SINGLETON) {
// 单例模式
beanObj = bean.getDefaultObject();
} else {
// 原型模式
beanObj = bean.getClazz().getConstructor().newInstance();
}
// 实际方法调用处理过程, 这次重新序列化类型
RPCDefaultMessage response = processMessage(beanObj, fromMessage);
// 将返回结果写回
String responseStr = serializer.serializeMessage(response);
channel.write(ByteBuffer.wrap(responseStr.getBytes()));
} catch (Exception e) {
e.printStackTrace();
RPCDefaultMessage response = new RPCDefaultMessage();
response.setStatus(RPCStatus.GENERAL_ERRPR);
response.setValue("error:" + e.getMessage());
try {
String responseStr = serializer.serializeMessage(response);
channel.write(ByteBuffer.wrap(responseStr.getBytes()));
} catch (Exception ex) {
ex.printStackTrace();
}
} finally {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
log.error(e.getMessage());
}
}
}
/**
* 具体调用方法实现
*
* @param bean 实例化的bean
* @param message 客户端传来的消息
* @return
* @throws Exception
*/
private RPCDefaultMessage processMessage(Object bean, RPCDefaultMessage message) throws Exception {
String key = message.getBeanName() + "-" + message.getMethodName();
if (!methodsCache.containsKey(key)) {
synchronized (SocketHandler.class) {
if (!methodsCache.containsKey(key)) {
Method[] methods = bean.getClass().getDeclaredMethods();
if (methods != null) {
for (Method m : methods) {
if (m.getName().equalsIgnoreCase(message.getMethodName())) {
methodsCache.put(key, m);
break;
}
}
}
}
}
}
// 利用反射机制来实现方法调用
if (!methodsCache.containsKey(key)) throw new Exception("no such method : " + message.getMethodName());
Method method = methodsCache.get(key);
RPCDefaultMessage response = new RPCDefaultMessage();
// 参数类型转化,避免因为json传输造成的类型不匹配问题
Gson gson = new Gson();
Object[] params = message.getParamValues();
if (params == null) params = new Object[0];
Class[] paramTypes = method.getParameterTypes();
if (paramTypes == null) paramTypes = new Class[0];
if (params.length != paramTypes.length) throw new Exception("method param(s) not match.");
for (int i = 0; i < params.length; i++) {
params[i] = gson.fromJson(gson.toJson(params[i]), paramTypes[i]);
}
response.setValue(method.invoke(bean, params));
return response;
}
}
欢迎不懂的小伙伴留言~