【easy-rpc】二、优化日志:支持Reactor模式IO多路复用机制

本文为原创文章,转载请注明出处
查看[easy-rpc]系列内容请点击:https://www.jianshu.com/nb/47424623

源码地址:这里

大家可以用在自己小型的项目上了,我自己测了,针对200个并发量,每个线程500个数字的数组排序的任务,在多线程情况下,仅需1.414秒~

默认情况下,easy-rpc的server端是以单线程模式启动的,启动后事务执行具有原子性。我们可以通过server的configuration来配置,使其以Reactor主从多线程方式运行,只需要按照如下代码:

server.getConfiguration().setThreadType(ThreadType.MULTI);

系统就会以多线程模式运行啦~

另外,这次对项目的结构做了一次优化。主要分为四个包:

  • server:服务器相关内容
  • client:客户端相关内容
  • core:服务器和客户端都需要的工具类
  • samples:一些示例可以从这里找哦~

在server中,IO多路复用机制主要定义在com.codelifeliwan.rpc.server.nio包中,入口类就是·MultiDataAcceptEventLoop·类,对应的单线程模式入口类是:SingleDataAcceptEventLoop

在该包下有一个子包reactor,其中包含了以下几个文件:

  • Reactor.java:Reactor模式的主类
  • Acceptor.java:理论上的读写IO主类(这里暂时读写IO不放在这里,而是放在了 SocketHandler.java中)
  • SocketHandler.java:数据处理主类

下面一一说明。

Reactor.java

Reactor模式的主类,只包含一个线程,主要负责对服务器端口的监听,其中有一个主要的对象ServerSocketChannel对象,以单线程模式监听服务器端口,一旦监听到事件,就会发送到Acceptor中进行处理。

一个Reactor包含了多个Acceptor,分别运行在不同的线程中,在Reactor中通过数组和线程池来管理Acceptor

除了负责监听端口外,Reactor还负责对于Accestor线程池和SocketHandler线程池的初始化工作。

// Reactor.java
package com.codelifeliwan.rpc.server.nio.reactor;

import com.codelifeliwan.rpc.server.config.Configuration;
import org.apache.log4j.Logger;

import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.concurrent.*;

/**
 * @author LiWan
 * <p>
 * 主从Reactor多线程 模式处理,本类是主Reactor,只有一个线程
 * 主Reactor线程只负责连接的建立工作,具体的服务器通讯和IO操作放在Acceptor里面实现
 * <p>
 * 默认的Acceptor工作线程数(即Acceptor个数)为CPU线程数*2,可自主设置
 */
public class Reactor extends Thread {
    private static final Logger log = Logger.getLogger(Reactor.class);

    private volatile boolean started = false;

    /**
     * 业务处理的线程池
     * 默认为自动伸缩的线程池
     */
    private Executor processThreadPool;

    /**
     * 执行Accestor的线程池
     */
    private Executor accestorThreadPool;

    /**
     * 服务器配置信息
     */
    private Configuration configuration;

    private Acceptor[] acceptors;
    private volatile int acceptorCount = 1;

    /**
     * 服务器事件监听,只监听 ACCEPT 事件
     */
    private ServerSocketChannel channel;

    public Reactor(Configuration configuration, int acceptorCount) throws Exception {
        this(null, configuration, acceptorCount);
    }

    public Reactor(Configuration configuration) throws Exception {
        this(null, configuration);
    }

    public Reactor(Executor processThreadPool, Configuration configuration) throws Exception {
        this(processThreadPool, configuration, Runtime.getRuntime().availableProcessors() * 2);
    }

    public Reactor(Executor processThreadPool, Configuration configuration, int acceptorCount) throws Exception {
        if (processThreadPool != null) {
            this.processThreadPool = processThreadPool;
        } else {
            // 默认创建线程池中的线程个数从 cpu线程数 到 cpu线程数*5,线程过期时间1分钟
            this.processThreadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                    Runtime.getRuntime().availableProcessors() * 5,
                    1,
                    TimeUnit.MINUTES,
                    new LinkedBlockingDeque<>());
        }
        this.configuration = configuration;
        this.acceptorCount = acceptorCount;
    }

    /**
     * 初始化Reactor
     * 初始化IO线程池(Acceptor线程池)并开始执行
     *
     * @throws Exception
     */
    private synchronized void init() throws Exception {
        channel = ServerSocketChannel.open();
        // channel.configureBlocking(false);
        channel.socket().bind(new InetSocketAddress(configuration.getListeningPort()));

        acceptors = new Acceptor[this.acceptorCount];
        for (int i = 0; i < acceptors.length; i++) {
            acceptors[i] = new Acceptor(processThreadPool, configuration);
        }

        // 初始化并执行IO线程池
        accestorThreadPool = Executors.newFixedThreadPool(acceptors.length);
        for (Acceptor acceptor : acceptors) {
            accestorThreadPool.execute(acceptor);
        }
    }


    /**
     * 复写run方法
     */
    public synchronized void run() {
        if (started) {
            log.info("Thread " + Thread.currentThread().getId() + " already started.");
            return;
        }

        started = true;

        try {
            init();

            // 轮流使用Acceptor执行IO任务
            int round = 0; // 下一个要使用的Acceptor指针
            acceptorCount = acceptors.length;
            while (started && (!Thread.currentThread().isInterrupted())) {

                // 此处会阻塞到有连接请求为止
                SocketChannel c = channel.accept();

                // 将该请求转发到对应的线程上执行IO操作
                c.configureBlocking(false);
                acceptors[round++].registerConnectChannel(c);

                round = round % acceptorCount;
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error(e.getMessage());
        }
    }

    public void shutdown() throws Exception {
        for (Acceptor acceptor : acceptors) acceptor.close();
        started = false;
        channel.close();
        Thread.currentThread().interrupt();
    }
}

Acceptor.java

理论上来说,Accestor是对网络IO读写的操作,但是这里因为直接使用Socket连接来进行,所以目前Acceptor中只负责对于不同网络事件的轮询、转发和状态转换工作。

// Acceptor.java
package com.codelifeliwan.rpc.server.nio.reactor;

import com.codelifeliwan.rpc.server.config.Configuration;
import com.codelifeliwan.rpc.core.RPCByteBuffer;
import lombok.Getter;
import org.apache.log4j.Logger;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;

/**
 * @author LiWan
 * <p>
 * 负责具体的网络IO操作,并将执行下发到线程池中运行
 */
@Getter
public class Acceptor implements Closeable, Runnable {
    private static final Logger log = Logger.getLogger(Acceptor.class);

    /**
     * 具体处理的线程池,在一个Server内共享该线程池
     */
    private Executor processThreadPool;

    private volatile Selector selector;

    private Configuration configuration;

    private volatile boolean closed = true;

    public Acceptor(Executor processThreadPool, Configuration configuration) throws Exception {
        this.processThreadPool = processThreadPool;
        this.configuration = configuration;
        this.selector = Selector.open();
    }


    @Override
    public void close() throws IOException {
        selector.close();
        closed = true;
        Thread.currentThread().interrupt();
    }

    /**
     * 从主Reactor注册channel,这里只监听OP_CONNECT和OP_READ请求
     *
     * @param sc
     * @throws Exception
     */
    public void registerConnectChannel(SocketChannel sc) throws Exception {
        registerConnectChannel(sc, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
    }

    public void registerConnectChannel(SocketChannel sc, int status) throws Exception {
        sc.register(selector, status, this);
        selector.wakeup();
    }

    /**
     * 读取并处理消息
     */
    public void handleExecutor(SocketChannel channel, RPCByteBuffer buffer) throws Exception {
        processThreadPool.execute(new SocketHandler(configuration, channel, buffer));
    }

    private void handleKey(SelectionKey key) throws Exception {
        if (key.isConnectable()) {
            handleConnectEvent(key);
        } else if (key.isAcceptable()) {
            handleAcceptEvent(key);
        } else if (key.isReadable()) {
            handleReadEvent(key);
        } else if (key.isWritable()) {
            handleOtherEvent(key);
        } else {
            handleOtherEvent(key);
        }
    }

    public void run() {
        closed = false;
        while (!closed && (!Thread.currentThread().isInterrupted())) {
            try {
                int eventCount = selector.select();
                if (eventCount == 0) continue;

                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iter = keys.iterator();

                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();

                    try {
                        if (!key.isValid()) {
                            key.channel().close();
                            key.cancel();
                            continue;
                        }
                        handleKey(key);
                    } catch (Exception e1) {
                        e1.printStackTrace();
                        log.error(e1.getMessage());
                    }
                }

            } catch (Exception e) {
                e.printStackTrace();
                log.error(e.getMessage());
            }
        }
    }

    /**
     * 处理CONNECT事件
     *
     * @param key
     * @throws Exception
     */
    private void handleConnectEvent(SelectionKey key) throws Exception {
        log.info("*** connectable");
        SocketChannel ch = (SocketChannel) key.channel();
        ch.finishConnect();
        key.interestOps(SelectionKey.OP_READ);
    }

    /**
     * 读取数据
     *
     * @param key
     * @throws Exception
     */
    private void handleReadEvent(SelectionKey key) throws Exception {
        log.info("*** readable");
        key.cancel();
        SocketChannel ch = (SocketChannel) key.channel();

        RPCByteBuffer buffer = RPCByteBuffer.fromChannel(ch);
        ch.shutdownInput();
        handleExecutor(ch, buffer);
    }

    /**
     * 测试此键的通道是否已准备好接受新的套接字连接
     * 本项目中,应该是在Reactor中使用,这里不使用
     *
     * @param key
     * @throws Exception
     */
    private void handleAcceptEvent(SelectionKey key) throws Exception {
        log.error("*** acceptable, this is impossiable");
        throw new Exception("no such event : accept");
    }

    /**
     * 写事件等,暂时不使用
     *
     * @param key
     * @throws Exception
     */
    private void handleOtherEvent(SelectionKey key) throws Exception {
        log.error("*** not used");
        throw new Exception("no such event : other");
    }
}

SocketHandler.java

SocketHandler负责对于服务器的方法执行操作,也是通过线程池来管理SocketHandler的:

// SocketHandler.java
package com.codelifeliwan.rpc.server.nio.reactor;

import com.codelifeliwan.rpc.core.RPCDefaultMessage;
import com.codelifeliwan.rpc.core.RPCStatus;
import com.codelifeliwan.rpc.core.serializer.MessageSerializer;
import com.codelifeliwan.rpc.server.config.BeanScope;
import com.codelifeliwan.rpc.server.config.ClassInfo;
import com.codelifeliwan.rpc.server.config.Configuration;
import com.codelifeliwan.rpc.core.RPCByteBuffer;
import com.google.gson.Gson;
import org.apache.log4j.Logger;

import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author LiWan
 * <p>
 * 负责具体的消息序列化、方法调用等操作
 */
public class SocketHandler implements Runnable {
    private static final Logger log = Logger.getLogger(SocketHandler.class);

    private Configuration configuration;
    private Map<String, ClassInfo<?>> beans;
    private RPCByteBuffer buffer;

    /**
     * Method的缓存
     * key = beanName-methodName
     */
    private static Map<String, Method> methodsCache = new ConcurrentHashMap<>();

    /**
     * 消息会从该socket解析,channel.socket()
     * 并通过该socket写回消息,用完后关闭
     */
    private SocketChannel channel;

    public SocketHandler(Configuration configuration, SocketChannel channel, RPCByteBuffer buffer) {
        this.configuration = configuration;
        this.channel = channel;
        this.beans = configuration.getBeanClasses();
        this.buffer = buffer;
    }

    @Override
    public void run() {
        MessageSerializer serializer = configuration.getMessageSerializer();

        try {
            byte[] bytes = this.buffer.getByteArray();
            RPCDefaultMessage fromMessage = (RPCDefaultMessage) serializer.unSerializeMessage(new String(bytes), null, null);

            if (!beans.containsKey(fromMessage.getBeanName())) {
                String warn = "unknown message : " + fromMessage.getBeanName();
                log.error(warn);
                throw new Exception(warn);
            }

            ClassInfo bean = beans.get(fromMessage.getBeanName());

            Object beanObj = null;
            if (bean.getScope() == BeanScope.SINGLETON) {
                // 单例模式
                beanObj = bean.getDefaultObject();
            } else {
                // 原型模式
                beanObj = bean.getClazz().getConstructor().newInstance();
            }

            // 实际方法调用处理过程, 这次重新序列化类型
            RPCDefaultMessage response = processMessage(beanObj, fromMessage);

            // 将返回结果写回
            String responseStr = serializer.serializeMessage(response);
            channel.write(ByteBuffer.wrap(responseStr.getBytes()));
        } catch (Exception e) {
            e.printStackTrace();
            RPCDefaultMessage response = new RPCDefaultMessage();
            response.setStatus(RPCStatus.GENERAL_ERRPR);
            response.setValue("error:" + e.getMessage());
            try {
                String responseStr = serializer.serializeMessage(response);
                channel.write(ByteBuffer.wrap(responseStr.getBytes()));
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        } finally {
            try {
                channel.close();
            } catch (Exception e) {
                e.printStackTrace();
                log.error(e.getMessage());
            }
        }
    }


    /**
     * 具体调用方法实现
     *
     * @param bean    实例化的bean
     * @param message 客户端传来的消息
     * @return
     * @throws Exception
     */
    private RPCDefaultMessage processMessage(Object bean, RPCDefaultMessage message) throws Exception {
        String key = message.getBeanName() + "-" + message.getMethodName();
        if (!methodsCache.containsKey(key)) {
            synchronized (SocketHandler.class) {
                if (!methodsCache.containsKey(key)) {
                    Method[] methods = bean.getClass().getDeclaredMethods();
                    if (methods != null) {
                        for (Method m : methods) {
                            if (m.getName().equalsIgnoreCase(message.getMethodName())) {
                                methodsCache.put(key, m);
                                break;
                            }
                        }
                    }
                }
            }
        }

        // 利用反射机制来实现方法调用
        if (!methodsCache.containsKey(key)) throw new Exception("no such method : " + message.getMethodName());
        Method method = methodsCache.get(key);

        RPCDefaultMessage response = new RPCDefaultMessage();

        // 参数类型转化,避免因为json传输造成的类型不匹配问题
        Gson gson = new Gson();
        Object[] params = message.getParamValues();
        if (params == null) params = new Object[0];
        Class[] paramTypes = method.getParameterTypes();
        if (paramTypes == null) paramTypes = new Class[0];

        if (params.length != paramTypes.length) throw new Exception("method param(s) not match.");
        for (int i = 0; i < params.length; i++) {
            params[i] = gson.fromJson(gson.toJson(params[i]), paramTypes[i]);
        }

        response.setValue(method.invoke(bean, params));
        return response;
    }
}

欢迎不懂的小伙伴留言~

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343